[jira] [Comment Edited] (FLINK-6997) SavepointITCase fails in master branch sometimes
[ https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061814#comment-16061814 ] mingleizhang edited comment on FLINK-6997 at 6/24/17 5:51 AM:
--------------------------------------------------------------

I tested several times and it occasionally failed, but I got the different error message below. The machine runs Windows 7.
{code:java}
java.io.IOException: Dispose savepoint failed
    at org.apache.flink.runtime.testingUtils.TestingCluster.disposeSavepoint(TestingCluster.scala:372)
    at org.apache.flink.runtime.testingUtils.TestingCluster.disposeSavepoint(TestingCluster.scala:328)
    at org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:840)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
{code}

was (Author: mingleizhang): I tested several times and it occasionally failed, but I got a different error message (the same stack trace as above, without the Windows 7 note).
[jira] [Commented] (FLINK-6997) SavepointITCase fails in master branch sometimes
[ https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061814#comment-16061814 ] mingleizhang commented on FLINK-6997:
-------------------------------------

I tested several times and it occasionally failed, but I got the different error message below.
{code:java}
java.io.IOException: Dispose savepoint failed
    at org.apache.flink.runtime.testingUtils.TestingCluster.disposeSavepoint(TestingCluster.scala:372)
    at org.apache.flink.runtime.testingUtils.TestingCluster.disposeSavepoint(TestingCluster.scala:328)
    at org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:840)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
{code}
[jira] [Updated] (FLINK-6997) SavepointITCase fails in master branch sometimes
[ https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-6997:
--------------------------

    Summary: SavepointITCase fails in master branch sometimes  (was: SavepointITCase fails in master branch)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-5695) Optimize table type systems based on database semantics
[ https://issues.apache.org/jira/browse/FLINK-5695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061625#comment-16061625 ] ASF GitHub Bot commented on FLINK-5695:
---------------------------------------

Github user sunjincheng121 closed the pull request at:

    https://github.com/apache/flink/pull/3248

> Optimize table type systems based on database semantics
> -------------------------------------------------------
>
>                 Key: FLINK-5695
>                 URL: https://issues.apache.org/jira/browse/FLINK-5695
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Optimize table type systems based on database semantics. As follows:
> {code}
>           groupBy
> Table                GroupedTable
>   ∧        <             ∧
>   |          select      |
>   |                      |
>   |  where               |
>   |  select              |  groupBy
>   |  agg                 |
>   |  ...                 |
>   |  window              |
>   ∨                  ->
> Table            WindowedTable
>                  <-
>                select
> {code}
> What do you think? [~fhueske]
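The transitions in the diagram can be modeled as a small type-state API: {{groupBy()}} moves Table to GroupedTable, an aggregating {{select()}} moves back to Table, and {{window()}} yields a windowed table. The sketch below is a hedged illustration that follows the diagram only; the class and method names mirror it but this is not Flink's actual Table API.

```java
// Toy type-state model of the diagram above (not Flink's Table API).
// Each class only exposes the operations the diagram allows from that state.
public class TableTypeSketch {
    static class Table {
        Table select(String fields) { return this; }         // Table -> Table
        Table where(String predicate) { return this; }       // Table -> Table
        GroupedTable groupBy(String fields) {                // Table -> GroupedTable
            return new GroupedTable();
        }
        WindowedTable window(String windowSpec) {            // Table -> WindowedTable
            return new WindowedTable();
        }
    }

    static class GroupedTable {
        // An aggregating select returns to Table, as the diagram's back-edge shows.
        Table select(String aggregates) { return new Table(); }
    }

    static class WindowedTable {
        Table select(String aggregates) { return new Table(); }
    }

    public static void main(String[] args) {
        // Follow one path through the diagram: Table -> GroupedTable -> Table.
        Table result = new Table().where("a > 1").groupBy("a").select("a, sum(b)");
        System.out.println(result != null);
    }
}
```

The point of the type-state split is that illegal call orders (e.g. {{where()}} on a grouped table) become compile errors rather than runtime failures.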
[GitHub] flink pull request #3248: [FLINK-5695] [Table API & SQL] Optimize table type...
Github user sunjincheng121 closed the pull request at: https://github.com/apache/flink/pull/3248 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6916) FLIP-19: Improved BLOB storage architecture
[ https://issues.apache.org/jira/browse/FLINK-6916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061567#comment-16061567 ] ASF GitHub Bot commented on FLINK-6916:
---------------------------------------

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4176

    [FLINK-6916][blob] add API to allow job-related BLOBs to be stored

    This PR is the fourth in a series for FLIP-19, based upon #4174. It allows (CONTENT_ADDRESSABLE) BLOBs to not only be stored job-independently but also based on a jobId, which will become the future default for JARs, `TaskDeploymentDescriptor` data, and `DistributedCache` data. Please note that we will rework/reorganise the whole BLOB store APIs completely in a future request. This PR focusses on the added functionality.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6916-3-jobid

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4176.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4176

commit ce719ee39fbbca7b7828c17d9792fc87d37450c7 Author: Nico Kruber Date: 2017-01-06T17:42:58Z [FLINK-6008][docs] update some config options to the new, non-deprecated ones
commit 9efa8808e46adc1253f52a6a8cec6d3b4d29fee3 Author: Nico Kruber Date: 2016-12-20T15:49:57Z [FLINK-6008][docs] minor improvements in the BlobService docs
commit ca3d533b0affa645ec93d40de378dadc829bbfe5 Author: Nico Kruber Date: 2016-12-20T17:27:13Z [FLINK-6008] refactor BlobCache#getURL() for cleaner code
commit 0eededeb36dd833835753def7f4bb27c9d5fb67e Author: Nico Kruber Date: 2017-03-09T17:14:02Z [FLINK-6008] use Preconditions.checkArgument in BlobClient
commit 6249041a9db2b39ddf54e79a1aed5e7706e739c7 Author: Nico Kruber Date: 2016-12-21T15:23:29Z [FLINK-6008] do not fail the BlobServer if delete fails; also extend the delete tests and remove one code duplication
commit e681239a538547f752d65358db1ebd2ba312b33c Author: Nico Kruber Date: 2017-03-17T15:21:40Z [FLINK-6008] fix concurrent job directory creation; also add according unit tests
commit 20beae2dbc91859e2ec724b35b20536dcd11fe90 Author: Nico Kruber Date: 2017-04-18T14:37:37Z [FLINK-6008] some comments about BlobLibraryCacheManager cleanup
commit 8a33517fe6eb2fa932ab17cb0d82a3fa8d7b8d0b Author: Nico Kruber Date: 2017-04-19T13:39:03Z [hotfix] minor typos
commit 23889866ac21494fc4af90905ab1518cbe897118 Author: Nico Kruber Date: 2017-04-19T14:10:16Z [FLINK-6008] further cleanup tests for BlobLibraryCacheManager
commit 01b1a245528c264a6061ed3a48b24c5a207369f6 Author: Nico Kruber Date: 2017-06-14T16:01:47Z [FLINK-6008] do not guard a delete() call with a check for existence
commit cb249759b79d88eda37a8bb149040be3052059ac Author: Nico Kruber Date: 2017-06-16T08:51:04Z [FLINK-6916][blob] remove (unused) NAME_ADDRESSABLE mode
commit 7dc9cdb5bcf19a5e6b3190ce564201f1cc24 Author: Nico Kruber Date: 2017-06-21T15:05:57Z [FLINK-6916][blob] remove further unused code due to the NAME_ADDRESSABLE removal
commit 00242371fed84a658ce88765204c450cc7819cf3 Author: Nico Kruber Date: 2017-06-22T15:31:17Z [FLINK-6916] remove code duplication in BlobClientSslTest. This lets BlobClientSslTest extend BlobClientTest, as most of its implementation came from there and was simply copied.
commit 2a251e5cef0b757333d0dc1ff37f78fa0ea6eba7 Author: Nico Kruber Date: 2017-06-21T12:45:31Z [FLINK-6916] remove LibraryCacheManager#getFile(). This was only used in tests where it is avoidable, but if used anywhere else, it may have caused cleanup issues.
commit 12d76e96247fc34a498260d323c67cbd07f3f905 Author: Nico Kruber Date: 2017-06-21T14:14:15Z [FLINK-6916][blob] refactor getURL() to the more generic getFile(). The fact that we always returned URL objects is a relic of the BlobServer's only use for URLClassLoader. Since we'd like to extend its use, returning File objects instead is more generic.
commit 3126c3c5e0c146e16d415fe75ccbfaf1770a3bea Author: Nico Kruber Date: 2017-06-23T09:40:34Z [FLINK-6916][blob] verify some of the buffers returned by GET
commit 4992ad7bb093444356d4258f2abe0731e1c18825 Author: Nico Kruber Date: 2017-06-23T10:04:10Z [FLINK-6916][blob] use
[GitHub] flink pull request #4176: [FLINK-6916][blob] add API to allow job-related BL...
GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4176

    [FLINK-6916][blob] add API to allow job-related BLOBs to be stored

    This PR is the fourth in a series for FLIP-19, based upon #4174. It allows (CONTENT_ADDRESSABLE) BLOBs to not only be stored job-independently but also based on a jobId, which will become the future default for JARs, `TaskDeploymentDescriptor` data, and `DistributedCache` data. Please note that we will rework/reorganise the whole BLOB store APIs completely in a future request. This PR focusses on the added functionality.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6916-3-jobid

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4176.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4176

commit ce719ee39fbbca7b7828c17d9792fc87d37450c7 Author: Nico Kruber Date: 2017-01-06T17:42:58Z [FLINK-6008][docs] update some config options to the new, non-deprecated ones
commit 9efa8808e46adc1253f52a6a8cec6d3b4d29fee3 Author: Nico Kruber Date: 2016-12-20T15:49:57Z [FLINK-6008][docs] minor improvements in the BlobService docs
commit ca3d533b0affa645ec93d40de378dadc829bbfe5 Author: Nico Kruber Date: 2016-12-20T17:27:13Z [FLINK-6008] refactor BlobCache#getURL() for cleaner code
commit 0eededeb36dd833835753def7f4bb27c9d5fb67e Author: Nico Kruber Date: 2017-03-09T17:14:02Z [FLINK-6008] use Preconditions.checkArgument in BlobClient
commit 6249041a9db2b39ddf54e79a1aed5e7706e739c7 Author: Nico Kruber Date: 2016-12-21T15:23:29Z [FLINK-6008] do not fail the BlobServer if delete fails; also extend the delete tests and remove one code duplication
commit e681239a538547f752d65358db1ebd2ba312b33c Author: Nico Kruber Date: 2017-03-17T15:21:40Z [FLINK-6008] fix concurrent job directory creation; also add according unit tests
commit 20beae2dbc91859e2ec724b35b20536dcd11fe90 Author: Nico Kruber Date: 2017-04-18T14:37:37Z [FLINK-6008] some comments about BlobLibraryCacheManager cleanup
commit 8a33517fe6eb2fa932ab17cb0d82a3fa8d7b8d0b Author: Nico Kruber Date: 2017-04-19T13:39:03Z [hotfix] minor typos
commit 23889866ac21494fc4af90905ab1518cbe897118 Author: Nico Kruber Date: 2017-04-19T14:10:16Z [FLINK-6008] further cleanup tests for BlobLibraryCacheManager
commit 01b1a245528c264a6061ed3a48b24c5a207369f6 Author: Nico Kruber Date: 2017-06-14T16:01:47Z [FLINK-6008] do not guard a delete() call with a check for existence
commit cb249759b79d88eda37a8bb149040be3052059ac Author: Nico Kruber Date: 2017-06-16T08:51:04Z [FLINK-6916][blob] remove (unused) NAME_ADDRESSABLE mode
commit 7dc9cdb5bcf19a5e6b3190ce564201f1cc24 Author: Nico Kruber Date: 2017-06-21T15:05:57Z [FLINK-6916][blob] remove further unused code due to the NAME_ADDRESSABLE removal
commit 00242371fed84a658ce88765204c450cc7819cf3 Author: Nico Kruber Date: 2017-06-22T15:31:17Z [FLINK-6916] remove code duplication in BlobClientSslTest. This lets BlobClientSslTest extend BlobClientTest, as most of its implementation came from there and was simply copied.
commit 2a251e5cef0b757333d0dc1ff37f78fa0ea6eba7 Author: Nico Kruber Date: 2017-06-21T12:45:31Z [FLINK-6916] remove LibraryCacheManager#getFile(). This was only used in tests where it is avoidable, but if used anywhere else, it may have caused cleanup issues.
commit 12d76e96247fc34a498260d323c67cbd07f3f905 Author: Nico Kruber Date: 2017-06-21T14:14:15Z [FLINK-6916][blob] refactor getURL() to the more generic getFile(). The fact that we always returned URL objects is a relic of the BlobServer's only use for URLClassLoader. Since we'd like to extend its use, returning File objects instead is more generic.
commit 3126c3c5e0c146e16d415fe75ccbfaf1770a3bea Author: Nico Kruber Date: 2017-06-23T09:40:34Z [FLINK-6916][blob] verify some of the buffers returned by GET
commit 4992ad7bb093444356d4258f2abe0731e1c18825 Author: Nico Kruber Date: 2017-06-23T10:04:10Z [FLINK-6916][blob] use TemporaryFolder for local BLOB dir in unit tests. This replaces the use of some temporary directory where it is not guaranteed that it will be deleted after the test.
commit d87d742b9a19efc1a26d2dda0c724d286ba9904d Author: Nico
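The jobId-scoped storage described in the PR text can be pictured with a toy in-memory model: content-addressable entries keyed by an optional jobId, where a null jobId means "job-independent". The BlobKey class and put/get helpers below are hypothetical stand-ins, not the actual BlobServer API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy model of BLOBs stored either job-independently or scoped to a jobId.
public class BlobScopeSketch {
    // Composite key: (optional jobId, content key); jobId == null means job-independent.
    static final class BlobKey {
        final String jobId;
        final String contentKey;
        BlobKey(String jobId, String contentKey) {
            this.jobId = jobId;
            this.contentKey = contentKey;
        }
        @Override public boolean equals(Object o) {
            return o instanceof BlobKey
                && Objects.equals(jobId, ((BlobKey) o).jobId)
                && Objects.equals(contentKey, ((BlobKey) o).contentKey);
        }
        @Override public int hashCode() { return Objects.hash(jobId, contentKey); }
    }

    static final Map<BlobKey, byte[]> store = new HashMap<>();

    static void put(String jobId, String contentKey, byte[] data) {
        store.put(new BlobKey(jobId, contentKey), data);
    }

    static byte[] get(String jobId, String contentKey) {
        return store.get(new BlobKey(jobId, contentKey));
    }

    public static void main(String[] args) {
        put(null, "k1", new byte[]{1});     // job-independent BLOB
        put("job-42", "k1", new byte[]{2}); // same content key, scoped to one job
        // The two entries do not collide: the jobId is part of the lookup key.
        System.out.println(get("job-42", "k1")[0]); // prints 2
    }
}
```

Scoping the key by jobId is what later allows per-job cleanup: dropping a job removes exactly the entries whose key carries that jobId.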
[jira] [Commented] (FLINK-6958) Async I/O timeout not work
[ https://issues.apache.org/jira/browse/FLINK-6958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061517#comment-16061517 ] Till Rohrmann commented on FLINK-6958:
--------------------------------------

{{timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)}} has been intentionally set to {{false}}, because we don't want to execute any timer tasks after the shutdown of the {{StreamTask}}. The reason is the lifecycle management of the {{StreamTask}}: once the {{StreamTask}} has terminated, all of its components should be closed as well, and there shouldn't be any async operations which might try to output elements into an {{OutputEmitter}}. The solution might be as simple as moving the {{stopResources}} call before the {{super.close}} call in {{AsyncWaitOperator.close}}.

> Async I/O timeout not work
> --------------------------
>
>                 Key: FLINK-6958
>                 URL: https://issues.apache.org/jira/browse/FLINK-6958
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.1
>            Reporter: feng xiaojie
>            Assignee: Jark Wu
>
> When using Async I/O with an UnorderedStreamElementQueue, the queue will always be full if you don't call AsyncCollector.collect to ack its entries.
> The timeout should collect these entries when it fires, but that doesn't work.
> Debugging, I found that on timeout it calls resultFuture.completeExceptionally(error) but never calls UnorderedStreamElementQueue.onCompleteHandler, which causes the async I/O operator to hang forever.
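The reordering Till suggests can be illustrated with a toy model. Everything below is a stand-in: the names mirror the discussion, but this is not Flink's AsyncWaitOperator. It only demonstrates the intended call order, i.e. stopping the operator's async resources before the regular operator shutdown runs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the proposed close ordering (not Flink code).
public class CloseOrderingSketch {
    static final List<String> log = new ArrayList<>();

    static class AsyncWaitOperatorLike {
        // Stands in for AbstractStreamOperator.close(): after this runs, the
        // operator's outputs must no longer be touched.
        void superClose() { log.add("super.close"); }

        // Stands in for stopping async resources such as the timer executor.
        void stopResources() { log.add("stopResources"); }

        void close() {
            stopResources(); // proposed order: quiesce async work first ...
            superClose();    // ... then run the normal operator shutdown
        }
    }

    public static void main(String[] args) {
        new AsyncWaitOperatorLike().close();
        System.out.println(log); // prints [stopResources, super.close]
    }
}
```

With this order, no timer task can fire into an already-closed operator, which is the hang/lifecycle concern described above.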
[jira] [Updated] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
[ https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenzhong Xu updated FLINK-6998:
--------------------------------

    Description: Propose to add "successful-commits" and "failed-commits" metrics in the KafkaConsumerThread class.

> Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6998
>                 URL: https://issues.apache.org/jira/browse/FLINK-6998
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Zhenzhong Xu
>            Assignee: Zhenzhong Xu
>
> Propose to add "successful-commits" and "failed-commits" metrics in the KafkaConsumerThread class.
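The proposal above amounts to a pair of counters driven from the consumer's offset-commit callback. The sketch below is a hedged illustration: the simplified callback signature and the counter fields are stand-ins, not Flink's KafkaConsumerThread or Kafka's actual OffsetCommitCallback interface.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy version of the proposed "successful-commits" / "failed-commits" metrics.
public class CommitMetricsSketch {
    static final AtomicLong successfulCommits = new AtomicLong();
    static final AtomicLong failedCommits = new AtomicLong();

    // Simplified commit-callback contract: a null exception means the
    // asynchronous offset commit succeeded, non-null means it failed.
    static void onComplete(Exception exception) {
        if (exception == null) {
            successfulCommits.incrementAndGet();
        } else {
            failedCommits.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        onComplete(null);                          // a successful commit
        onComplete(new RuntimeException("boom"));  // a failed commit
        System.out.println(successfulCommits.get() + " " + failedCommits.get()); // prints 1 1
    }
}
```

Counting in the callback (rather than where the commit is requested) is what makes the metrics reflect the actual asynchronous outcome of each commit.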
[jira] [Created] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
Zhenzhong Xu created FLINK-6998:
-----------------------------------

             Summary: Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
                 Key: FLINK-6998
                 URL: https://issues.apache.org/jira/browse/FLINK-6998
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
            Reporter: Zhenzhong Xu
[jira] [Commented] (FLINK-6379) Implement FLIP-6 Mesos Resource Manager
[ https://issues.apache.org/jira/browse/FLINK-6379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061510#comment-16061510 ] ASF GitHub Bot commented on FLINK-6379:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3942

    Sorry for the long delay, Eron. I started reviewing the code last week. I'm not completely done yet, but so far things look good. As a follow-up, we could think about how to remove the dependency on Akka by also converting the `ConnectionMonitor`, `TaskMonitor`, `LaunchCoordinator` and `ReconciliationCoordinator` components. I hope to finish the review next week and finally merge the PR.

> Implement FLIP-6 Mesos Resource Manager
> ---------------------------------------
>
>                 Key: FLINK-6379
>                 URL: https://issues.apache.org/jira/browse/FLINK-6379
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Mesos
>            Reporter: Eron Wright
>            Assignee: Eron Wright
>
> Given the new ResourceManager of FLIP-6, implement a new MesosResourceManager.
> The minimal effort would be to implement a new resource manager while continuing to use the various local actors (launch coordinator, task monitor, etc.) which implement the various FSMs associated with Mesos scheduling.
> The Fenzo library would continue to solve the packing problem of matching resource offers to slot requests.
[jira] [Assigned] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
[ https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenzhong Xu reassigned FLINK-6998:
-----------------------------------

    Assignee: Zhenzhong Xu

> Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6998
>                 URL: https://issues.apache.org/jira/browse/FLINK-6998
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Zhenzhong Xu
>            Assignee: Zhenzhong Xu
[GitHub] flink issue #3942: FLINK-6379 Mesos ResourceManager (FLIP-6)
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3942

    Sorry for the long delay, Eron. I started reviewing the code last week. I'm not completely done yet, but so far things look good. As a follow-up, we could think about how to remove the dependency on Akka by also converting the `ConnectionMonitor`, `TaskMonitor`, `LaunchCoordinator` and `ReconciliationCoordinator` components. I hope to finish the review next week and finally merge the PR.
[jira] [Created] (FLINK-6997) SavepointITCase fails in master branch
Ted Yu created FLINK-6997:
-----------------------------

             Summary: SavepointITCase fails in master branch
                 Key: FLINK-6997
                 URL: https://issues.apache.org/jira/browse/FLINK-6997
             Project: Flink
          Issue Type: Bug
          Components: Tests
            Reporter: Ted Yu

I got the following test failure (with commit a0b781461bcf8c2f1d00b93464995f03eda589f1):
{code}
testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase)  Time elapsed: 8.129 sec  <<< ERROR!
java.io.IOException: java.lang.Exception: Failed to complete savepoint
    at org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342)
    at org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316)
    at org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827)
Caused by: java.lang.Exception: Failed to complete savepoint
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805)
    at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
    at akka.dispatch.OnComplete.internal(Future.scala:247)
    at akka.dispatch.OnComplete.internal(Future.scala:245)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required tasks are currently running.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
{code}
[GitHub] flink issue #4167: Test PR, please ignore
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4167 I'm working on a PR to print some system information (with output folding) to the TravisCI log, which may be useful in debugging these sorts of changes. It would be nice if we could fold each Maven module as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6855) Add handler for taking a savepoint
[ https://issues.apache.org/jira/browse/FLINK-6855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061320#comment-16061320 ] Eron Wright commented on FLINK-6855: - Is this related to FLINK-4336? > Add handler for taking a savepoint > -- > > Key: FLINK-6855 > URL: https://issues.apache.org/jira/browse/FLINK-6855 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Webfrontend >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0
[GitHub] flink issue #4167: Test PR, please ignore
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4167 This PR stabilized build times at around 55 minutes. While we are not below 50 yet, we can still tweak some things, so I propose merging it (after re-enabling RAT, of course) and going from there. This should at least reduce the PR/master build queues. https://travis-ci.org/apache/flink/builds/245845600
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061291#comment-16061291 ] Stefan Richter commented on FLINK-6964: --- Those placeholder messages look OK and are expected; that happens when the placeholder is exchanged with the original state handle. The exception on unregistration looks promising. Will dig into this on Monday. Thanks again for the logs! > Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore > -- > > Key: FLINK-6964 > URL: https://issues.apache.org/jira/browse/FLINK-6964 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter > > {{StandaloneCompletedCheckpointStore}} does not register shared states on > resume. However, for externalized checkpoints, it registers the checkpoint > from which it resumed. This checkpoint gets added to the completed checkpoint > store as part of resume.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061270#comment-16061270 ] Cliff Resnick commented on FLINK-6964: -- Looks like it's still trying to register a placeholder?
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061257#comment-16061257 ] Cliff Resnick commented on FLINK-6964: -- I ran with your newer precondition. It actually succeeded once, but failed the next two runs, hanging on org.apache.flink.runtime.state.SharedStateRegistry - Attempt to register for key WindowOperator... I tried with just a single slot, but that also hung. The log above represents the hang condition. Everything described above is logged here: https://gist.github.com/cresny/0e109f843730b64d3a330f8fb06bb8a6 The good news is that there was an exception around the state registry.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061216#comment-16061216 ] Cliff Resnick commented on FLINK-6964: -- OK, will try that. Meanwhile, here is a run (and hang) from the last push: https://gist.github.com/cresny/8d0d24b1bd72031a515bd9a3822da189
[jira] [Commented] (FLINK-6971) Add Alluxio Filesystem in Flink Ecosystem page
[ https://issues.apache.org/jira/browse/FLINK-6971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061208#comment-16061208 ] Bin Fan commented on FLINK-6971: Thanks [~fhueske], I will read the instructions and get back here if I have any questions. - Bin > Add Alluxio Filesystem in Flink Ecosystem page > -- > > Key: FLINK-6971 > URL: https://issues.apache.org/jira/browse/FLINK-6971 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Bin Fan >Priority: Minor > > The Flink Ecosystem page (http://flink.apache.org/ecosystem.html) lists a set of > third-party projects that support working with Flink. > Alluxio (www.alluxio.org) can work with Flink as a Hadoop-compatible file > system; see the description at > http://www.alluxio.org/docs/master/en/Running-Flink-on-Alluxio.html. I am > wondering if I could submit a patch that adds a paragraph on Alluxio under > http://flink.apache.org/ecosystem.html#third-party-projects and points users > to the Alluxio-Flink integration page.
[jira] [Commented] (FLINK-6971) Add Alluxio Filesystem in Flink Ecosystem page
[ https://issues.apache.org/jira/browse/FLINK-6971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061204#comment-16061204 ] Fabian Hueske commented on FLINK-6971: -- Hi [~apc999], yes, a PR would be very much appreciated. You can find the general contribution guidelines here: http://flink.apache.org/how-to-contribute.html and the ones for the website here: http://flink.apache.org/improve-website.html Thank you, Fabian
[jira] [Commented] (FLINK-6971) Add Alluxio Filesystem in Flink Ecosystem page
[ https://issues.apache.org/jira/browse/FLINK-6971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061191#comment-16061191 ] Bin Fan commented on FLINK-6971: Could anyone comment? Is this the recommended way? Thanks!
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061190#comment-16061190 ] Stefan Richter commented on FLINK-6964: --- Alright, it wasn't the precondition. I have pushed the version that runs as expected in my local testing. Log statements are included.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061178#comment-16061178 ] Cliff Resnick commented on FLINK-6964: -- OK, I'll wait on your push.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061174#comment-16061174 ] Stefan Richter commented on FLINK-6964: --- I think I already found the problem. A stupid mistake: the precondition that I introduced in the fix was inverted :-(
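For readers skimming the thread, the failure mode Stefan describes — an inverted precondition — is easy to illustrate with a small, self-contained Java sketch. The class and method names below are hypothetical; this is not the actual Flink code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of an inverted precondition: the buggy check
// rejects exactly the valid inputs and would accept the invalid ones.
public class InvertedPrecondition {
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    private final Set<String> registered = new HashSet<>();

    // Buggy variant: the condition is inverted, so the first (valid)
    // registration of a key throws, while a duplicate would pass.
    public void registerBuggy(String key) {
        checkState(registered.contains(key), "already registered: " + key);
        registered.add(key);
    }

    // Fixed variant: reject only keys that are already registered.
    public void registerFixed(String key) {
        checkState(!registered.contains(key), "already registered: " + key);
        registered.add(key);
    }

    public static void main(String[] args) {
        InvertedPrecondition r = new InvertedPrecondition();
        boolean buggyThrew = false;
        try {
            r.registerBuggy("state-handle-1"); // valid first registration, yet it throws
        } catch (IllegalStateException e) {
            buggyThrew = true;
        }
        r.registerFixed("state-handle-2");     // passes, as intended
        System.out.println("buggy threw on valid input: " + buggyThrew);
    }
}
```

Flipping the boolean back, as in `registerFixed`, restores the intended check.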
[jira] [Comment Edited] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061168#comment-16061168 ] Piotr Nowojski edited comment on FLINK-6996 at 6/23/17 4:20 PM: Yes, thanks, you are correct. Fixed description. was (Author: pnowojski): Yes, you are correct. Fixed description. > FlinkKafkaProducer010 doesn't guarantee at-least-once semantic > -- > > Key: FLINK-6996 > URL: https://issues.apache.org/jira/browse/FLINK-6996 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This > means, when it's used like a "regular sink function" (option a from [the java > doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]) > it will not flush the data on "snapshotState" as it is supposed to.
[jira] [Updated] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-6996: -- Description: FlinkKafkaProducer010 doesn't implement CheckpointedFunction interface. This means, when it's used like a "regular sink function" (option a from [the java doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]) it will not flush the data on "snapshotState" as it is supposed to. (was: FlinkKafkaProducer010 doesn't implement CheckpointedFunction interface. This means, when it's used like a "custom operator" (option b from [the java doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]) it will not flush the data on "snapshotState" as it is supposed to.)
[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061168#comment-16061168 ] Piotr Nowojski commented on FLINK-6996: --- Yes, you are correct. Fixed description.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061166#comment-16061166 ] Stefan Richter commented on FLINK-6964: --- In particular, JobManager logs are valuable.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061164#comment-16061164 ] Stefan Richter commented on FLINK-6964: --- I suggest logging the Flink runtime at INFO level, thanks! :-)
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061163#comment-16061163 ] Cliff Resnick commented on FLINK-6964: -- Ha! I just started running. OK, will merge and rebuild. The logging scopes above are for a separate network appender, so I tend to keep them narrow. Should I broaden them to all of flink.runtime?
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061143#comment-16061143 ] Stefan Richter commented on FLINK-6964: --- I have added a commit to my branch that introduces more logging. The log scope looks good; do you log all remaining packages at INFO?
[jira] [Commented] (FLINK-6885) RMQSource does not support qos, leading to oom
[ https://issues.apache.org/jira/browse/FLINK-6885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061130#comment-16061130 ] Fabian Hueske commented on FLINK-6885: -- [~rubenvanvreeland] Definitely! > RMQSource does not support qos, leading to oom > -- > > Key: FLINK-6885 > URL: https://issues.apache.org/jira/browse/FLINK-6885 > Project: Flink > Issue Type: Bug > Components: RabbitMQ Connector, Streaming Connectors >Reporter: Ruben van Vreeland >Priority: Minor > Labels: easyfix > Original Estimate: 2h > Remaining Estimate: 2h > > The RabbitMQ connector supports QoS, limiting the prefetch to a configurable > amount of objects. With the current builder pattern that is used by the > RMQSource, however, this cannot be configured, leading to an OOM when enough > objects are prefetched.
[jira] [Commented] (FLINK-6885) RMQSource does not support qos, leading to oom
[ https://issues.apache.org/jira/browse/FLINK-6885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061122#comment-16061122 ] Ruben van Vreeland commented on FLINK-6885: --- Can I propose a change in a PR?
[GitHub] flink pull request #4161: Hot fix a description of over Table API document.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4161#discussion_r123776397 --- Diff: docs/dev/table/tableApi.md --- @@ -1338,7 +1338,10 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov At the moment, over windows with rows following the current row are not supported. All over windows must stop at the current row and only two values are supported for following, CURRENT_RANGE for a time interval and CURRENT_ROW for a row-count interval. -If the `following` clause is omitted, the window will end at the current row. + CURRENT_ROW means the window will end at the current row. + CURRENT_RANGE means the window will end at the current row's last peer in the ORDER BY ordering. --- End diff -- `CURRENT_ROW` sets the upper bound of the window to the current row. `CURRENT_RANGE` sets the upper bound of the window to the sort key of the current row, i.e., all rows with the same sort key as the current row are included in the window.
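The distinction drawn in the review comment above can be illustrated with a self-contained sketch: plain Java over a pre-sorted array, where the method names are invented for this example and are not Flink API:

```java
// Illustrates the over-window upper-bound semantics described above, for
// rows already sorted by their sort key. Names are invented for the example.
public class OverWindowBounds {
    // CURRENT_ROW: the window's upper bound is the current row itself.
    static int upperBoundCurrentRow(long[] sortKeys, int currentRow) {
        return currentRow;
    }

    // CURRENT_RANGE: the window's upper bound is the last row that has the
    // same sort key as the current row (the current row's last "peer").
    static int upperBoundCurrentRange(long[] sortKeys, int currentRow) {
        int end = currentRow;
        while (end + 1 < sortKeys.length && sortKeys[end + 1] == sortKeys[currentRow]) {
            end++;
        }
        return end;
    }

    public static void main(String[] args) {
        long[] sortKeys = {1, 1, 2, 2, 2, 3}; // rows sorted by sort key
        // For the row at index 2 (sort key 2):
        System.out.println(upperBoundCurrentRow(sortKeys, 2));   // 2
        System.out.println(upperBoundCurrentRange(sortKeys, 2)); // 4: rows 2..4 share key 2
    }
}
```

With CURRENT_RANGE, all peers of the current row fall inside the window, which is exactly why the two bounds can differ for a time-interval ordering.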
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061092#comment-16061092 ] Cliff Resnick commented on FLINK-6964: -- I'll merge and rerun. This is what I have for the log scope. Should I add anything? {noformat} org.apache.flink.contrib.streaming.state=TRACE org.apache.flink.runtime.checkpoint=TRACE org.apache.flink.runtime.state=TRACE {noformat}
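For reference, Cliff's scopes and Stefan's suggestion to log the whole Flink runtime at INFO could be combined in a single `log4j.properties` along these lines. This is a sketch assuming a Log4j 1.x setup; the appender name and log-file path are placeholders, not taken from the thread:

```properties
# Sketch of a Log4j 1.x configuration combining the scopes discussed above.
log4j.rootLogger=INFO, file

# Stefan's suggestion: the whole Flink runtime at INFO.
log4j.logger.org.apache.flink.runtime=INFO

# Cliff's narrower TRACE scopes for the checkpoint/state investigation.
log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
log4j.logger.org.apache.flink.runtime.state=TRACE

# Placeholder appender; replace with the actual (e.g. network) appender.
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=flink-debug.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```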
[jira] [Comment Edited] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061091#comment-16061091 ] Tzu-Li (Gordon) Tai edited comment on FLINK-6996 at 6/23/17 3:12 PM: - I think it's the other way around. In approach (b), i.e. {{FlinkKafkaProducer010.writeToKafkaWithTimestamps(inStream, schema, config)}}, flushing works. It's in approach (a), where it's used as a regular sink UDF, {{stream.addSink(new FlinkKafkaProducer010(...))}}: since it doesn't implement the {{CheckpointedFunction}} interface, there's no flushing happening. was (Author: tzulitai): I think it's the other way around. In approach (b), i.e. {{FlinkKafkaProducer010.writeToKafkaWithTimestamps(inStream, schema, config)}}, flushing works. It's in approach (a), where it's used as a regular sink function, {{stream.addSink(new FlinkKafkaProducer010(...))}}: since it doesn't implement the {{CheckpointedFunction}} interface, there's no flushing happening.
[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061091#comment-16061091 ] Tzu-Li (Gordon) Tai commented on FLINK-6996: I think it's the other way around. In approach (b), i.e. {{FlinkKafkaProducer010.writeToKafkaWithTimestamps(inStream, schema, config)}}, flushing works. It's in approach (a), where it's used as a regular sink function, {{stream.addSink(new FlinkKafkaProducer010(...))}}: since it doesn't implement the {{CheckpointedFunction}} interface, there's no flushing happening.
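The behavior Gordon describes can be modeled with a small, self-contained Java sketch. The interfaces below only mimic Flink's `SinkFunction`/`CheckpointedFunction` contract for illustration; they are not the real Flink classes:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the at-least-once issue: the snapshot phase flushes a
// sink only if the sink implements the checkpoint hook. A sink that merely
// buffers (like a producer used as a plain sink function) is never drained.
public class FlushOnSnapshotDemo {
    interface SinkFunction { void invoke(String record); }
    interface CheckpointedFunction { void snapshotState(); }

    // Plain sink (approach a in the thread): buffers records but exposes no
    // snapshot hook, so pending records survive a checkpoint unflushed.
    static class BufferingSink implements SinkFunction {
        final List<String> buffer = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();
        public void invoke(String record) { buffer.add(record); }
    }

    // Sink that also implements the checkpoint hook: on snapshotState() the
    // pending buffer is drained, so the checkpoint covers all records.
    static class FlushingSink extends BufferingSink implements CheckpointedFunction {
        public void snapshotState() {
            delivered.addAll(buffer);
            buffer.clear();
        }
    }

    // The runtime only calls snapshotState() on sinks that implement the hook.
    static void checkpoint(SinkFunction sink) {
        if (sink instanceof CheckpointedFunction) {
            ((CheckpointedFunction) sink).snapshotState();
        }
    }

    public static void main(String[] args) {
        BufferingSink plain = new BufferingSink();
        FlushingSink flushing = new FlushingSink();
        plain.invoke("a");
        flushing.invoke("a");
        checkpoint(plain);    // no hook: record "a" is still only buffered
        checkpoint(flushing); // hook runs: record "a" is flushed
        System.out.println(plain.buffer.size() + " " + flushing.buffer.size()); // "1 0"
    }
}
```

The record left in `plain.buffer` after the checkpoint is exactly the kind of data that would be lost on failure, breaking the at-least-once guarantee.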
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061088#comment-16061088 ] Cliff Resnick commented on FLINK-6964: -- [~srichter] By hanging I mean that the checkpoint, though fully acknowledged, never completes. Looking at the UI, I see 100% and a spinning arrow until the checkpoint time expires, apparently without an exception being thrown. I did not merge my added logs into your branch because, from what you described, the issue was with the {{StandaloneCompletedCheckpointStore}}, which I never added logging to anyway. However, if the code as-is in your branch has sufficient logging, I can reproduce the issue and create a gist.
[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061085#comment-16061085 ] ASF GitHub Bot commented on FLINK-6097: --- Github user sunjincheng121 closed the pull request at: https://github.com/apache/flink/pull/3560 > Guaranteed the order of the extracted field references > -- > > Key: FLINK-6097 > URL: https://issues.apache.org/jira/browse/FLINK-6097 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > When we tried to implement the `OVER window` Table API, in the first version of the > prototype we did not consider that the table fields could get out of order > when we implemented the `translateToPlan` method; we set the `outputRow` fields > from `inputRow` according to the initial order of the table field indexes. > As long as the select statement had fewer than 5 projected columns this worked > well. Unfortunately, when the count of projections was bigger than 4 > (>= 5), we got random results. Debugging showed that the > `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily > save the fields. When the number of elements in the Set is less than 5, > the Set is one of the Set1, Set2, Set3, Set4 data structures. When the number of > elements is greater than or equal to 5, the Set becomes a HashSet#HashTrieSet, > which causes the data to get out of order. > e.g.: > Add the following elements in turn: > {code} > a, b, c, d, e > Set (a) > Class scala.collection.immutable.Set$Set1 > Set (a, b) > Class scala.collection.immutable.Set$Set2 > Set (a, b, c) > Class scala.collection.immutable.Set$Set3 > Set (a, b, c, d) > Class scala.collection.immutable.Set$Set4 > // we want (a, b, c, d, e) > Set (e, a, b, c, d) > Class scala.collection.immutable.HashSet$HashTrieSet > {code} > So we considered 2 approaches to solve this problem: > 1.
Make the `ProjectionTranslator#identifyFieldReferences` method guarantee that the > order of the extracted field references is the same as the input order. > 2. Add a mapping between the input and output fields. > In the end we used approach #2 to solve the problem. This change is not strictly necessary > for the problem I faced, but I feel it is better to let the output of > this method be in the same order as the input; it may be very helpful for other > cases, though I am currently not aware of any. I am OK with not making this > change, but we should add a comment instead to highlight that the current > output may be out of order. Otherwise, some people may not pay attention to > this and assume it is in order. > Hi, guys, what do you think? Welcome any feedback.
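The ordering pitfall described above has a direct analogue in Java's collections (a sketch in Java rather than the Scala collections cited in the issue): a hash-based set gives no iteration-order guarantee, while an insertion-ordered set, the spirit of approach #1, keeps extraction order stable.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class SetOrderSketch {
    public static void main(String[] args) {
        // A hash-based set makes no promise about iteration order, so code
        // that relies on "the order the fields were added" silently breaks
        // once the backing structure changes (as with Scala's HashTrieSet
        // at >= 5 elements).
        Set<String> hashed = new HashSet<>(List.of("a", "b", "c", "d", "e"));

        // An insertion-ordered set keeps the extraction order stable
        // regardless of how many elements are added.
        Set<String> ordered = new LinkedHashSet<>(List.of("a", "b", "c", "d", "e"));

        System.out.println(new ArrayList<>(ordered)); // prints [a, b, c, d, e]
    }
}
```

Approach #2 (an explicit input-to-output field mapping) sidesteps the issue entirely by never relying on collection iteration order in the first place.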
[jira] [Created] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
Piotr Nowojski created FLINK-6996: - Summary: FlinkKafkaProducer010 doesn't guarantee at-least-once semantic Key: FLINK-6996 URL: https://issues.apache.org/jira/browse/FLINK-6996 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.3.1, 1.2.1, 1.3.0, 1.2.0 Reporter: Piotr Nowojski Assignee: Piotr Nowojski FlinkKafkaProducer010 doesn't implement CheckpointedFunction interface. This means, when it's used like a "custom operator" (option b from [the java doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]) it will not flush the data on "snapshotState" as it is supposed to. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...
Github user sunjincheng121 closed the pull request at: https://github.com/apache/flink/pull/3560 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061078#comment-16061078 ] Stefan Richter commented on FLINK-6964: --- Besides potential exceptions, the same logging information as you provided last time (e.g. interactions with the shared registry) could be helpful. > Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore > -- > > Key: FLINK-6964 > URL: https://issues.apache.org/jira/browse/FLINK-6964 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter > > {{StandaloneCompletedCheckpointStore}} does not register shared states on > resume. However, for externalized checkpoints, it registers the checkpoint > from which it resumed. This checkpoint gets added to the completed checkpoint > store as part of resume.
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061028#comment-16061028 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123755789 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo} +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.configuration.Configuration +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import java.util.{List => JList, ArrayList => JArrayList} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the aggregate in bounded rowtime sort without offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param inputRowType It is used to mark the type of the incoming data + * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation + */ +class RowTimeSortProcessFunction( + private val inputRowType: CRowTypeInfo, + private val rowComparator: CollectionRowComparator) +extends ProcessFunction[CRow, CRow] { + + Preconditions.checkNotNull(rowComparator) + + // the state which keeps all the events that are not expired. 
+ // Each timestamp will contain an associated list with the events + // received at that timestamp + private var dataState: MapState[Long, JList[Row]] = _ + +// the state which keeps the last triggering timestamp to filter late events + private var lastTriggeringTsState: ValueState[Long] = _ + + private var outputC: CRow = _ + + override def open(config: Configuration) { + +val keyTypeInformation: TypeInformation[Long] = + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] +val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row]( +inputRowType.asInstanceOf[CRowTypeInfo].rowType) + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]( +"dataState", +keyTypeInformation, +valueTypeInformation) + +dataState = getRuntimeContext.getMapState(mapStateDescriptor) + +val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) +lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor) + +if (outputC == null) { + val arity:Integer = inputRowType.getArity + outputC = new CRow(Row.of(arity), true) +} + } + + + override def processElement( +inputC: CRow, +ctx: ProcessFunction[CRow, CRow]#Context, +out: Collector[CRow]): Unit = { + +val input = inputC.row + +// triggering timestamp for
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061029#comment-16061029 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123717544 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet } +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan } +import org.apache.calcite.rex.RexNode +import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.flink.table.plan.nodes.datastream.DataStreamSort +import org.apache.calcite.rel.RelCollation +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.api.TableException +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.flink.table.runtime.aggregate.SortUtil + +/** + * Rule to convert a LogicalSort into a DataStreamSort. + */ +class DataStreamSortRule +extends ConverterRule( + classOf[FlinkLogicalSort], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamSortRule") { + + override def matches(call: RelOptRuleCall): Boolean = { + +val result = super.matches(call) + +//need to identify time between others order fields. 
Time needs to be first sort element +// we can safely convert the object if the match rule succeeded +if(result) { + val calcSort: FlinkLogicalSort = call.rel(0).asInstanceOf[FlinkLogicalSort] + checkTimeOrder(calcSort) +} + +result + } + + override def convert(rel: RelNode): RelNode = { +val calcSort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort] +val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM) +val convInput: RelNode = RelOptRule.convert(calcSort.getInput(0), FlinkConventions.DATASTREAM) + +val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType + +new DataStreamSort( + rel.getCluster, + traitSet, + convInput, + new RowSchema(inputRowType), + new RowSchema(rel.getRowType), + calcSort.collation, + calcSort.offset, + calcSort.fetch, + description) + + } + + + /** + * Function is used to check at verification time if the SQL syntax is supported + */ + + def checkTimeOrder(calcSort: FlinkLogicalSort) = { + +val rowType = calcSort.getRowType +val sortCollation = calcSort.collation + //need to identify time between others order fields. Time needs to be first sort element +val timeType = SortUtil.getTimeType(sortCollation, rowType) +//time ordering needs to be ascending +if (SortUtil.getTimeDirection(sortCollation) != Direction.ASCENDING) { + throw new TableException("SQL/Table supports only ascending time ordering") --- End diff -- We do not exit the optimizer with exceptions because it prevents the optimizer to find an alternative plan. Rather return `false` in `matches()` > Support Limit/Top(Sort) for Stream SQL >
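The review point above, reject unsupported sort plans by returning false from `matches()` instead of throwing, can be sketched with a hypothetical miniature rule (the `SortSpec` class and `matches` signature below are illustrative stand-ins, not Calcite's actual API):

```java
// Hypothetical miniature of a planner rule precondition: the spec captures
// only the two properties the rule above cares about.
class SortSpec {
    final boolean timeIsFirstKey;
    final boolean timeAscending;

    SortSpec(boolean timeIsFirstKey, boolean timeAscending) {
        this.timeIsFirstKey = timeIsFirstKey;
        this.timeAscending = timeAscending;
    }
}

public class SortRuleSketch {
    // Returning false simply declines the match and leaves the planner free
    // to try alternative rules; throwing here would abort the whole plan
    // search even when another valid plan exists.
    static boolean matches(SortSpec sort) {
        return sort.timeIsFirstKey && sort.timeAscending;
    }

    public static void main(String[] args) {
        System.out.println(matches(new SortSpec(true, true)));  // supported: prints true
        System.out.println(matches(new SortSpec(true, false))); // descending time: prints false, no exception
    }
}
```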
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061033#comment-16061033 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123759708 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util.Comparator +import java.util.concurrent.ConcurrentLinkedQueue +import java.lang.{Integer => JInt, Long => JLong} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.table.runtime.aggregate.ProcTimeSortProcessFunction +import org.apache.flink.table.runtime.aggregate.RowTimeSortProcessFunction +import org.apache.flink.table.runtime.aggregate.TimeSortProcessFunctionTest._ +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.streaming.api.TimeCharacteristic + +class TimeSortProcessFunctionTest{ + + + @Test + def testSortProcTimeHarnessPartitioned(): Unit = { + +val rT = new RowTypeInfo(Array[TypeInformation[_]]( + INT_TYPE_INFO, + LONG_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO, + LONG_TYPE_INFO), + Array("a","b","c","d","e")) + +val rTA = new RowTypeInfo(Array[TypeInformation[_]]( + LONG_TYPE_INFO), Array("count")) +val indexes = Array(1,2) --- End diff -- space > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > 
Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. > Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit,
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061015#comment-16061015 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123757465 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.junit.Test + +class SortTest extends TableTestBase { + private val streamUtil: StreamTableTestUtil = streamTestUtil() + streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, + 'proctime.proctime, 'rowtime.rowtime) + + @Test + def testSortProcessingTime() = { + +val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c" + +val expected = + unaryNode( +"DataStreamCalc", +unaryNode("DataStreamSort", + streamTableNode(0), + term("orderBy", "proctime ASC", "c ASC"), + term("offset", "null"), + term("fetch","unlimited")), +term("select", "a", "TIME_MATERIALIZATION(proctime) AS proctime", "c")) --- End diff -- shouldn't this be just `a` since we do `SELECT a FROM ...` and not `SELECT a, proctime, c FROM ...`? > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. 
> Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query will be provided without > limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out of order events and retraction/updates of >
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061012#comment-16061012 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123745157 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, --- End diff -- Indention is off 3 instead of 2 > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL:
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061024#comment-16061024 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123753005 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import java.lang.{Integer=>JInt} + +/** + * Process Function used for the aggregate in bounded proctime sort without offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param inputRowType It is used to mark the type of the incoming data + * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation + */ +class ProcTimeSortProcessFunction( + private val inputRowType: CRowTypeInfo, + private val rowComparator: CollectionRowComparator) +extends ProcessFunction[CRow, CRow] { + + Preconditions.checkNotNull(rowComparator) + + private var stateEventsBuffer: ListState[Row] = _ + private val sortArray: ArrayList[Row] = new ArrayList[Row] + + private var outputC: CRow = _ + + override def open(config: Configuration) { +val sortDescriptor = new ListStateDescriptor[Row]("sortState", 
+inputRowType.asInstanceOf[CRowTypeInfo].rowType) +stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor) + +if (outputC == null) { + val arity:Integer = inputRowType.getArity + outputC = new CRow(Row.of(arity), true) +} + + } + + override def processElement( +inputC: CRow, +ctx: ProcessFunction[CRow, CRow]#Context, +out: Collector[CRow]): Unit = { + +val input = inputC.row + +val currentTime = ctx.timerService.currentProcessingTime +//buffer the event incoming event + +stateEventsBuffer.add(input) + +//deduplication of multiple registered timers is done automatically +ctx.timerService.registerProcessingTimeTimer(currentTime + 1) + + } + + override def onTimer( +timestamp: Long, +ctx: ProcessFunction[CRow, CRow]#OnTimerContext, +out: Collector[CRow]): Unit = { + +val iter = stateEventsBuffer.get.iterator() + +sortArray.clear() +while(iter.hasNext()) { + sortArray.add(iter.next()) +} + +//if we do not rely on java collections to do the sort we could implement --- End diff -- I think the comment can be removed. Collection sort should
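The buffer-then-sort-on-timer pattern in the diff above can be reduced to a self-contained sketch (plain Java collections standing in for Flink's {{ListState}} and timer service; the class and method names below are illustrative, not Flink's):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java stand-in for the ProcTimeSortProcessFunction pattern:
// buffer incoming rows, then sort and emit the batch when the timer fires.
class ProcTimeSortSketch {
    private final List<int[]> buffer = new ArrayList<>(); // stand-in for ListState[Row]
    private final Comparator<int[]> comparator;

    ProcTimeSortSketch(Comparator<int[]> comparator) {
        this.comparator = comparator;
    }

    void processElement(int[] row) {
        // In Flink, a processing-time timer for (now + 1) is also registered
        // here; duplicate registrations for the same timestamp are deduplicated.
        buffer.add(row);
    }

    List<int[]> onTimer() {
        List<int[]> sorted = new ArrayList<>(buffer);
        sorted.sort(comparator); // stable sort keeps arrival order for equal keys
        buffer.clear();          // state is cleared once the batch is emitted
        return sorted;
    }
}
```

Usage: feed rows via `processElement`, then call `onTimer` to obtain the sorted batch; a second `onTimer` with no new input yields an empty list, mirroring the cleared state.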
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061017#comment-16061017 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123712745 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+
+/**
+ * Trait represents a collection of sort methods to manipulate the parameters
+ */
+
+trait CommonSort {
+
+  private[flink] def offsetToString(offset: RexNode): String = {
+    val offsetToString = s"$offset"
+    offsetToString
+  }
+
+
+  private[flink] def sortFieldsToString(

--- End diff --

can be completely private if we move `explainTerms` to `CommonSort`

> Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them.
> Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query will be provided without > limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out of order events and retraction/updates of > results. > **Functionality example** > We exemplify with the query below for all the 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger – which > can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will > contribute/ be duplicated for 2
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061026#comment-16061026 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123754095 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import java.lang.{Integer => JInt}
+
+/**
+ * Process Function used for the aggregate in bounded proctime sort without offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param inputRowType It is used to mark the type of the incoming data
+ * @param rowComparator the [[java.util.Comparator]] used for this sort aggregation
+ */
+class ProcTimeSortProcessFunction(
+  private val inputRowType: CRowTypeInfo,
+  private val rowComparator: CollectionRowComparator)
+  extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  private var stateEventsBuffer: ListState[Row] = _
+  private val sortArray: ArrayList[Row] = new ArrayList[Row]
+
+  private var outputC: CRow = _
+
+  override def open(config: Configuration) {
+    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
+      inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+    stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
+
+    if (outputC == null) {
+      val arity: Integer = inputRowType.getArity
+      outputC = new CRow(Row.of(arity), true)
+    }
+  }
+
+  override def processElement(
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
+
+    val currentTime = ctx.timerService.currentProcessingTime
+    // buffer the incoming event
+    stateEventsBuffer.add(input)
+
+    // deduplication of multiple registered timers is done automatically
+    ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
+
+    val iter = stateEventsBuffer.get.iterator()
+
+    sortArray.clear()
+    while (iter.hasNext()) {
+      sortArray.add(iter.next())
+    }
+
+    // if we do not rely on java collections to do the sort we could implement
+    // an insertion sort as we get the elements from the state
+
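The diff above breaks off right before the sort-and-emit step that `onTimer` performs. As a minimal, self-contained sketch of that step, the collection-based sort the review discusses looks like the following; note this is an illustration only, with plain `Int` values and a hand-built ascending comparator standing in for Flink's `Row` and `CollectionRowComparator`:

```scala
import java.util.{ArrayList, Collections, Comparator}

object SortEmitSketch {
  // Stand-in for CollectionRowComparator: ascending order on the single "field".
  private val comparator = new Comparator[Integer] {
    override def compare(a: Integer, b: Integer): Int = a.compareTo(b)
  }

  // Mirrors the tail of onTimer: copy the buffered state into sortArray,
  // sort it with the comparator, then emit the rows in sorted order.
  def sortAndEmit(buffered: Seq[Int], emit: Int => Unit): Unit = {
    val sortArray = new ArrayList[Integer]()
    buffered.foreach(v => sortArray.add(v))
    Collections.sort(sortArray, comparator)
    var i = 0
    while (i < sortArray.size()) {
      emit(sortArray.get(i))
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    sortAndEmit(Seq(3, 1, 2), v => println(v))
  }
}
```

Relying on `Collections.sort` (a stable merge sort) keeps the function simple; a hand-rolled insertion sort while reading elements out of state would only pay off if the state iterator returned elements in near-sorted order, which is why the reviewer suggests dropping the comment.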
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061020#comment-16061020 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123743300 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory

--- End diff --

Most IDEs support highlighting of unused imports and other common code style issues (invalid parameter documentation in Java/ScalaDocs, etc).

> Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them.
> Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query will be provided without > limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out of order events and retraction/updates of > results. > **Functionality example** > We exemplify with the query below for all the 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger – which > can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will > contribute/ be duplicated for 2 consecutive hour intervals. Proctime > indicates the processing time when a new event arrives in the system. Events > are of the type (a,b) with the ordering being applied on the b field. > `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) > ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC] `) > ||Rowtime|| Proctime|| Stream1|| Limit 2|| Top 2|| Sort > [ASC]|| > | |10:00:00 |(aaa, 11) | | | >| > | |10:05:00|(aab, 7) | | || > |10-11
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061031#comment-16061031 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123760084 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.Comparator
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.table.runtime.aggregate.ProcTimeSortProcessFunction
+import org.apache.flink.table.runtime.aggregate.RowTimeSortProcessFunction
+import org.apache.flink.table.runtime.aggregate.TimeSortProcessFunctionTest._
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.streaming.api.TimeCharacteristic
+
+class TimeSortProcessFunctionTest {
+
+
+  @Test
+  def testSortProcTimeHarnessPartitioned(): Unit = {
+
+    val rT = new RowTypeInfo(Array[TypeInformation[_]](
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO,
+      LONG_TYPE_INFO),
+      Array("a", "b", "c", "d", "e"))
+
+    val rTA = new RowTypeInfo(Array[TypeInformation[_]](
+      LONG_TYPE_INFO), Array("count"))
+    val indexes = Array(1, 2)
+
+    val fieldComps = Array[TypeComparator[AnyRef]](
+      LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]],
+      INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]])
+    val booleanOrders = Array(true, false)
+
+    val rowComp = new RowComparator(
+      rT.getTotalFields,
+      indexes,
+      fieldComps,
+      new Array[TypeSerializer[AnyRef]](0), // used only for serialized comparisons
+      booleanOrders)
+
+    val collectionRowComparator = new CollectionRowComparator(rowComp)
+
+    val inputCRowType = CRowTypeInfo(rT)
+
+    val processFunction = new KeyedProcessOperator[Integer, CRow, CRow](
+      new ProcTimeSortProcessFunction(
+        inputCRowType,
+        collectionRowComparator))
+
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, CRow](
+      processFunction,
+      new TupleRowSelector(0),
+      BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(3)
+
+    // timestamp is ignored in processing time
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
+    testHarness.processElement(new StreamRecord(new CRow(
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061034#comment-16061034 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123745861 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.RelCollation
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{Byte => JByte, Integer => JInt, Long => JLong, Double => JDouble, Short => JShort, String => JString, Float => JFloat}
+import java.math.{BigDecimal => JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.operators.Order
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.AtomicType
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic interfaces
+ */
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting
+   * elements based on rowtime and potentially other fields
+   * @param collationSort The Sort collation list
+   * @param inputType input row type
+   * @param execCfg table environment execution configuration
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createRowTimeSortFunction(
+    collationSort: RelCollation,
+    inputType: RelDataType,
+    inputTypeInfo: TypeInformation[Row],
+    execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+
+    val keySortFields = getSortFieldIndexList(collationSort)
+    // drop time from comparison as we sort on time in the states and result emission
+    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+    val booleanOrderings = getSortFieldDirectionBooleanList(collationSort)
+    val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size)
+
+    val fieldComps = createFieldComparators(
+      inputType,
+      keyIndexesNoTime,
+      booleanDirectionsNoTime,
+      execCfg)
+    val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]]
+
+    val rowComp = new RowComparator(
+      inputType.getFieldCount,
+      keyIndexesNoTime,
+      fieldCompsRefs,
+      new Array[TypeSerializer[AnyRef]](0), // used only for object comparisons
+      booleanDirectionsNoTime)
+
+    val collectionRowComparator = new
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060992#comment-16060992 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123711129 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala --- @@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+
+/**
+ * Trait represents a collection of sort methods to manipulate the parameters
+ */
+
+trait CommonSort {
+
+  private[flink] def offsetToString(offset: RexNode): String = {

--- End diff --

IMO, this method can be removed.
It simply calls `offset.toString()` > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. > Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query will be provided without > limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out of order events and retraction/updates of > results. > **Functionality example** > We exemplify with the query below for all the 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger – which > can be observed in the fact that outputs are generated only at those moments. > The HOP windows will trigger at every hour (fixed hour) and each event will > contribute/ be duplicated for 2 consecutive hour intervals. Proctime > indicates the processing time when a new event arrives in the system. Events > are of the type (a,b)
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061003#comment-16061003 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123718029 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet }
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan }
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+import org.apache.calcite.rel.RelCollation
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+  extends ConverterRule(
+    classOf[FlinkLogicalSort],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+
+    val result = super.matches(call)
+
+    // need to identify time between others order fields. Time needs to be first sort element
+    // we can safely convert the object if the match rule succeeded
+    if (result) {
+      val calcSort: FlinkLogicalSort = call.rel(0).asInstanceOf[FlinkLogicalSort]
+      checkTimeOrder(calcSort)
+    }
+
+    result
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val calcSort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(calcSort.getInput(0), FlinkConventions.DATASTREAM)
+
+    val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+    new DataStreamSort(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      new RowSchema(inputRowType),
+      new RowSchema(rel.getRowType),
+      calcSort.collation,
+      calcSort.offset,
+      calcSort.fetch,
+      description)
+  }
+
+
+  /**
+   * Function is used to check at verification time if the SQL syntax is supported
+   */
+
+  def checkTimeOrder(calcSort: FlinkLogicalSort) = {
+
+    val rowType = calcSort.getRowType
+    val sortCollation = calcSort.collation
+    // need to identify time between others order fields. Time needs to be first sort element

--- End diff --

indention

> Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061001#comment-16061001 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123742995 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) --- End diff -- for Strings and Arrays, `length` should be used instead of `size`. > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. > Time target: Proc Time > **SQL targeted query examples:** > *Sort
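On the `length` vs. `size` remark: for Scala arrays, `size` goes through an implicit `ArrayOps` wrapper, while `length` reads the underlying array field directly. A minimal illustration with hypothetical values:

```scala
// index 0 is assumed to be the time field, as in the PR's sort collation
val keySortFields: Array[Int] = Array(0, 2, 1)

// drop time from the comparison; `length` avoids the implicit ArrayOps wrapper
val keyIndexesNoTime = keySortFields.slice(1, keySortFields.length)
// keyIndexesNoTime is Array(2, 1)
```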
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061007#comment-16061007 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123742277 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory --- End diff -- There are quite a few: `AggregateFunction`, `SqlTypeName`, `ArrayList`, `JList`, and 10 more or so > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. 
> Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query will be provided without > limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out of order events and retraction/updates of > results. > **Functionality example** > We exemplify with the query below for all the 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger – which > can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will > contribute/ be duplicated for 2 consecutive hour intervals. Proctime > indicates the processing time when a new event arrives in the system. Events > are of the type (a,b) with the ordering being applied on the b field. > `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) > ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC] `) > ||Rowtime|| Proctime|| Stream1|| Limit 2|| Top 2|| Sort > [ASC]|| > | |10:00:00 |(aaa, 11) | | | >| > | |10:05:00|(aab, 7) | | || > |10-11 |11:00:00 | | aab,aaa |aab,aaa |
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061032#comment-16061032 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123736450 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel } +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream } +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow } +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo } +import org.apache.flink.types.Row +import org.apache.calcite.sql.SqlAggFunction +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.calcite.rel.core.Sort +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.flink.table.runtime.aggregate.SortUtil._ +import 
org.apache.calcite.rel.RelCollation +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.nodes.CommonSort + +/** + * Flink RelNode which matches along with Sort Rule. + * + */ +class DataStreamSort( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +inputSchema: RowSchema, +schema: RowSchema, +sortCollation: RelCollation, +sortOffset: RexNode, +sortFetch: RexNode, +description: String) + extends SingleRel(cluster, traitSet, inputNode) + with CommonSort + with DataStreamRel { + + override def deriveRowType(): RelDataType = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamSort( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + sortCollation, + sortOffset, + sortFetch, + description) + } + + override def toString: String = { +s"Sort(by:
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061030#comment-16061030 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123760629 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util.Comparator +import java.util.concurrent.ConcurrentLinkedQueue +import java.lang.{Integer => JInt, Long => JLong} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.table.runtime.aggregate.ProcTimeSortProcessFunction +import org.apache.flink.table.runtime.aggregate.RowTimeSortProcessFunction +import org.apache.flink.table.runtime.aggregate.TimeSortProcessFunctionTest._ +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.streaming.api.TimeCharacteristic + +class TimeSortProcessFunctionTest{ --- End diff -- Please add a similar test for the event-time sorter which sorts for multiple timestamps and validates that all rows preserve their timestamp. > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. 
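A self-contained illustration of the property the requested event-time test should assert: rows buffered under multiple timestamps are emitted in timestamp order, sorted within each timestamp, and every row keeps its original timestamp. This is plain Scala, deliberately independent of the test harness and the PR's `RowTimeSortProcessFunction` constructor:

```scala
// each record carries its event timestamp and a secondary sort key
case class Rec(ts: Long, key: Int)

val buffered = Seq(Rec(2000L, 7), Rec(1000L, 9), Rec(2000L, 3), Rec(1000L, 1))

// what the sorter is expected to emit once the watermark passes 2000L
val emitted = buffered.sortBy(r => (r.ts, r.key))

// rows are ordered by (timestamp, key) ...
assert(emitted == Seq(Rec(1000L, 1), Rec(1000L, 9), Rec(2000L, 3), Rec(2000L, 7)))
// ... and no row lost or changed its timestamp
assert(emitted.map(_.ts).sorted == buffered.map(_.ts).sorted)
```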
> Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060997#comment-16060997 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123716582 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet } +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan } +import org.apache.calcite.rex.RexNode +import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.flink.table.plan.nodes.datastream.DataStreamSort +import org.apache.calcite.rel.RelCollation +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.api.TableException +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.flink.table.runtime.aggregate.SortUtil + +/** + * Rule to convert a LogicalSort into a DataStreamSort. + */ +class DataStreamSortRule +extends ConverterRule( + classOf[FlinkLogicalSort], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamSortRule") { + + override def matches(call: RelOptRuleCall): Boolean = { + +val result = super.matches(call) --- End diff -- This call always returns `true`. So we can skip it. > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. 
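Following the remark that `ConverterRule.matches` always returns `true`, the rule's condition could be reduced to the time-order check alone. A sketch, assuming `checkTimeOrder` is the PR's helper that throws on unsupported orderings:

```scala
override def matches(call: RelOptRuleCall): Boolean = {
  val calcSort: FlinkLogicalSort = call.rel(0).asInstanceOf[FlinkLogicalSort]
  // throws TableException if the first sort field is not a time attribute
  checkTimeOrder(calcSort)
  true
}
```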
> Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort >
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060998#comment-16060998 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123742438 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, + keyIndexesNoTime, + fieldCompsRefs, + new Array[TypeSerializer[AnyRef]](0), //used only for object comparisons + booleanDirectionsNoTime) + +val collectionRowComparator = new
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060989#comment-16060989 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123709906 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala.stream.sql.SortITCase.StringRowSelectorSink +import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.watermark.Watermark +import scala.collection.mutable +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction + +class SortITCase extends StreamingWithStateTestBase { + + @Test + def testEventTimeOrderBy(): Unit = { +val data = Seq( + Left((1500L, (1L, 15, "Hello"))), + Left((1600L, (1L, 16, "Hello"))), + Left((1000L, (1L, 1, "Hello"))), + Left((2000L, (2L, 2, "Hello"))), + Right(1000L), + Left((2000L, (2L, 2, "Hello"))), + Left((2000L, (2L, 3, "Hello"))), + Left((3000L, (3L, 3, "Hello"))), + Left((2000L, (3L, 1, "Hello"))), + Right(2000L), + Left((4000L, (4L, 4, "Hello"))), + Right(3000L), + Left((5000L, (5L, 5, "Hello"))), + Right(5000L), + Left((6000L, (6L, 65, "Hello"))), + Left((6000L, (6L, 6, "Hello"))), + Left((6000L, (6L, 67, "Hello"))), + Left((6000L, (6L, -1, "Hello"))), + Left((6000L, (6L, 6, "Hello"))), + Right(7000L), + Left((9000L, (6L, 9, "Hello"))), + Left((8500L, (6L, 18, "Hello"))), + 
Left((9000L, (6L, 7, "Hello"))), + Right(1L), + Left((1L, (7L, 7, "Hello World"))), + Left((11000L, (7L, 77, "Hello World"))), + Left((11000L, (7L, 17, "Hello World"))), + Right(12000L), + Left((14000L, (7L, 18, "Hello World"))), + Right(14000L), + Left((15000L, (8L, 8, "Hello World"))), + Right(17000L), + Left((2L, (20L, 20, "Hello World"))), + Right(19000L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear + +val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, String)](data)) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) + +tEnv.registerTable("T1", t1) + +val sqlQuery = "SELECT b FROM T1 " + + "ORDER BY rowtime, b ASC "; + + +val result = tEnv.sql(sqlQuery).toDataStream[Row] --- End diff -- `toDataStream` is deprecated. Please use `toAppendStream` instead. > Support Limit/Top(Sort) for Stream
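The deprecated conversion flagged above would then read as follows, assuming the Flink 1.3-era Table API:

```scala
// before (deprecated):
// val result = tEnv.sql(sqlQuery).toDataStream[Row]

// after: append-only result stream of the sort query
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
```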
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061002#comment-16061002 ]

ASF GitHub Bot commented on FLINK-6075:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3889#discussion_r123744145

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
{code}
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.RelCollation
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.operators.Order
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.AtomicType
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic interfaces
+ */
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting
+   * elements based on rowtime and potentially other fields
+   * @param collationSort The Sort collation list
+   * @param inputType input row type
+   * @param execCfg table environment execution configuration
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createRowTimeSortFunction(
+      collationSort: RelCollation,
+      inputType: RelDataType,
+      inputTypeInfo: TypeInformation[Row],
+      execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+
+    val keySortFields = getSortFieldIndexList(collationSort)
+    // drop time from comparison as we sort on time in the states and result emission
+    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+    val booleanOrderings = getSortFieldDirectionBooleanList(collationSort)
+    val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size)
+
+    val fieldComps = createFieldComparators(
+      inputType,
+      keyIndexesNoTime,
+      booleanDirectionsNoTime,
+      execCfg)
+    val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]]
{code}

--- End diff --

You can change `createFieldComparators` to return an `Array[TypeComparator[AnyRef]]` and avoid the additional cast.

> Support Limit/Top(Sort) for Stream SQL
> --
>
> Key: FLINK-6075
> URL:
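The suggested change can be sketched as follows. The body of `createFieldComparators` is not shown in this hunk, so the construction below is illustrative (a hypothetical revision, not the PR's actual code); the point is that declaring the return type as `Array[TypeComparator[AnyRef]]` confines the single cast to the factory method, so callers such as `createRowTimeSortFunction` no longer need the `asInstanceOf` at the call site:

```scala
// Sketch (assumed shape of the helper): build one comparator per sort key
// and widen it to TypeComparator[AnyRef] once, inside the factory.
private def createFieldComparators(
    inputType: RelDataType,
    keyIndexes: Array[Int],
    orderDirections: Array[Boolean],
    execConfig: ExecutionConfig): Array[TypeComparator[AnyRef]] = {

  for ((keyIdx, ascending) <- keyIndexes.zip(orderDirections)) yield {
    FlinkTypeFactory.toTypeInfo(inputType.getFieldList.get(keyIdx).getType) match {
      case a: AtomicType[_] =>
        // the only cast left: AtomicType#createComparator returns a
        // TypeComparator of the concrete field type
        a.createComparator(ascending, execConfig).asInstanceOf[TypeComparator[AnyRef]]
      case x: TypeInformation[_] =>
        throw new TableException(s"Unsupported field type $x to sort on.")
    }
  }
}
```

With this signature, `val fieldCompsRefs = fieldComps.asInstanceOf[...]` in `createRowTimeSortFunction` can simply be dropped and `fieldComps` passed to the `RowComparator` directly.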
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061027#comment-16061027 ]

ASF GitHub Bot commented on FLINK-6075:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3889#discussion_r123752704

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala ---
{code}
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import java.lang.{Integer=>JInt}
+
+/**
+ * Process Function used for the aggregate in bounded proctime sort without offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param inputRowType It is used to mark the type of the incoming data
+ * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation
+ */
+class ProcTimeSortProcessFunction(
+    private val inputRowType: CRowTypeInfo,
+    private val rowComparator: CollectionRowComparator)
+  extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  private var stateEventsBuffer: ListState[Row] = _
{code}

--- End diff --

Rename to `bufferedEvents`?
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061021#comment-16061021 ]

ASF GitHub Bot commented on FLINK-6075:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3889#discussion_r123759690

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala ---
{code}
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.Comparator
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.table.runtime.aggregate.ProcTimeSortProcessFunction
+import org.apache.flink.table.runtime.aggregate.RowTimeSortProcessFunction
+import org.apache.flink.table.runtime.aggregate.TimeSortProcessFunctionTest._
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.streaming.api.TimeCharacteristic
+
+class TimeSortProcessFunctionTest{
+
+  @Test
+  def testSortProcTimeHarnessPartitioned(): Unit = {
+
+    val rT = new RowTypeInfo(Array[TypeInformation[_]](
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO,
+      LONG_TYPE_INFO),
+      Array("a","b","c","d","e"))
{code}

--- End diff --

Space after comma.
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061011#comment-16061011 ]

ASF GitHub Bot commented on FLINK-6075:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3889#discussion_r123713426

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala ---
{code}
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+/**
+ * Trait represents a collection of sort methods to manipulate the parameters
+ */
+trait CommonSort {
+
+  private[flink] def offsetToString(offset: RexNode): String = {
+    val offsetToString = s"$offset"
+    offsetToString
+  }
+
+  private[flink] def sortFieldsToString(
+      collationSort: RelCollation,
+      rowRelDataType: RelDataType): String = {
+    val fieldCollations = collationSort.getFieldCollations.asScala
+      .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+    val sortFieldsToString = fieldCollations
+      .map(col => s"${rowRelDataType.getFieldNames.get(col._1)} ${col._2.getShortName}").mkString(", ")
+
+    sortFieldsToString
+  }
+
+  private[flink] def directionToOrder(direction: Direction) = {
+    direction match {
+      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+      case _ => throw new IllegalArgumentException("Unsupported direction.")
+    }
+  }
+
+  private[flink] def fetchToString(fetch: RexNode, offset: RexNode): String = {
+    val limitEnd = getFetchLimitEnd(fetch, offset)
+    val fetchToString = if (limitEnd == Long.MaxValue) {
{code}

--- End diff --

No need to define `val fetchToString` if it is only returned.
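A minimal sketch of the simplification the reviewer suggests. The branch bodies are cut off in the quoted hunk, so the returned strings below are placeholders; the point is only that an `if` is an expression in Scala and can be the method's result directly, without being bound to an intermediate `val` first:

```scala
// Sketch: return the if-expression directly (placeholder branch bodies,
// since the original branches are not visible in the quoted diff).
private[flink] def fetchToString(fetch: RexNode, offset: RexNode): String = {
  val limitEnd = getFetchLimitEnd(fetch, offset)
  if (limitEnd == Long.MaxValue) "unlimited" else s"$limitEnd"
}
```

The same pattern applies to `offsetToString` and `sortFieldsToString` in this trait, which also bind a value to a `val` only to return it on the next line.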
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061025#comment-16061025 ]

ASF GitHub Bot commented on FLINK-6075:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3889#discussion_r123757132

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala ---
{code}
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class SortTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c,
+    'proctime.proctime, 'rowtime.rowtime)
+
+  @Test
+  def testSortProcessingTime() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode("DataStreamSort",
+          streamTableNode(0),
+          term("orderBy", "proctime ASC", "c ASC"),
+          term("offset", "null"),
{code}

--- End diff --

I think we should hide `offset` and `fetch` if they are not defined.

> Support Limit/Top(Sort) for Stream SQL
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: radu
> Labels: features
> Attachments: sort.png
>
> These will be split in 3 separate JIRA issues. However, the design is the same; only the processing function differs in terms of the output. Hence, the design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) ORDER BY b`
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING LIMIT 10) FROM stream1`
> Comment: limit over the contents of the sliding window
> General Comments:
> - All these SQL clauses are supported only over windows (bounded collections of data).
> - Each of the 3 operators will be supported with each of the types of expressing the windows.
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all require a sorted collection of the data on which the logic will be applied (i.e., select a subset of the items or the entire sorted set). These functions would make sense in the streaming context only in the context of a window. Without defining a window the functions could never emit, as the sort operation would never trigger. If an SQL query is provided without limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR).
> Although not targeted by this JIRA, in the case of working based on event time order, the retraction mechanisms of windows and the lateness mechanisms can be used to deal with out-of-order events and retraction/updates of results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, limit and top). Rowtime indicates when the HOP
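One way to implement the reviewer's suggestion is Calcite's `RelWriter#itemIf`, which emits a plan term only when a condition holds, so the plan string no longer prints `offset: null`. This is a sketch only: `DataStreamSort`'s actual field names are not visible in the quoted hunk, so `sortCollation`, `sortOffset`, `sortFetch`, and `schema` are assumed names.

```scala
import org.apache.calcite.rel.RelWriter

// Sketch (hypothetical field names): emit "offset"/"fetch" terms only
// when the corresponding RexNode is defined.
override def explainTerms(pw: RelWriter): RelWriter = {
  super.explainTerms(pw)
    .item("orderBy", sortFieldsToString(sortCollation, schema.relDataType))
    .itemIf("offset", offsetToString(sortOffset), sortOffset != null)
    .itemIf("fetch", fetchToString(sortFetch, sortOffset), sortFetch != null)
}
```

With this change, the expected plan in `SortTest` would drop the `term("offset", "null")` line for queries without OFFSET/FETCH.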
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060993#comment-16060993 ]

ASF GitHub Bot commented on FLINK-6075:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3889#discussion_r123717194

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala ---
{code}
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet }
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan }
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+import org.apache.calcite.rel.RelCollation
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+  extends ConverterRule(
+    classOf[FlinkLogicalSort],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+
+    val result = super.matches(call)
+
+    // need to identify time between other order fields. Time needs to be the first sort element
+    // we can safely convert the object if the match rule succeeded
+    if (result) {
+      val calcSort: FlinkLogicalSort = call.rel(0).asInstanceOf[FlinkLogicalSort]
{code}

--- End diff --

Rename to `sort`. It is not a `calc`.
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061000#comment-16061000 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123742381 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, + keyIndexesNoTime, + fieldCompsRefs, + new Array[TypeSerializer[AnyRef]](0), //used only for object comparisons + booleanDirectionsNoTime) + +val collectionRowComparator = new
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061022#comment-16061022 ]

ASF GitHub Bot commented on FLINK-6075:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3889#discussion_r123755279

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala ---
{code}
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo}
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.configuration.Configuration
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import java.util.{List => JList, ArrayList => JArrayList}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the aggregate in bounded rowtime sort without offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param inputRowType It is used to mark the type of the incoming data
+ * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation
+ */
+class RowTimeSortProcessFunction(
+    private val inputRowType: CRowTypeInfo,
+    private val rowComparator: CollectionRowComparator)
+  extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  // the state which keeps all the events that are not expired.
+  // Each timestamp will contain an associated list with the events
+  // received at that timestamp
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  // the state which keeps the last triggering timestamp to filter late events
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  private var outputC: CRow = _
+
+  override def open(config: Configuration) {
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](
+      inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    if (outputC == null) {
{code}

--- End diff --

Can be done as `outputC = new CRow()`.
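The reviewer's simplification in a sketch: `open()` runs exactly once per function instance before any element is processed, so the null guard around the output row is unnecessary and the field can be assigned unconditionally (field name as in the diff above; the elided state setup is represented by a comment):

```scala
override def open(config: Configuration) {
  // ... state descriptors and getRuntimeContext.getMapState/getState
  //     initialization as in the diff above ...

  // open() is called once before processing starts, so the output
  // CRow can be created unconditionally instead of guarding on null
  outputC = new CRow()
}
```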
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061019#comment-16061019 ]

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r123748629

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.RelCollation
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.operators.Order
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.AtomicType
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Collection of helper methods to build the sort logic.
+ * It also encapsulates the ordering implementation and the generic interfaces.
+ */
+object SortUtil {
+
+  /**
+   * Creates a [[org.apache.flink.streaming.api.functions.ProcessFunction]] for sorting
+   * elements based on rowtime and potentially other fields.
+   *
+   * @param collationSort the sort collation list
+   * @param inputType input row type
+   * @param execCfg table environment execution configuration
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createRowTimeSortFunction(
+    collationSort: RelCollation,
+    inputType: RelDataType,
+    inputTypeInfo: TypeInformation[Row],
+    execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+
+    val keySortFields = getSortFieldIndexList(collationSort)
+    // drop time from the comparison, as we sort on time in the states and result emission
+    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+    val booleanOrderings = getSortFieldDirectionBooleanList(collationSort)
+    val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size)
+
+    val fieldComps = createFieldComparators(
+      inputType,
+      keyIndexesNoTime,
+      booleanDirectionsNoTime,
+      execCfg)
+    val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]]
+
+    val rowComp = new RowComparator(
+      inputType.getFieldCount,
+      keyIndexesNoTime,
+      fieldCompsRefs,
+      new Array[TypeSerializer[AnyRef]](0), // used only for object comparisons
+      booleanDirectionsNoTime)
+
+    val collectionRowComparator = new
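The comparator chain assembled above (`createFieldComparators` feeding a `RowComparator` over the non-time sort keys, with per-key directions) can be approximated in plain Scala. This is a hedged sketch under stated assumptions: rows are modeled as `Array[Any]` of naturally `Comparable` fields, which is not Flink's actual `RowComparator` API.

```scala
// A row is modeled as an array of naturally comparable fields (assumption).
type Row = Array[Any]

// Compose one Ordering[Row] from key indexes and per-key ascending flags,
// in the spirit of RowComparator(keyIndexes, fieldComparators, orders).
def rowOrdering(keyIndexes: Array[Int], ascending: Array[Boolean]): Ordering[Row] =
  new Ordering[Row] {
    def compare(a: Row, b: Row): Int = {
      var i = 0
      while (i < keyIndexes.length) {
        // each field is compared via its natural Comparable ordering
        val c = a(keyIndexes(i)).asInstanceOf[Comparable[Any]].compareTo(b(keyIndexes(i)))
        if (c != 0) return if (ascending(i)) c else -c
        i += 1
      }
      0 // all sort keys equal
    }
  }
```

With keys `(field 1 asc, field 0 desc)`, rows are ordered primarily on the second field and ties are broken descending on the first, mirroring how the directions-without-time array drives the composed comparator.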
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060999#comment-16060999 ]

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r123717968

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet }
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan }
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+import org.apache.calcite.rel.RelCollation
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+  extends ConverterRule(
+    classOf[FlinkLogicalSort],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+
+    val result = super.matches(call)
+
+    // we need to identify time among the other order fields.
+    // Time needs to be the first sort element.
+    // We can safely convert the object if the match rule succeeded.
+    if (result) {
+      val calcSort: FlinkLogicalSort = call.rel(0).asInstanceOf[FlinkLogicalSort]
+      checkTimeOrder(calcSort)
+    }
+
+    result
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val calcSort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(calcSort.getInput(0), FlinkConventions.DATASTREAM)
+
+    val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+    new DataStreamSort(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      new RowSchema(inputRowType),
+      new RowSchema(rel.getRowType),
+      calcSort.collation,
+      calcSort.offset,
+      calcSort.fetch,
+      description)
+  }
+
+  /**
+   * Checks at verification time whether the SQL syntax is supported.
+   */
+  def checkTimeOrder(calcSort: FlinkLogicalSort) = {
--- End diff --

`calcSort` -> `sort`

> Support Limit/Top(Sort) for Stream SQL
> --------------------------------------
>
>                 Key: FLINK-6075
>                 URL: https://issues.apache.org/jira/browse/FLINK-6075
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>              Labels: features
>         Attachments: sort.png
>
> These will be split in 3 separate JIRA issues. However, the design is the
> same; only the processing function differs in terms of the output. Hence, the
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort
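The `checkTimeOrder` verification discussed in this review (time must be the first sort field for a streaming sort to be convertible) reduces to a head-of-collation check. The `FieldKind` model below is an illustrative assumption; the real rule inspects Calcite's `RelCollation` and the Flink time indicator types.

```scala
// Illustrative model of a sort collation entry: either the (row)time
// attribute or a regular field (assumption, not the Calcite API).
sealed trait FieldKind
case object TimeField extends FieldKind
case object RegularField extends FieldKind

// Mirrors the intent of DataStreamSortRule.checkTimeOrder: sorting on a
// stream is only supported when the primary sort field is the time attribute.
def checkTimeOrder(collation: Seq[FieldKind]): Unit =
  if (!collation.headOption.contains(TimeField)) {
    throw new IllegalArgumentException(
      "Primary sort field of a streaming table must be a time attribute")
  }
```

Any secondary fields after the leading time attribute are permitted; they are handled by the secondary comparator once the time-based emission fires.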
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061018#comment-16061018 ]

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r123710581

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+/**
+ * Trait that represents a collection of sort methods to manipulate the parameters.
+ */
+trait CommonSort {
+
+  private[flink] def offsetToString(offset: RexNode): String = {
+    val offsetToString = s"$offset"
+    offsetToString
+  }
+
--- End diff --

rm new line

> Support Limit/Top(Sort) for Stream SQL
> --------------------------------------
>
>                 Key: FLINK-6075
>                 URL: https://issues.apache.org/jira/browse/FLINK-6075
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>              Labels: features
>         Attachments: sort.png
>
> These will be split in 3 separate JIRA issues. However, the design is the
> same; only the processing function differs in terms of the output. Hence, the
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL
> '3' HOUR) ORDER BY b`
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING
> LIMIT 10) FROM stream1`
> Comment: limit over the contents of the sliding window
> General Comments:
> - All these SQL clauses are supported only over windows (bounded collections
> of data).
> - Each of the 3 operators will be supported with each of the types of
> expressing the windows.
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior, as they all
> require a sorted collection of the data on which the logic will be applied
> (i.e., select a subset of the items or the entire sorted set). These
> functions make sense in the streaming context only within a window. Without
> defining a window, the functions could never emit, as the sort operation
> would never trigger. If a SQL query is provided without such limits, an error
> will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR).
> Although not targeted by this JIRA, in the case of working based on event
> time order, the retraction mechanisms of windows and the lateness mechanisms
> can be used to deal with out-of-order events and retraction/updates of
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting,
> limit and top). Rowtime indicates when the HOP window will trigger, which
> can be observed in the fact that outputs are generated only at those moments.
> The HOP windows will trigger at every hour (fixed hour) and each event will
> contribute/be duplicated for 2 consecutive hour intervals. Proctime
> indicates the processing time when a new event arrives in the system. Events
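The limit/top behavior described above, applied once a window's contents have been sorted, boils down to an offset skip plus a fetch cap over the sorted collection. A minimal sketch (function and parameter names are illustrative, not Flink API):

```scala
// Apply OFFSET/FETCH (LIMIT) semantics to an already-sorted window result:
// skip `offset` rows, then keep at most `fetch` rows if a fetch is given.
def applyOffsetFetch[T](sortedRows: Seq[T], offset: Int, fetch: Option[Int]): Seq[T] = {
  val remaining = sortedRows.drop(offset)
  fetch.fold(remaining)(n => remaining.take(n))
}
```

For example, `ORDER BY b LIMIT 10` sorts the triggered window on `b` and keeps the first 10 rows (`offset = 0`, `fetch = Some(10)`).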
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060994#comment-16060994 ]

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r123712493

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.nodes.CommonSort
+
+/**
+ * Flink RelNode which matches along with the Sort rule.
+ */
+class DataStreamSort(
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  inputSchema: RowSchema,
+  schema: RowSchema,
+  sortCollation: RelCollation,
+  sortOffset: RexNode,
+  sortFetch: RexNode,
+  description: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonSort
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamSort(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputSchema,
+      schema,
+      sortCollation,
+      sortOffset,
+      sortFetch,
+      description)
+  }
+
+  override def toString: String = {
+    s"Sort(by:
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061013#comment-16061013 ]

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r123739308

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
+import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
--- End diff --

please add a space after a comma

> Support Limit/Top(Sort) for Stream SQL
> --------------------------------------
>
>                 Key: FLINK-6075
>                 URL: https://issues.apache.org/jira/browse/FLINK-6075
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>              Labels: features
>         Attachments: sort.png
>
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061014#comment-16061014 ]

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r123749098

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060995#comment-16060995 ]

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r123713964

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061023#comment-16061023 ]

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r123758098

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060991#comment-16060991 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123712536 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala --- @@ -136,36 +130,21 @@ class DataSetSort( } } - private def directionToOrder(direction: Direction) = { -direction match { - case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING - case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING - case _ => throw new IllegalArgumentException("Unsupported direction.") -} - - } - private val fieldCollations = collations.getFieldCollations.asScala .map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) - private val sortFieldsToString = fieldCollations -.map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ") - - private val offsetToString = s"$offset" - - private val fetchToString = if (limitEnd == Long.MaxValue) { -"unlimited" - } else { -s"$limitEnd" + + override def toString: String = { +s"Sort(by: ($$sortFieldsToString(collations, getRowType))," + + " offset: $offsetToString(offset)," + + " fetch: $fetchToString(fetch, offset))" } - - override def toString: String = -s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)" - + + override def explainTerms(pw: RelWriter) : RelWriter = { super.explainTerms(pw) - .item("orderBy", sortFieldsToString) - .item("offset", offsetToString) - .item("fetch", fetchToString) + .item("orderBy", sortFieldsToString(collations, getRowType)) --- End diff -- Move the implementation to `CommonSort` > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New 
Feature > Components: Table API & SQL > Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split into 3 separate JIRA issues. However, the design is the > same; only the processing function differs in terms of the output. Hence, the > design is the same for all of them. > Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: the window is defined using GROUP BY > Comment: ASC or DESC keywords can be used to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: the window is defined using time ranges in the WHERE clause > Comment: the window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > - All these SQL clauses are supported only over windows (bounded collections > of data). > - Each of the 3 operators will be supported with each of the ways of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior, as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit, as the sort > operation would never trigger. If an SQL query is provided without > limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, when working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out-of-order events and retractions/updates of > results. > **Functionality example** > We exemplify with the query below for all 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger, which > can be observed in the fact that outputs are generated only at those moments. > The HOP windows will trigger every hour (on the hour), and each event will > contribute to / be duplicated across 2 consecutive hour intervals. Proctime > indicates the processing time
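The review comment above on `DataSetSort` asks for the sort-field formatting (`sortFieldsToString` and friends) to be moved into the shared `CommonSort` trait. As a rough illustration of what such a helper computes — hypothetical names and plain Java instead of the actual Scala trait, which operates on Calcite `RelCollation`/`RelDataType` objects — consider:

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch of the kind of helper the review proposes centralizing:
// turn (fieldIndex, direction) collation pairs plus the row type's field names
// into a display string such as "a ASC, b DESC" for toString/explainTerms.
public class SortFieldsToString {
    /** Each int[] is {fieldIndex, direction}, with 0 = ascending, 1 = descending. */
    public static String format(List<int[]> collations, List<String> fieldNames) {
        StringJoiner sj = new StringJoiner(", ");
        for (int[] c : collations) {
            sj.add(fieldNames.get(c[0]) + (c[1] == 0 ? " ASC" : " DESC"));
        }
        return sj.toString();
    }

    public static void main(String[] args) {
        System.out.println(format(
            Arrays.asList(new int[]{0, 0}, new int[]{2, 1}),
            Arrays.asList("a", "b", "c")));  // prints "a ASC, c DESC"
    }
}
```

Keeping one such helper in `CommonSort` means `DataSetSort` and `DataStreamSort` render collations identically in plans and explain output, which is the point of the review.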
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061009#comment-16061009 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123709798 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala.stream.sql.SortITCase.StringRowSelectorSink +import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.watermark.Watermark +import scala.collection.mutable +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction + +class SortITCase extends StreamingWithStateTestBase { + + @Test + def testEventTimeOrderBy(): Unit = { +val data = Seq( + Left((1500L, (1L, 15, "Hello"))), + Left((1600L, (1L, 16, "Hello"))), + Left((1000L, (1L, 1, "Hello"))), + Left((2000L, (2L, 2, "Hello"))), + Right(1000L), + Left((2000L, (2L, 2, "Hello"))), + Left((2000L, (2L, 3, "Hello"))), + Left((3000L, (3L, 3, "Hello"))), + Left((2000L, (3L, 1, "Hello"))), + Right(2000L), + Left((4000L, (4L, 4, "Hello"))), + Right(3000L), + Left((5000L, (5L, 5, "Hello"))), + Right(5000L), + Left((6000L, (6L, 65, "Hello"))), + Left((6000L, (6L, 6, "Hello"))), + Left((6000L, (6L, 67, "Hello"))), + Left((6000L, (6L, -1, "Hello"))), + Left((6000L, (6L, 6, "Hello"))), + Right(7000L), + Left((9000L, (6L, 9, "Hello"))), + Left((8500L, (6L, 18, "Hello"))), + 
Left((9000L, (6L, 7, "Hello"))), + Right(1L), + Left((1L, (7L, 7, "Hello World"))), + Left((11000L, (7L, 77, "Hello World"))), + Left((11000L, (7L, 17, "Hello World"))), + Right(12000L), + Left((14000L, (7L, 18, "Hello World"))), + Right(14000L), + Left((15000L, (8L, 8, "Hello World"))), + Right(17000L), + Left((2L, (20L, 20, "Hello World"))), + Right(19000L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear + +val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, String)](data)) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) + +tEnv.registerTable("T1", t1) + +val sqlQuery = "SELECT b FROM T1 " + + "ORDER BY rowtime, b ASC "; --- End diff -- rm `;`
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061006#comment-16061006 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123745122 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, --- End diff -- This gives the field count of the logical row type, but we need the number of physical fields (`new RowSchema(inputType).physicalArity`) > Support Limit/Top(Sort) for Stream SQL >
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061008#comment-16061008 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123718523 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet } +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan } +import org.apache.calcite.rex.RexNode +import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.flink.table.plan.nodes.datastream.DataStreamSort +import org.apache.calcite.rel.RelCollation +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.api.TableException +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.flink.table.runtime.aggregate.SortUtil + +/** + * Rule to convert a LogicalSort into a DataStreamSort. + */ +class DataStreamSortRule +extends ConverterRule( + classOf[FlinkLogicalSort], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamSortRule") { + + override def matches(call: RelOptRuleCall): Boolean = { + +val result = super.matches(call) + +//need to identify time between others order fields. 
Time needs to be first sort element +// we can safely convert the object if the match rule succeeded +if(result) { + val calcSort: FlinkLogicalSort = call.rel(0).asInstanceOf[FlinkLogicalSort] + checkTimeOrder(calcSort) +} + +result + } + + override def convert(rel: RelNode): RelNode = { +val calcSort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort] +val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM) +val convInput: RelNode = RelOptRule.convert(calcSort.getInput(0), FlinkConventions.DATASTREAM) + +val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType + +new DataStreamSort( + rel.getCluster, + traitSet, + convInput, + new RowSchema(inputRowType), + new RowSchema(rel.getRowType), + calcSort.collation, + calcSort.offset, + calcSort.fetch, + description) + + } + + + /** + * Function is used to check at verification time if the SQL syntax is supported + */ + + def checkTimeOrder(calcSort: FlinkLogicalSort) = { + +val rowType = calcSort.getRowType +val sortCollation = calcSort.collation + //need to identify time between others order fields. Time needs to be first sort element +val timeType = SortUtil.getTimeType(sortCollation, rowType) +//time ordering needs to be ascending +if (SortUtil.getTimeDirection(sortCollation) != Direction.ASCENDING) { + throw new TableException("SQL/Table supports only ascending time ordering") +} +//enable to extend for other types of aggregates that will not be implemented in a window +timeType match { +case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) => ---
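The `checkTimeOrder` verification above enforces that a streaming ORDER BY leads with a time attribute sorted ascending. Detached from the Calcite and Flink types, the precondition amounts to the following (hypothetical simplified sketch; the enums are illustrative stand-ins, not Flink API):

```java
import java.util.List;

// Simplified sketch of DataStreamSortRule's precondition: the primary sort
// field must be a time attribute (proctime or rowtime), and its direction must
// be ascending. Field kinds and directions are modeled as plain enums instead
// of Calcite RelFieldCollation / Flink type checks.
public class TimeOrderCheck {
    public enum FieldKind { PROCTIME, ROWTIME, PLAIN }
    public enum Direction { ASCENDING, DESCENDING }

    public static void check(List<FieldKind> sortFieldKinds, List<Direction> directions) {
        if (sortFieldKinds.isEmpty()) {
            throw new IllegalArgumentException("Sort requires at least one collation field");
        }
        if (sortFieldKinds.get(0) == FieldKind.PLAIN) {
            throw new IllegalArgumentException(
                "Primary sort order of a streaming table must be on a time attribute");
        }
        if (directions.get(0) != Direction.ASCENDING) {
            throw new IllegalArgumentException("SQL/Table supports only ascending time ordering");
        }
    }
}
```

Secondary sort fields may be arbitrary columns in either direction; only the leading time field is constrained, because it drives when results can be emitted.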
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061016#comment-16061016 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123743615 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, + keyIndexesNoTime, + fieldCompsRefs, + new Array[TypeSerializer[AnyRef]](0), //used only for object comparisons + booleanDirectionsNoTime) + +val collectionRowComparator = new
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061010#comment-16061010 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123754391 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo} +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.configuration.Configuration +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import java.util.{List => JList, ArrayList => JArrayList} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the aggregate in bounded rowtime sort without offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param inputRowType It is used to mark the type of the incoming data + * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation + */ +class RowTimeSortProcessFunction( + private val inputRowType: CRowTypeInfo, + private val rowComparator: CollectionRowComparator) +extends ProcessFunction[CRow, CRow] { + + Preconditions.checkNotNull(rowComparator) + + // the state which keeps all the events that are not expired. 
+ // Each timestamp will contain an associated list with the events + // received at that timestamp + private var dataState: MapState[Long, JList[Row]] = _ + +// the state which keeps the last triggering timestamp to filter late events --- End diff -- indentation
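`RowTimeSortProcessFunction` above buffers rows in a `MapState[Long, JList[Row]]` keyed by timestamp, drops late elements, and emits buffered rows in timestamp order once event time advances past them. Stripped of Flink's state and timer APIs, the buffer-and-emit pattern looks roughly like this plain-Java sketch (hypothetical class, not Flink code; secondary ordering within a timestamp via the row comparator is omitted):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of the rowtime-sort pattern: buffer rows per timestamp and,
// when a watermark arrives, emit all buffered rows with timestamp <= watermark
// in timestamp order, then clear the emitted state.
public class RowTimeSortBuffer {
    // TreeMap keeps timestamps sorted, mirroring timer-driven emission order
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    public void onElement(long timestamp, String row, long currentWatermark) {
        if (timestamp <= currentWatermark) {
            return; // late element: filtered out, as in the reviewed function
        }
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(row);
    }

    public List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        // headMap(watermark, true) = all entries with timestamp <= watermark
        Iterator<Map.Entry<Long, List<String>>> it =
            buffer.headMap(watermark, true).entrySet().iterator();
        while (it.hasNext()) {
            out.addAll(it.next().getValue());
            it.remove(); // clear emitted state
        }
        return out;
    }
}
```

In the real operator the `TreeMap` role is played by keyed `MapState` plus event-time timers, which is what makes the buffered rows fault-tolerant across restarts.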
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060990#comment-16060990 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123716234 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel } +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream } +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow } +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo } +import org.apache.flink.types.Row +import org.apache.calcite.sql.SqlAggFunction +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.calcite.rel.core.Sort +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.flink.table.runtime.aggregate.SortUtil._ +import 
org.apache.calcite.rel.RelCollation +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.nodes.CommonSort + +/** + * Flink RelNode which matches along with Sort Rule. + * + */ +class DataStreamSort( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +inputSchema: RowSchema, +schema: RowSchema, +sortCollation: RelCollation, +sortOffset: RexNode, +sortFetch: RexNode, +description: String) + extends SingleRel(cluster, traitSet, inputNode) --- End diff -- Could also extend `Sort` instead of `SingleRel` > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061005#comment-16061005 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123713179 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes + +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`._ +import scala.collection.JavaConverters._ +import org.apache.flink.api.common.operators.Order + + +/** + * Trait represents a collection of sort methods to manipulate the parameters + */ + +trait CommonSort { + + private[flink] def offsetToString(offset: RexNode): String = { +val offsetToString = s"$offset" +offsetToString + } + + + private[flink] def sortFieldsToString( + collationSort: RelCollation, + rowRelDataType: RelDataType): String = { +val fieldCollations = collationSort.getFieldCollations.asScala +.map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) + +val sortFieldsToString = fieldCollations --- End diff -- `sortFieldsToString` is only used as return value. In that case we can simply return the result of the expression without defining a `val`: ``` fieldCollations .map(col => s"${rowRelDataType.getFieldNames.get(col._1)} ${col._2.getShortName}" ) .mkString(", ") ``` > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. 
> Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query will be provided without > limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction
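The review remark above — that `sortFieldsToString` is only used as a return value, so the intermediate `val` can be dropped — is a general Scala idiom: the last expression of a method body is its result. A standalone sketch of both styles (the `FieldCollation` case class is a hypothetical, simplified stand-in for Calcite's `RelFieldCollation`, not the real type):

```scala
object SortStringUtil {
  // Hypothetical, simplified stand-in for Calcite's RelFieldCollation.
  final case class FieldCollation(fieldIndex: Int, shortName: String)

  // Style flagged in the review: the result is bound to a val and then returned.
  def sortFieldsToStringVerbose(cols: Seq[FieldCollation], fieldNames: Seq[String]): String = {
    val sortFieldsToString = cols
      .map(c => s"${fieldNames(c.fieldIndex)} ${c.shortName}")
      .mkString(", ")
    sortFieldsToString
  }

  // Suggested style: the final expression of the body is the return value.
  def sortFieldsToString(cols: Seq[FieldCollation], fieldNames: Seq[String]): String =
    cols
      .map(c => s"${fieldNames(c.fieldIndex)} ${c.shortName}")
      .mkString(", ")
}
```

Both methods produce identical output, e.g. `sortFieldsToString(Seq(FieldCollation(0, "ASC")), Seq("a"))` yields `"a ASC"`; the second form simply avoids the throwaway binding.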
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060996#comment-16060996 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123713771 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes + +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`._ +import scala.collection.JavaConverters._ +import org.apache.flink.api.common.operators.Order + + +/** + * Trait represents a collection of sort methods to manipulate the parameters + */ + +trait CommonSort { + + private[flink] def offsetToString(offset: RexNode): String = { +val offsetToString = s"$offset" +offsetToString + } + + + private[flink] def sortFieldsToString( --- End diff -- This is probably true for most other methods in this class. > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. 
> Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query will be provided without > limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out of order events and retraction/updates of > results. > **Functionality example** > We exemplify with the query below for all the 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger – which > can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will > contribute/ be duplicated for 2 consecutive
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061004#comment-16061004 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123717004 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet } +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan } +import org.apache.calcite.rex.RexNode +import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.flink.table.plan.nodes.datastream.DataStreamSort +import org.apache.calcite.rel.RelCollation +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.api.TableException +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.flink.table.runtime.aggregate.SortUtil + +/** + * Rule to convert a LogicalSort into a DataStreamSort. + */ +class DataStreamSortRule +extends ConverterRule( + classOf[FlinkLogicalSort], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamSortRule") { + + override def matches(call: RelOptRuleCall): Boolean = { + +val result = super.matches(call) + +//need to identify time between others order fields. Time needs to be first sort element +// we can safely convert the object if the match rule succeeded +if(result) { --- End diff -- please add a space after `if`, `for`, `while`, etc. > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. 
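The `matches` override in `DataStreamSortRule` above only accepts plans whose first sort field is the time attribute ("Time needs to be first sort element"). That gate can be sketched standalone — the sealed `FieldType` model below is an illustrative simplification; the real rule inspects time indicator types via `FlinkTypeFactory` on the input row schema:

```scala
object SortRuleCheck {
  // Hypothetical, simplified field model standing in for Calcite/Flink types.
  sealed trait FieldType
  case object ProcTime extends FieldType
  case object RowTime  extends FieldType
  case object Regular  extends FieldType

  /** The sort is translatable only if the primary sort field is a time attribute. */
  def firstSortFieldIsTime(sortFieldIndexes: Seq[Int], fieldTypes: Seq[FieldType]): Boolean =
    sortFieldIndexes.headOption.exists(i => fieldTypes(i) != Regular)
}
```

A rule following this shape would return `false` from `matches` (and later raise a `TableException` during translation) when a user sorts primarily on a regular field.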
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123755789 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo} +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.configuration.Configuration +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import java.util.{List => JList, ArrayList => JArrayList} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the aggregate in bounded rowtime sort without offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param inputRowType It is used to mark the type of the incoming data + * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation + */ +class RowTimeSortProcessFunction( + private val inputRowType: CRowTypeInfo, + private val rowComparator: CollectionRowComparator) +extends ProcessFunction[CRow, CRow] { + + Preconditions.checkNotNull(rowComparator) + + // the state which keeps all the events that are not expired. 
+ // Each timestamp will contain an associated list with the events + // received at that timestamp + private var dataState: MapState[Long, JList[Row]] = _ + +// the state which keeps the last triggering timestamp to filter late events + private var lastTriggeringTsState: ValueState[Long] = _ + + private var outputC: CRow = _ + + override def open(config: Configuration) { + +val keyTypeInformation: TypeInformation[Long] = + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] +val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row]( +inputRowType.asInstanceOf[CRowTypeInfo].rowType) + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]( +"dataState", +keyTypeInformation, +valueTypeInformation) + +dataState = getRuntimeContext.getMapState(mapStateDescriptor) + +val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) +lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor) + +if (outputC == null) { + val arity:Integer = inputRowType.getArity + outputC = new CRow(Row.of(arity), true) +} + } + + + override def processElement( +inputC: CRow, +ctx: ProcessFunction[CRow, CRow]#Context, +out: Collector[CRow]): Unit = { + +val input = inputC.row + +// triggering timestamp for trigger calculation +val rowtime = ctx.timestamp + +val lastTriggeringTs = lastTriggeringTsState.value + +// check if the data is expired, if not, save the data and register event time timer +if
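The (truncated) `processElement` above buffers rows per event timestamp in `dataState` and uses `lastTriggeringTsState` to drop late events. The bookkeeping can be illustrated with plain Scala collections standing in for Flink's `MapState`/`ValueState` (class and field names here are illustrative, and the row type is simplified to `String`):

```scala
import scala.collection.mutable

// Simplified stand-in for the state handling in RowTimeSortProcessFunction:
// rows are buffered per event timestamp, and rows at or before the last
// triggering timestamp are dropped as late.
class RowTimeBuffer {
  private val dataState = mutable.Map.empty[Long, mutable.ListBuffer[String]]
  private var lastTriggeringTs = 0L

  /** Returns true if the row was buffered, false if it was dropped as late. */
  def processElement(rowtime: Long, row: String): Boolean =
    if (rowtime > lastTriggeringTs) {
      dataState.getOrElseUpdate(rowtime, mutable.ListBuffer.empty) += row
      true
    } else false

  /** Emits (and clears) the rows buffered for `ts`, advancing the trigger mark. */
  def onTimer(ts: Long): List[String] = {
    lastTriggeringTs = ts
    dataState.remove(ts).map(_.toList).getOrElse(Nil)
  }
}
```

In the real function, `onTimer` would additionally sort the buffered rows with the row comparator before emitting them; here only the late-event filtering and per-timestamp buffering are shown.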
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123742381 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, + keyIndexesNoTime, + fieldCompsRefs, + new Array[TypeSerializer[AnyRef]](0), //used only for object comparisons + booleanDirectionsNoTime) + +val collectionRowComparator = new CollectionRowComparator(rowComp) + +val inputCRowType = CRowTypeInfo(inputTypeInfo) + +new RowTimeSortProcessFunction( + inputCRowType, + collectionRowComparator) + + } + +
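Note how `createRowTimeSortFunction` above slices the time field (position 0 of the collation) out of the key indexes and orderings: rows are already partitioned by timestamp in state, so the comparator only needs the secondary fields. A minimal sketch of such a secondary-field comparator, with rows modeled as `Array[Int]` for illustration (the real code builds a Flink `RowComparator` over `Row`):

```scala
object SortOrderings {
  // Compare rows on the given key fields only; the time field has already
  // been dropped from keyIndexes, mirroring the slice(1, size) above.
  def secondaryFieldOrdering(
      keyIndexes: Seq[Int],
      ascending: Seq[Boolean]): Ordering[Array[Int]] =
    new Ordering[Array[Int]] {
      def compare(x: Array[Int], y: Array[Int]): Int =
        keyIndexes.indices.iterator
          .map { i =>
            val c = Integer.compare(x(keyIndexes(i)), y(keyIndexes(i)))
            if (ascending(i)) c else -c // flip the sign for descending fields
          }
          .find(_ != 0)   // first non-tied key field decides
          .getOrElse(0)   // all key fields tied
    }
}
```

Two rows that differ only in the excluded time position compare as equal, which is exactly the intent: time ordering is handled by the state and timer logic, not the comparator.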
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060988#comment-16060988 ] ASF GitHub Bot commented on FLINK-6075: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123713677 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes + +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`._ +import scala.collection.JavaConverters._ +import org.apache.flink.api.common.operators.Order + + +/** + * Trait represents a collection of sort methods to manipulate the parameters + */ + +trait CommonSort { + + private[flink] def offsetToString(offset: RexNode): String = { +val offsetToString = s"$offset" +offsetToString + } + + + private[flink] def sortFieldsToString( + collationSort: RelCollation, + rowRelDataType: RelDataType): String = { +val fieldCollations = collationSort.getFieldCollations.asScala +.map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) + +val sortFieldsToString = fieldCollations + .map(col => s"${ +rowRelDataType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ") + +sortFieldsToString + } + + private[flink] def directionToOrder(direction: Direction) = { +direction match { + case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING + case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING + case _ => throw new IllegalArgumentException("Unsupported direction.") +} + } + + private[flink] def fetchToString(fetch: RexNode, offset: RexNode): String = { +val limitEnd = getFetchLimitEnd(fetch, offset) +val fetchToString = if (limitEnd == Long.MaxValue) { --- End diff -- same for the return values of `getFetchLimitEnd` and `getFetchLimitStart` > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. 
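The review comment above extends the same "no throwaway `val`" advice to the results of `getFetchLimitEnd` and `getFetchLimitStart`. A standalone sketch of the limit arithmetic those helpers appear to implement — this is an assumption modeled with `Option[Long]` instead of the nullable `RexNode` literals, and the method bodies here are illustrative, not the actual Flink code:

```scala
object LimitUtil {
  // Assumed semantics: OFFSET defaults to 0 when absent.
  def fetchLimitStart(offset: Option[Long]): Long = offset.getOrElse(0L)

  // Assumed semantics: range end is offset + fetch; a missing FETCH is unbounded.
  def fetchLimitEnd(fetch: Option[Long], offset: Option[Long]): Long =
    fetch.map(_ + fetchLimitStart(offset)).getOrElse(Long.MaxValue)

  // Following the review advice: the if/else is itself the returned expression.
  def fetchToString(fetch: Option[Long], offset: Option[Long]): String =
    if (fetchLimitEnd(fetch, offset) == Long.MaxValue) "unlimited"
    else (fetchLimitEnd(fetch, offset) - fetchLimitStart(offset)).toString
}
```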
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123745861 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, + keyIndexesNoTime, + fieldCompsRefs, + new Array[TypeSerializer[AnyRef]](0), //used only for object comparisons + booleanDirectionsNoTime) + +val collectionRowComparator = new CollectionRowComparator(rowComp) + +val inputCRowType = CRowTypeInfo(inputTypeInfo) + +new RowTimeSortProcessFunction( + inputCRowType, + collectionRowComparator) + + } + +
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123713426 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes + +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`._ +import scala.collection.JavaConverters._ +import org.apache.flink.api.common.operators.Order + + +/** + * Trait represents a collection of sort methods to manipulate the parameters + */ + +trait CommonSort { + + private[flink] def offsetToString(offset: RexNode): String = { +val offsetToString = s"$offset" +offsetToString + } + + + private[flink] def sortFieldsToString( + collationSort: RelCollation, + rowRelDataType: RelDataType): String = { +val fieldCollations = collationSort.getFieldCollations.asScala +.map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) + +val sortFieldsToString = fieldCollations + .map(col => s"${ +rowRelDataType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ") + +sortFieldsToString + } + + private[flink] def directionToOrder(direction: Direction) = { +direction match { + case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING + case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING + case _ => throw new IllegalArgumentException("Unsupported direction.") +} + } + + private[flink] def fetchToString(fetch: RexNode, offset: RexNode): String = { +val limitEnd = getFetchLimitEnd(fetch, offset) +val fetchToString = if (limitEnd == Long.MaxValue) { --- End diff -- No need to define `val fetchToString` if it is only returned. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
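The reviewer's point above is a common Scala idiom: when a `val` is defined only to be returned on the next line, the method body can be the expression itself. A minimal, self-contained sketch of the suggested simplification (the `Long` parameter stands in for the Calcite `RexNode` arguments of the real `fetchToString`, which is an assumption here):

```scala
object FetchFormat {
  // Before: an intermediate val that is immediately returned.
  def fetchToStringVerbose(limitEnd: Long): String = {
    val fetchToString = if (limitEnd == Long.MaxValue) "unlimited" else s"$limitEnd"
    fetchToString
  }

  // After: the method body is the expression itself; no intermediate val.
  def fetchToString(limitEnd: Long): String =
    if (limitEnd == Long.MaxValue) "unlimited" else s"$limitEnd"
}
```

Both methods behave identically; the second form simply drops the redundant binding.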
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123745122 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, --- End diff -- This gives the field count of the logical row type, but we need the number of physical fields (`new RowSchema(inputType).physicalArity`) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123749098 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, + keyIndexesNoTime, + fieldCompsRefs, + new Array[TypeSerializer[AnyRef]](0), //used only for object comparisons + booleanDirectionsNoTime) + +val collectionRowComparator = new CollectionRowComparator(rowComp) + +val inputCRowType = CRowTypeInfo(inputTypeInfo) + +new RowTimeSortProcessFunction( + inputCRowType, + collectionRowComparator) + + } + +
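The `RowComparator` constructed in the quoted code composes one `TypeComparator` per sort key with a per-key direction flag. A dependency-free sketch of that composition (`SimpleRow` and `SimpleRowComparator` are hypothetical stand-ins for Flink's `Row` and `RowComparator`, whose real constructor also takes serializers and the arity of the row):

```scala
import java.util.Comparator

// Hypothetical stand-in for Flink's Row type, for illustration only.
final case class SimpleRow(fields: IndexedSeq[Any])

// Compares two rows key by key, flipping the sign for descending keys.
class SimpleRowComparator(
    keyIndexes: Array[Int],
    fieldComps: Array[Comparator[Any]],
    ascending: Array[Boolean]) extends Comparator[SimpleRow] {

  override def compare(a: SimpleRow, b: SimpleRow): Int = {
    var i = 0
    while (i < keyIndexes.length) {
      val c = fieldComps(i).compare(a.fields(keyIndexes(i)), b.fields(keyIndexes(i)))
      if (c != 0) return if (ascending(i)) c else -c // first unequal key decides
      i += 1
    }
    0 // all sort keys equal
  }
}
```

The first key with an unequal comparison decides the order, which is exactly why the rowtime key can be dropped from the comparator (as the code above does with `keyIndexesNoTime`): rows reaching the comparator already share the same timestamp.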
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123752704 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import java.lang.{Integer=>JInt} + +/** + * Process Function used for the aggregate in bounded proctime sort without offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param inputRowType It is used to mark the type of the incoming data + * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation + */ +class ProcTimeSortProcessFunction( + private val inputRowType: CRowTypeInfo, + private val rowComparator: CollectionRowComparator) +extends ProcessFunction[CRow, CRow] { + + Preconditions.checkNotNull(rowComparator) + + private var stateEventsBuffer: ListState[Row] = _ --- End diff -- rename to `bufferedEvents`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
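`ProcTimeSortProcessFunction` follows a buffer-then-sort pattern: each incoming row is appended to `ListState`, a processing-time timer fires, and the buffered rows are sorted with the `CollectionRowComparator` and emitted in order. A dependency-free sketch of that pattern (plain collections and a callback stand in for Flink's `ListState`, timer service, and `Collector`, which are not modeled here):

```scala
import java.util.{ArrayList, Collections, Comparator}

// Minimal sketch of the buffer/sort/emit cycle, with Flink's state and
// timer APIs replaced by an in-memory list and an explicit onTimer call.
class BufferingSorter[T](comparator: Comparator[T]) {
  private val bufferedEvents = new ArrayList[T]() // name per the review suggestion

  // processElement analogue: just buffer; sorting is deferred to the timer.
  def onElement(value: T): Unit = bufferedEvents.add(value)

  // onTimer analogue: sort everything buffered so far, emit in order, clear.
  def onTimer(emit: T => Unit): Unit = {
    Collections.sort(bufferedEvents, comparator)
    var i = 0
    while (i < bufferedEvents.size()) {
      emit(bufferedEvents.get(i))
      i += 1
    }
    bufferedEvents.clear()
  }
}
```

In the real operator the buffer lives in keyed `ListState` so it survives failures and is scoped per key; the sketch only shows the control flow.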
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r123745157 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.rel.RelCollation +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.table.api.TableException +import org.apache.calcite.sql.`type`.SqlTypeName._ +import java.util.{ List => JList, ArrayList } +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import java.sql.Timestamp +import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelFieldCollation.Direction +import java.util.Comparator +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat} +import java.math.{BigDecimal=>JBigDecimal} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.operators.Order +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.java.typeutils.runtime.RowComparator +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Class represents a collection of helper methods to build the sort logic. 
+ * It encapsulates as well the implementation for ordering and generic interfaces + */ +object SortUtil { + + /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * elements based on rowtime and potentially other fields + * @param collationSort The Sort collation list + * @param inputType input row type + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunction( +collationSort: RelCollation, +inputType: RelDataType, +inputTypeInfo: TypeInformation[Row], +execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + +val keySortFields = getSortFieldIndexList(collationSort) +//drop time from comparison as we sort on time in the states and result emission +val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size) +val booleanOrderings = getSortFieldDirectionBooleanList(collationSort) +val booleanDirectionsNoTime = booleanOrderings.slice(1, booleanOrderings.size) + +val fieldComps = createFieldComparators( + inputType, + keyIndexesNoTime, + booleanDirectionsNoTime, + execCfg) +val fieldCompsRefs = fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]] + +val rowComp = new RowComparator( + inputType.getFieldCount, --- End diff -- Indention is off 3 instead of 2 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---