[jira] [Commented] (FLINK-5754) released tag missing .gitigonore .travis.yml .gitattributes
[ https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929948#comment-15929948 ] Greg Hogan commented on FLINK-5754: --- [~shijinkui] it looks like the objective is to tag the release. Since the release is not a git repo the associated files (.gitigonore, .travis.yml, .gitattributes, etc.) are deleted. Why are you not working off the {{release-1.2}} branch or even {{release-1.2.0^}}? > released tag missing .gitigonore .travis.yml .gitattributes > > > Key: FLINK-5754 > URL: https://issues.apache.org/jira/browse/FLINK-5754 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: shijinkui > > released tag missing .gitigonore .travis.yml .gitattributes. > When make a release version, should only replace the version. > for example: https://github.com/apache/spark/tree/v2.1.0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time
[ https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929946#comment-15929946 ] shijinkui commented on FLINK-5650: -- Good job, Thanks. > Flink-python tests executing cost too long time > --- > > Key: FLINK-5650 > URL: https://issues.apache.org/jira/browse/FLINK-5650 > Project: Flink > Issue Type: Bug > Components: Python API, Tests >Affects Versions: 1.2.0, 1.3.0 >Reporter: shijinkui >Assignee: Chesnay Schepler >Priority: Critical > Labels: osx > Fix For: 1.3.0, 1.2.1 > > > When execute `mvn clean test` in flink-python, it will wait more than half > hour after the console output below: > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > log4j:WARN No appenders could be found for logger > (org.apache.flink.python.api.PythonPlanBinderTest). > log4j:WARN Please initialize the log4j system properly. > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more > info. > The stack below: > "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition > [0x79fd8000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(Native Method) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50) > at > org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211) > at > org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141) > at > org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114) > at > org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174) > this is the jstack: > https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0 -- This message was sent by 
Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3543: [FLINK-5985] [Backport for 1.2] Report no task states for...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3543 Looks good. +1 for merging this! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929932#comment-15929932 ] ASF GitHub Bot commented on FLINK-5985: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3543 Looks good. +1 for merging this! > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r106644720 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -17,29 +17,51 @@ */ package org.apache.flink.table.api -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.api.java.typeutils.{Types => JTypes} /** * This class enumerates all supported types of the Table API. */ -object Types { +object Types extends JTypes { - val STRING = BasicTypeInfo.STRING_TYPE_INFO - val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO + val STRING = JTypes.STRING + val BOOLEAN = JTypes.BOOLEAN - val BYTE = BasicTypeInfo.BYTE_TYPE_INFO - val SHORT = BasicTypeInfo.SHORT_TYPE_INFO - val INT = BasicTypeInfo.INT_TYPE_INFO - val LONG = BasicTypeInfo.LONG_TYPE_INFO - val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO - val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO - val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO + val BYTE = JTypes.BYTE + val SHORT = JTypes.SHORT + val INT = JTypes.INT + val LONG = JTypes.LONG + val FLOAT = JTypes.FLOAT + val DOUBLE = JTypes.DOUBLE + val DECIMAL = JTypes.DECIMAL - val DATE = SqlTimeTypeInfo.DATE - val TIME = SqlTimeTypeInfo.TIME - val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP + val SQL_DATE = JTypes.SQL_DATE + val SQL_TIME = JTypes.SQL_TIME + val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Generates RowTypeInfo with default names (f1, f2 ..). +* same as ``new RowTypeInfo(types)`` +* +* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT) +*/ + def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = { --- End diff -- Out of curiosity: Why do you need the manifest? 
I think you don't need it as you don't reference `m` anywhere... Also, I think the common way of doing this would be: ```scala def ROW[T: Manifest](types: TypeInformation[_]*) = { ... ```
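As an illustration of the review comment above, here is a minimal sketch, independent of Flink, of the two equivalent ways to request a `Manifest` and of the variant that drops it. All names (`ManifestSketch`, `describeExplicit`, `describeBound`, `countFields`) are hypothetical and are not part of Flink's `Types` object.

```scala
// Hypothetical stand-ins for illustration; this is not Flink's Types.ROW.
object ManifestSketch {
  // Explicit implicit parameter list, the style used in the diff.
  def describeExplicit[T](fields: Any*)(implicit m: Manifest[T]): String =
    s"${m.runtimeClass.getSimpleName}:${fields.length}"

  // Context bound: the compiler desugars this to the same implicit parameter.
  def describeBound[T: Manifest](fields: Any*): String =
    s"${implicitly[Manifest[T]].runtimeClass.getSimpleName}:${fields.length}"

  // If the manifest is never referenced, the type parameter can be dropped entirely.
  def countFields(fields: Any*): Int = fields.length
}
```

Both `describe*` variants behave identically at the call site; the context bound is only shorter to write. When the body never touches the manifest, as in the reviewed `ROW` method, the third form is probably the simplest.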
[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3127 Found one more small concern (inline comment)
[jira] [Commented] (FLINK-5481) Simplify Row creation
[ https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929928#comment-15929928 ] ASF GitHub Bot commented on FLINK-5481: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3127 Found one more small concern (inline comment) > Simplify Row creation > - > > Key: FLINK-5481 > URL: https://issues.apache.org/jira/browse/FLINK-5481 > Project: Flink > Issue Type: Bug > Components: DataSet API, Table API & SQL >Affects Versions: 1.2.0 >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Trivial > > When we use {{ExecutionEnvironment#fromElements(X... data)}} it takes first > element of {{data}} to define a type. If first Row in collection has wrong > number of fields (there are nulls) TypeExtractor returns not RowTypeInfo, but > GenericType -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5481) Simplify Row creation
[ https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929927#comment-15929927 ] ASF GitHub Bot commented on FLINK-5481: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r106644720 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -17,29 +17,51 @@ */ package org.apache.flink.table.api -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.api.java.typeutils.{Types => JTypes} /** * This class enumerates all supported types of the Table API. */ -object Types { +object Types extends JTypes { - val STRING = BasicTypeInfo.STRING_TYPE_INFO - val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO + val STRING = JTypes.STRING + val BOOLEAN = JTypes.BOOLEAN - val BYTE = BasicTypeInfo.BYTE_TYPE_INFO - val SHORT = BasicTypeInfo.SHORT_TYPE_INFO - val INT = BasicTypeInfo.INT_TYPE_INFO - val LONG = BasicTypeInfo.LONG_TYPE_INFO - val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO - val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO - val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO + val BYTE = JTypes.BYTE + val SHORT = JTypes.SHORT + val INT = JTypes.INT + val LONG = JTypes.LONG + val FLOAT = JTypes.FLOAT + val DOUBLE = JTypes.DOUBLE + val DECIMAL = JTypes.DECIMAL - val DATE = SqlTimeTypeInfo.DATE - val TIME = SqlTimeTypeInfo.TIME - val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP + val SQL_DATE = JTypes.SQL_DATE + val SQL_TIME = JTypes.SQL_TIME + val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Generates RowTypeInfo with default names (f1, f2 ..). +* same as ``new RowTypeInfo(types)`` +* +* @param types of Row fields. e.g. 
ROW(Types.STRING, Types.INT) +*/ + def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = { --- End diff -- Out of curiosity: Why do you need the manifest? I think you don't need it as you don't reference `m` anywhere... Also, I think the common way of doing would be: ```scala def ROW[T: Manifest](types: TypeInformation[_]*) = { ... ``` > Simplify Row creation > - > > Key: FLINK-5481 > URL: https://issues.apache.org/jira/browse/FLINK-5481 > Project: Flink > Issue Type: Bug > Components: DataSet API, Table API & SQL >Affects Versions: 1.2.0 >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Trivial > > When we use {{ExecutionEnvironment#fromElements(X... data)}} it takes first > element of {{data}} to define a type. If first Row in collection has wrong > number of fields (there are nulls) TypeExtractor returns not RowTypeInfo, but > GenericType -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5481) Simplify Row creation
[ https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929925#comment-15929925 ] ASF GitHub Bot commented on FLINK-5481: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3127 Looks good to me, +1 @twalthr @fhueske Any concerns about merging this? > Simplify Row creation > - > > Key: FLINK-5481 > URL: https://issues.apache.org/jira/browse/FLINK-5481 > Project: Flink > Issue Type: Bug > Components: DataSet API, Table API & SQL >Affects Versions: 1.2.0 >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Trivial > > When we use {{ExecutionEnvironment#fromElements(X... data)}} it takes first > element of {{data}} to define a type. If first Row in collection has wrong > number of fields (there are nulls) TypeExtractor returns not RowTypeInfo, but > GenericType -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3127 Looks good to me, +1 @twalthr @fhueske Any concerns about merging this?
[jira] [Commented] (FLINK-5668) passing taskmanager configuration through taskManagerEnv instead of file
[ https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929916#comment-15929916 ] Stephan Ewen commented on FLINK-5668: - There are other config files and config objects that are passed as well, in addition to the configuration. For example, the new YARN mode will not behave like "start cluster / submit job" but will directly start a jobgraph on YARN. The jobgraph also needs to be persisted. > passing taskmanager configuration through taskManagerEnv instead of file > > > Key: FLINK-5668 > URL: https://issues.apache.org/jira/browse/FLINK-5668 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: Bill Liu > Original Estimate: 48h > Remaining Estimate: 48h > > When creating a Flink cluster on Yarn, the JobManager depends on HDFS to share > taskmanager-conf.yaml with the TaskManager. > It's better to share the taskmanager-conf.yaml on the JobManager Web server > instead of HDFS, which could reduce the HDFS dependency at job startup. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6098) Cassandra sink freezes after write error
[ https://issues.apache.org/jira/browse/FLINK-6098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929909#comment-15929909 ] Jakub Nowacki commented on FLINK-6098: -- Well, it doesn't. :) As you can see from the above logs, which are both for the same job, it kept running until I killed it the next day. It keeps refusing the checkpoint though, which is an indicator but it does not help much. On the source side, if Kafka fails the job gets, indeed, restarted. > Cassandra sink freezes after write error > > > Key: FLINK-6098 > URL: https://issues.apache.org/jira/browse/FLINK-6098 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector >Affects Versions: 1.2.0 > Environment: Flink 1.2.0, standalone cluster, Debian GNU/Linux 8.7 > (jessie) >Reporter: Jakub Nowacki > > I am having a problem with a very simple pipeline taking messages from Kafka > and writing them into Cassandra. The pipeline runs fine for a number of days > and at some point I am getting the below error in the taskmanager logs: > {code} > 2017-03-13 16:01:50,699 ERROR > org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase - Error > while sending value. 
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout > during write query at consistency LOCAL_ONE (1 replica were required but only > 0 acknowledged the write) > Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: > Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were > required but only 0 acknowledged the write) > {code} > The job seems to be running fine, but it does not process any messages, which > is only visible in the metrics and in the JobManager log: > {code} > 2017-03-14 14:00:44,611 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 42288 @ 1489496444610 > 2017-03-14 14:00:44,612 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding > checkpoint 42288 because of checkpoint decline from task > 35926157acfb1b68d1f6c9abcd90c8b4 : Task Source: Custom Source -> Map -> Map > -> Sink: Cassandra Sink (1/2) was not running > {code} > I know this is some Cassandra hiccup, but in theory pipeline should recover > after a failure or fail and stop. > Everything seems fine and I didn't find any information specific to the > reconnect after failure for the Cassandra Connector. 
The only thing I'm not > sure if it's done correctly is the ClusterBuilder; i.e I use the below code > in the job definition main method (in Scala): > {code:java} > val cassandraServers = parameters.get("cassandra.servers", > "localhost").split(",") > val cassandraUser = parameters.get("cassandra.user") > val cassandraPassword = parameters.get("cassandra.password") > val clusterBuilder = new ClusterBuilder() { > override def buildCluster(builder: Cluster.Builder): Cluster = { > cassandraServers.foreach(builder.addContactPoint) > if (cassandraUser != null && cassandraPassword != null) > builder.withCredentials(cassandraUser, cassandraPassword) > builder.build() > } > } > {code} > The job starts correctly but I'm not sure if the configuration from the > properties is pulled correctly on the taskmanages, as I understand the > {{buildCluster}} call is done on its side. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
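On the question of where the properties are read: a minimal sketch, assuming the builder object is shipped to the task managers via Java serialization (the usual way Flink distributes user-defined objects). Under that assumption, values read from `parameters` in `main` are captured on the client and travel inside the serialized object, so the task manager never re-reads the properties. `CassandraSettings` and `SerializationSketch` are hypothetical stand-ins; no Flink or Cassandra driver classes are used.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the state captured by a ClusterBuilder subclass.
final case class CassandraSettings(
    servers: List[String],
    user: Option[String],
    password: Option[String]) {
  // Credentials are applied only when both values are present (Option instead of null checks).
  def credentials: Option[(String, String)] =
    for (u <- user; p <- password) yield (u, p)
}

object SerializationSketch {
  // Round-trips a value through Java serialization, mimicking shipment to a worker.
  def roundTrip[A <: java.io.Serializable](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

If the round-tripped value equals the original, the configuration resolved in `main` is what the worker side would see. This says nothing about why the sink task stopped; it only addresses the doubt about configuration being pulled on the task managers.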
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106545012 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,64 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// final long time_boundary = +// Long.parseLong(windowReference.getConstants().get(1).getValue().toString()); +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType().getFieldCount() +val lowerboundIndex = index - count +val time_boundary = logicWindow.constants.get(lowerboundIndex) --- End diff -- Check this
[jira] [Commented] (FLINK-6098) Cassandra sink freezes after write error
[ https://issues.apache.org/jira/browse/FLINK-6098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929893#comment-15929893 ] Chesnay Schepler commented on FLINK-6098: - It is very odd that the job does not fail or restart. The CassandraSink appears to be behaving as expected: it detected an issue and threw the exception. Or rather, I assume it did, because there has to be some reason why the task is no longer running. The fact that the job doesn't fail, even though a task apparently failed, is a bit concerning. > Cassandra sink freezes after write error > > > Key: FLINK-6098 > URL: https://issues.apache.org/jira/browse/FLINK-6098 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector >Affects Versions: 1.2.0 > Environment: Flink 1.2.0, standalone cluster, Debian GNU/Linux 8.7 > (jessie) >Reporter: Jakub Nowacki > > I am having a problem with a very simple pipeline taking messages from Kafka > and writing them into Cassandra. The pipeline runs fine for a number of days > and at some point I am getting the below error in the taskmanager logs: > {code} > 2017-03-13 16:01:50,699 ERROR > org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase - Error > while sending value. 
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout > during write query at consistency LOCAL_ONE (1 replica were required but only > 0 acknowledged the write) > Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: > Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were > required but only 0 acknowledged the write) > {code} > The job seems to be running fine, but it does not process any messages, which > is only visible in the metrics and in the JobManager log: > {code} > 2017-03-14 14:00:44,611 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 42288 @ 1489496444610 > 2017-03-14 14:00:44,612 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding > checkpoint 42288 because of checkpoint decline from task > 35926157acfb1b68d1f6c9abcd90c8b4 : Task Source: Custom Source -> Map -> Map > -> Sink: Cassandra Sink (1/2) was not running > {code} > I know this is some Cassandra hiccup, but in theory pipeline should recover > after a failure or fail and stop. > Everything seems fine and I didn't find any information specific to the > reconnect after failure for the Cassandra Connector. 
The only thing I'm not > sure if it's done correctly is the ClusterBuilder; i.e I use the below code > in the job definition main method (in Scala): > {code:java} > val cassandraServers = parameters.get("cassandra.servers", > "localhost").split(",") > val cassandraUser = parameters.get("cassandra.user") > val cassandraPassword = parameters.get("cassandra.password") > val clusterBuilder = new ClusterBuilder() { > override def buildCluster(builder: Cluster.Builder): Cluster = { > cassandraServers.foreach(builder.addContactPoint) > if (cassandraUser != null && cassandraPassword != null) > builder.withCredentials(cassandraUser, cassandraPassword) > builder.build() > } > } > {code} > The job starts correctly but I'm not sure if the configuration from the > properties is pulled correctly on the taskmanages, as I understand the > {{buildCluster}} call is done on its side. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6101) GroupBy fields with expression can not be selected either using original name or expression
[ https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln.lee updated FLINK-6101: --- Description: currently the TableAPI do not support selecting GroupBy fields with expression either using original field name or the expression {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} caused {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} and {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b%3, 'c.min, 'e, 'a.avg, 'd.count) {code} will cause {code} org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. {code} and add an alias "group(e, 'b%3 as 'b)" still doesn't work {code} java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, b5, TMP_0, TMP_1, TMP_2] {code} the only way to get this work can be {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .select('a, 'b%3 as 'b, 'c, 'd, 'e) .groupBy('e, 'b) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} I'm confused, should we add support alias in groupBy clause? ( it seems a bit odd against SQL, but TableAPI has a different groupBy grammar ) What do you think? 
was: currently the TableAPI do not support selecting GroupBy fields with expression either using original field name or the expression {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} caused {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} and {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b%3, 'c.min, 'e, 'a.avg, 'd.count) {code} will cause {code} org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. {code} and add an alias "group(e, 'b%3 as 'b)" still doesn't work {code} java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, b5, TMP_0, TMP_1, TMP_2] {code} the only way to get this work can be {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .select('a, 'b%3 as 'b, 'c, 'd, 'e) .groupBy('e, 'b) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} I'm confused, should we add support alias in groupBy clause? 
( it seems a bit odd against SQL, but TableAPI has a different groupBy grammar ) > GroupBy fields with expression can not be selected either using original name > or expression > > > Key: FLINK-6101 > URL: https://issues.apache.org/jira/browse/FLINK-6101 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: lincoln.lee > > currently the TableAPI do not support selecting GroupBy fields with > expression either using original field name or the expression > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > caused > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > and > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b%3, 'c.min, 'e, 'a.avg, 'd.count) > {code} > will cause > {code} > org.apache.flink.table.api.ValidationException: Cannot resolve [b] given > input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. > {code} > and add an alias "group(e, 'b%3 as 'b)" still doesn't work > {code} > java.lang.IllegalArgumentException: field [b] not found; input fields are: > [e, b5, TMP_0, TMP_1, TMP_2] > {code} > the only way to get this work can be > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .select('a, 'b%3 as 'b, 'c, 'd, 'e) > .groupBy('e, 'b) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > I'm confused, should we add support alias in groupBy clause? ( it seems a bit > odd against SQL, but TableAPI has a different groupBy grammar ) > What do you think? --
[jira] [Updated] (FLINK-6101) GroupBy fields with expression can not be selected either using original name or expression
[ https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln.lee updated FLINK-6101: --- Description: currently the TableAPI do not support selecting GroupBy fields with expression either using original field name or the expression {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} caused {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} and {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b%3, 'c.min, 'e, 'a.avg, 'd.count) {code} will cause {code} org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. {code} and add an alias "group(e, 'b%3 as 'b)" still doesn't work {code} java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, b5, TMP_0, TMP_1, TMP_2] {code} the only way to get this work can be {code} val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .select('a, 'b%3 as 'b, 'c, 'd, 'e) .groupBy('e, 'b) .select('b, 'c.min, 'e, 'a.avg, 'd.count) {code} I'm confused, should we add support alias in groupBy clause? 
( it seems a bit odd against SQL, but TableAPI has a different groupBy grammar ) was: currently the TableAPI do not support selecting GroupBy fields with expression either using original field name or the expression ``` val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b, 'c.min, 'e, 'a.avg, 'd.count) ``` cause ``` val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b, 'c.min, 'e, 'a.avg, 'd.count) ``` and ``` val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .groupBy('e, 'b % 3) .select('b%3, 'c.min, 'e, 'a.avg, 'd.count) ``` will cause ``` org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. ``` and add an alias "group(e, 'b%3 as 'b)" still doesn't work ``` java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, b5, TMP_0, TMP_1, TMP_2] ``` the only way to get this work can be ``` val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) .select('a, 'b%3 as 'b, 'c, 'd, 'e) .groupBy('e, 'b) .select('b, 'c.min, 'e, 'a.avg, 'd.count) ``` I'm confused, should we add support alias in groupBy clause? 
( it seems a bit odd against SQL, but TableAPI has a different groupBy grammar ) > GroupBy fields with expression can not be selected either using original name > or expression > > > Key: FLINK-6101 > URL: https://issues.apache.org/jira/browse/FLINK-6101 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: lincoln.lee > > currently the TableAPI do not support selecting GroupBy fields with > expression either using original field name or the expression > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > caused > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > and > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b%3, 'c.min, 'e, 'a.avg, 'd.count) > {code} > will cause > {code} > org.apache.flink.table.api.ValidationException: Cannot resolve [b] given > input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. > {code} > and add an alias "group(e, 'b%3 as 'b)" still doesn't work > {code} > java.lang.IllegalArgumentException: field [b] not found; input fields are: > [e, b5, TMP_0, TMP_1, TMP_2] > {code} > the only way to get this work can be > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .select('a, 'b%3 as 'b, 'c, 'd, 'e) > .groupBy('e, 'b) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > I'm confused, should we add support alias in groupBy clause? ( it seems a bit > odd against SQL, but TableAPI has a different groupBy grammar ) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
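To make the workaround from the description concrete, here is a plain Scala collections analogue (not the Table API): the grouping expression `b % 3` is first projected under the name `b`, and only then used as a grouping key, so the later selection can refer to it by name. `Row5`, `Result`, and the sample data are hypothetical; the average is computed as a `Double` for simplicity, which may differ from the Table API's result type.

```scala
// Hypothetical rows with fields (a, b, c, d, e); not CollectionDataSets.get5TupleDataSet.
object GroupByExpressionSketch {
  final case class Row5(a: Int, b: Long, c: Int, d: String, e: Long)
  final case class Result(b: Long, minC: Int, e: Long, avgA: Double, countD: Int)

  def aggregate(rows: Seq[Row5]): Seq[Result] =
    rows
      .map(r => r.copy(b = r.b % 3))   // analogue of select('a, 'b % 3 as 'b, 'c, 'd, 'e)
      .groupBy(r => (r.e, r.b))        // analogue of groupBy('e, 'b)
      .map { case ((e, b), group) =>   // analogue of select('b, 'c.min, 'e, 'a.avg, 'd.count)
        Result(b, group.map(_.c).min, e, group.map(_.a).sum.toDouble / group.size, group.size)
      }
      .toSeq
      .sortBy(r => (r.e, r.b))         // deterministic order for inspection
}
```

The point of the sketch is only the ordering of steps: once the expression has a name before grouping, the ambiguity between "original `b`" and "`b % 3`" reported in the issue does not arise.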
[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...
Github user tonycox closed the pull request at: https://github.com/apache/flink/pull/3166
[GitHub] flink issue #3392: [FLINK-5883] Re-adding the Exception-thrown code for List...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3392 +1 Merging this...
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929815#comment-15929815 ] ASF GitHub Bot commented on FLINK-3849: --- Github user tonycox closed the pull request at: https://github.com/apache/flink/pull/3166 > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > Fix For: 1.3.0 > > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
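The `setPredicate` contract quoted in the issue (hand the source a predicate, get back the part it cannot evaluate) could be sketched roughly as below. This is only an illustration: the `Expression` hierarchy, `AlwaysTrue`, and `ComparisonOnlySource` are hypothetical stand-ins made up for this sketch and are not Flink's actual classes.

```scala
// Hypothetical, minimal expression model (NOT Flink's Expression classes).
sealed trait Expression
final case class GreaterThan(field: String, value: Long) extends Expression
final case class Like(field: String, pattern: String) extends Expression
final case class And(left: Expression, right: Expression) extends Expression
case object AlwaysTrue extends Expression

trait FilterableTableSource {
  // Returns the part of the predicate that the source cannot evaluate itself.
  def setPredicate(predicate: Expression): Expression
}

// Hypothetical source that can only evaluate `GreaterThan` comparisons.
final class ComparisonOnlySource extends FilterableTableSource {
  private var accepted: List[Expression] = Nil

  // Predicates the source agreed to evaluate, in the order they were pushed.
  def pushedDown: List[Expression] = accepted.reverse

  override def setPredicate(predicate: Expression): Expression = predicate match {
    case And(left, right) =>
      // Push each conjunct separately and keep only what remains unsupported.
      (setPredicate(left), setPredicate(right)) match {
        case (AlwaysTrue, remaining) => remaining
        case (remaining, AlwaysTrue) => remaining
        case (l, r)                  => And(l, r)
      }
    case gt: GreaterThan =>
      accepted = gt :: accepted
      AlwaysTrue
    case other => other
  }
}
```

With this shape, the planner would keep a filter operator only for the returned remainder, which is the "parts of it" case mentioned in the issue.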
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106548270 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DataStreamProcTimeBoundAggIntegrationTest.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{ TableEnvironment, TableException } +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.{ + StreamingWithStateTestBase, + StreamITCase, + StreamTestData +} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ +import scala.collection.mutable + +class DataStreamProcTimeBoundAggIntegrationTest extends StreamingWithStateTestBase { --- End diff -- Please see the comments on PR #3547 regarding the tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106630745 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.Accumulator + +/** + * Computes the final aggregate value from incrementally computed aggreagtes. + * + * @param numGroupingKey The number of grouping keys. + * @param numAggregates The number of aggregates. + * @param finalRowArity The arity of the final output row. 
+ */ +class DataStreamIncrementalAggregateWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichWindowFunction[Row, Row, Tuple, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 --- End diff -- We can do that in `open()` because it is just called once. However, in the hot path we should use `while` loops because Scala `for` loops have significant overhead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
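To illustrate the `while` versus `for` remark, here is a small self-contained sketch (the object and method names are made up for this example). In Scala 2 a `for` over a range is rewritten into a `foreach` call with a closure, whereas the `while` loop is compiled to a plain loop; how much that costs in practice depends on the JIT, so treat the comparison as an assumption to measure rather than a guarantee.

```scala
object HotPathLoops {
  // `for` over indices: desugars to `values.indices.foreach(i => ...)`.
  def sumWithFor(values: Array[Long]): Long = {
    var total = 0L
    for (i <- values.indices) total += values(i)
    total
  }

  // Hand-written `while` loop: no closure, no Range object.
  def sumWithWhile(values: Array[Long]): Long = {
    var total = 0L
    var i = 0
    while (i < values.length) {
      total += values(i)
      i += 1
    }
    total
  }
}
```

Both methods return the same result; only the per-record code path (for example inside `apply()`) is where the second form is preferred.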
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106546899 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -191,3 +287,31 @@ class DataStreamOverAggregate( } +object DataStreamProcTimeCase { + class ProcTimeTimestampExtractor + extends AssignerWithPunctuatedWatermarks[Row] { + +override def checkAndGetNextWatermark( + lastElement: Row, + extractedTimestamp: Long): Watermark = { + null +} + +override def extractTimestamp( + element: Row, + previousElementTimestamp: Long): Long = { + System.currentTimeMillis() --- End diff -- `System.currentTimeMillis()` is not strictly increasing. For instance, the clock of a machine can be adjusted when it is synced with a time server. So we should always remember the last emitted timestamp and return the `max` of the last and the current time.
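The "remember the last emitted timestamp and return the `max`" suggestion could look roughly like this. `MonotonicClock` is a hypothetical helper written for this sketch, not an existing Flink class; the time source is injectable only so that a clock jumping backwards can be simulated. Note that the result is non-decreasing rather than strictly increasing, since two calls in the same millisecond return the same value.

```scala
// Hypothetical helper: wraps a wall-clock source and never goes backwards.
final class MonotonicClock(now: () => Long = () => System.currentTimeMillis()) {
  private var lastEmitted: Long = Long.MinValue

  // Returns max(last emitted timestamp, current wall-clock time).
  def nextTimestamp(): Long = {
    lastEmitted = math.max(lastEmitted, now())
    lastEmitted
  }
}
```

An extractor along the lines of the one in the diff would then call `nextTimestamp()` instead of `System.currentTimeMillis()` directly.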
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106545668 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,64 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// final long time_boundary = +// Long.parseLong(windowReference.getConstants().get(1).getValue().toString()); +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType().getFieldCount() +val lowerboundIndex = index - count +val time_boundary = logicWindow.constants.get(lowerboundIndex) + .getValue2.asInstanceOf[java.math.BigDecimal].longValue() + + +val (aggFields, aggregates) = AggregateUtil.transformToAggregateFunctions( + namedAggregates.map(_.getKey),inputType, needRetraction = false) + + +// As we it is not possible to operate neither on sliding count neither +// on sliding time we need to manage the eviction of the events that +// expire ourselves based on the proctime (system time). Therefore the +// current system time is assign as the timestamp of the event to be +// recognize by the evictor + +val inputDataStreamTimed = inputDS --- End diff -- We should not assign processing timestamps as event timestamps because it overrides existing timestamps. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106544854 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -106,6 +137,10 @@ class DataStreamOverAggregate( if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS) +} else if (overWindow.lowerBound.getOffset.getType.isInstanceOf[IntervalSqlType] --- End diff -- We can explicitly check whether a window is a ROW or a RANGE window with `overWindow.isRows`. I'm also not sure about the `overWindow.upperBound.isPreceding` condition. I think the condition should rather look like this:
```
else if (
  overWindow.lowerBound.isPreceding() &&
  !overWindow.lowerBound.isUnbounded() && // bounded preceding
  overWindow.upperBound.isCurrentRow() && // until current row
  !overWindow.isRows) // is RANGE window
```
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106630792 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.Accumulator + +/** + * Computes the final aggregate value from incrementally computed aggreagtes. + * + * @param aggregates The aggregates to be computed + * @param aggFields the fields on which to apply the aggregate. + * @param forwardedFieldCount The fields to be carried from current row. 
+ */ +class DataStreamProcTimeAggregateWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichWindowFunction[Row, Row, Tuple, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i = i + 1 + } + } + + + /** +* Calculate aggregated values output by aggregate buffer, and set them into output +* Row based on the mapping relation between intermediate aggregate data and output data. +*/ + override def apply( + key: Tuple, + window: W, + records: Iterable[Row], + out: Collector[Row]): Unit = { + + var i = 0 + //initialize the values of the aggregators by re-creating them + //the design of the Accumulator interface should be extended to enable + //a reset function for better performance + while (i < aggregates.length) { + aggregates(i).resetAccumulator(accumulators.getField(i).asInstanceOf[Accumulator]) + i += 1 + } + var reuse:Row = null --- End diff -- +space --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106546112 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,64 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// final long time_boundary = +// Long.parseLong(windowReference.getConstants().get(1).getValue().toString()); +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType().getFieldCount() +val lowerboundIndex = index - count +val time_boundary = logicWindow.constants.get(lowerboundIndex) + .getValue2.asInstanceOf[java.math.BigDecimal].longValue() + + +val (aggFields, aggregates) = AggregateUtil.transformToAggregateFunctions( + namedAggregates.map(_.getKey),inputType, needRetraction = false) + + +// As we it is not possible to operate neither on sliding count neither +// on sliding time we need to manage the eviction of the events that +// expire ourselves based on the proctime (system time). 
Therefore the +// current system time is assign as the timestamp of the event to be +// recognize by the evictor + +val inputDataStreamTimed = inputDS + .assignTimestampsAndWatermarks(new ProcTimeTimestampExtractor()) + +// get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = + if (partitionKeys.nonEmpty) { +inputDataStreamTimed.keyBy(partitionKeys:_*) + .window(GlobalWindows.create()) + .trigger(CountTrigger.of(1)) + .evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) + .apply(new DataStreamIncrementalAggregateWindowFunction[GlobalWindow] --- End diff -- Let's use a process function. We have to change the code anyway once we want to support `FOLLOWING`. Also a `ProcessFunction` does not need to aggregate all rows for each row but remember the accumulators of the last row, add the new row and retract the old ones. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106630541 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -785,7 +785,7 @@ object AggregateUtil { (propPos._1, propPos._2) } - private def transformToAggregateFunctions( --- End diff -- We could also move the logic to create the processing function into `AggregateUtil` like all other operators that have to deal with aggregations.
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106629913 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,66 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType().getFieldCount() +val lowerboundIndex = index - count +var time_boundary = 0L + +logicWindow.constants.get(lowerboundIndex).getValue2 match { + case _: java.math.BigDecimal => time_boundary = logicWindow.constants.get(lowerboundIndex) + .getValue2.asInstanceOf[java.math.BigDecimal].longValue() + case _ => throw new TableException("OVER Window boundaries must be numeric") +} + +val (aggFields, aggregates) = AggregateUtil.transformToAggregateFunctions( + namedAggregates.map(_.getKey),inputType, needRetraction = false) + + +// As we it is not possible to operate neither on sliding count neither +// on sliding time we need to manage the eviction of the events that +// expire ourselves based on the proctime (system time). 
Therefore the +// current system time is assign as the timestamp of the event to be +// recognize by the evictor + +val inputDataStreamTimed = inputDS + .assignTimestampsAndWatermarks(new ProcTimeTimestampExtractor()) + +// get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = + if (partitionKeys.nonEmpty) { +inputDataStreamTimed.keyBy(partitionKeys:_*) + .window(GlobalWindows.create()) + .trigger(CountTrigger.of(1)) + .evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) + .apply(new DataStreamProcTimeAggregateWindowFunction[GlobalWindow] + (aggregates,aggFields,inputType.getFieldCount)) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] + + } else { + inputDataStreamTimed.windowAll(GlobalWindows.create()).trigger(CountTrigger.of(1)) +.evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) +.apply(new DataStreamProcTimeAggregateGlobalWindowFunction[GlobalWindow]( --- End diff -- I think a `ProcessFunction` would be more efficient for this use case. With a `WindowFunction` and a `GlobalWindow`, we have to aggregate all rows for each emitted row. With a `ProcessFunction` we can just add the new row and retract those rows which are not included anymore. Moreover, we have to change the logic to a `ProcessFunction` anyway when we extend this to support `FOLLOWING` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
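The "add the new row and retract those rows which are not included anymore" idea can be sketched without any Flink dependency. `SlidingSum` below is a hypothetical stand-in for the state a `ProcessFunction` would keep, and it assumes rows arrive with non-decreasing timestamps; a real implementation would hold the buffer and accumulator in Flink state.

```scala
import scala.collection.mutable

// Hypothetical sketch of incremental aggregation over RANGE BETWEEN x PRECEDING.
final class SlidingSum(rangeMillis: Long) {
  // Rows still inside the range, oldest first: (timestamp, value).
  private val buffer = mutable.Queue.empty[(Long, Long)]
  private var sum = 0L

  // Adds one row and returns the aggregate for the window ending at `timestamp`.
  def add(timestamp: Long, value: Long): Long = {
    // Retract rows that fell out of [timestamp - rangeMillis, timestamp].
    while (buffer.nonEmpty && buffer.head._1 < timestamp - rangeMillis) {
      sum -= buffer.dequeue()._2
    }
    // Accumulate the new row.
    buffer.enqueue((timestamp, value))
    sum += value
    sum
  }
}
```

Each row is accumulated once and retracted once, instead of re-aggregating the whole window content for every emitted row as the `GlobalWindows` plus `TimeEvictor` variant does.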
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106546529 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -191,3 +287,31 @@ class DataStreamOverAggregate( } +object DataStreamProcTimeCase { + class ProcTimeTimestampExtractor --- End diff -- We should not use a `TimestampExtractor` to assign processing time timestamps as event timestamps. I think the `ProcTimeTimestampExtractor` does not need to be wrapped in an `object`.
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106547382 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.Accumulator + +import java.lang.Iterable +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + + + //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction + +class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichAllWindowFunction[Row, Row, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i = i + 1 + } + } + + override def apply( + window: W, + records: Iterable[Row], + out: Collector[Row]): Unit = { + + + var i = 0 + //initialize the values of the aggregators by re-creating them + //the design of the Accumulator interface should be extended to enable + //a reset function for better performance + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) --- End diff -- Actually, accumulators can be created once in `open()` and reused after they have been reset with `AggregateFunction.resetAccumulator()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
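The create-once, reset-per-call pattern from the comment above might be sketched as follows. `SumAccumulator` and `ReusingAggregator` are hypothetical simplifications invented for this example; they only mimic the shape of `open()` and `apply()` and do not use Flink's `AggregateFunction` or `Accumulator` types.

```scala
// Hypothetical accumulator with an explicit reset, standing in for a Flink Accumulator.
final class SumAccumulator {
  var sum: Long = 0L
  def reset(): Unit = sum = 0L
}

// Hypothetical window function skeleton: allocate in open(), reset in apply().
final class ReusingAggregator(numAggregates: Int) {
  private var accumulators: Array[SumAccumulator] = _

  // Called once: the only place where accumulators are allocated.
  def open(): Unit = {
    accumulators = Array.fill(numAggregates)(new SumAccumulator)
  }

  // Called per window evaluation: reset and reuse instead of re-creating.
  def apply(records: Iterable[Long]): List[Long] = {
    var i = 0
    while (i < accumulators.length) {
      accumulators(i).reset()
      i += 1
    }
    val it = records.iterator
    while (it.hasNext) {
      val value = it.next()
      i = 0
      while (i < accumulators.length) {
        accumulators(i).sum += value
        i += 1
      }
    }
    accumulators.map(_.sum).toList
  }
}
```

The second call to `apply` starts from zero again even though no new accumulator objects were created, which is the point of resetting rather than re-creating.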
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106631183 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -17,23 +17,54 @@ */ package org.apache.flink.table.plan.nodes.datastream -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import java.util.{ List => JList } --- End diff -- Can you revert the `import` changes?
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106547467 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.Accumulator + +import java.lang.Iterable +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + + + //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction + +class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichAllWindowFunction[Row, Row, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i = i + 1 + } + } + + override def apply( + window: W, + records: Iterable[Row], + out: Collector[Row]): Unit = { + + + var i = 0 + //initialize the values of the aggregators by re-creating them + //the design of the Accumulator interface should be extended to enable + //a reset function for better performance + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i += 1 + } + var reuse:Row = null --- End diff -- `var reuse: Row` +space --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106630898

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DataStreamProcTimeBoundAggIntegrationITCase.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{ TableEnvironment, TableException }
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.utils.{
    +  StreamingWithStateTestBase,
    +  StreamITCase,
    +  StreamTestData
    +}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit._
    +import scala.collection.mutable
    +
    +class DataStreamProcTimeBoundAggIntegrationITCase extends StreamingWithStateTestBase {
    --- End diff --

    Please see the comment on PR #3547 regarding the tests. Thanks
[jira] [Commented] (FLINK-5883) Re-adding the Exception-thrown code for ListKeyGroupedIterator when the iterator is requested the second time
[ https://issues.apache.org/jira/browse/FLINK-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929811#comment-15929811 ] ASF GitHub Bot commented on FLINK-5883: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3392 +1 Merging this... > Re-adding the Exception-thrown code for ListKeyGroupedIterator when the > iterator is requested the second time > - > > Key: FLINK-5883 > URL: https://issues.apache.org/jira/browse/FLINK-5883 > Project: Flink > Issue Type: Improvement > Components: DataSet API >Reporter: lincoln.lee >Assignee: lincoln.lee > > Originally, ListKeyGroupedIterator ensured that a TraversableOnceException > was thrown when the iterator is requested the second time within FLINK-1023, > it was lost from FLINK-1110 unexpectedly, so add it back. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929795#comment-15929795 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106630745 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.Accumulator + +/** + * Computes the final aggregate value from incrementally computed aggreagtes. + * + * @param numGroupingKey The number of grouping keys. 
+ * @param numAggregates The number of aggregates. + * @param finalRowArity The arity of the final output row. + */ +class DataStreamIncrementalAggregateWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichWindowFunction[Row, Row, Tuple, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 --- End diff -- We can do that in `open()` because it is just called once. However, in the hot path we should use `while` loops because Scala `for` loops have significant overhead. > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. 
> The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
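
The review remark above about `while` versus `for` loops can be sketched in plain Scala, without any Flink dependency. The object `LoopSketch` and its two methods are hypothetical names used only for illustration. The underlying fact is that a Scala `for` over a `Range` is rewritten to a `foreach` call taking a closure, while a `while` loop compiles to a plain counter loop; how much that costs in a particular job is something to measure rather than assume.

```scala
object LoopSketch {
  // Setup-style code (runs once, comparable to open()): a for loop is fine here.
  def initOnce(n: Int): Array[Long] = {
    val acc = new Array[Long](n)
    for (i <- 0 until n) acc(i) = 0L
    acc
  }

  // Hot-path-style code (runs per record or per window): index with while.
  def sumInto(acc: Array[Long], values: Array[Long]): Unit = {
    var i = 0
    while (i < acc.length) {
      acc(i) += values(i)
      i += 1
    }
  }
}
```

Calling `LoopSketch.sumInto` repeatedly on the array returned by `LoopSketch.initOnce` mirrors the split between one-time setup and per-window work that the comment describes.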
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929800#comment-15929800 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106630792 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.Accumulator + +/** + * Computes the final aggregate value from incrementally computed aggreagtes. 
+ * + * @param aggregates The aggregates to be computed + * @param aggFields the fields on which to apply the aggregate. + * @param forwardedFieldCount The fields to be carried from current row. + */ +class DataStreamProcTimeAggregateWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichWindowFunction[Row, Row, Tuple, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i = i + 1 + } + } + + + /** +* Calculate aggregated values output by aggregate buffer, and set them into output +* Row based on the mapping relation between intermediate aggregate data and output data. +*/ + override def apply( + key: Tuple, + window: W, + records: Iterable[Row], + out: Collector[Row]): Unit = { + + var i = 0 + //initialize the values of the aggregators by re-creating them + //the design of the Accumulator interface should be extended to enable + //a reset function for better performance + while (i < aggregates.length) { + aggregates(i).resetAccumulator(accumulators.getField(i).asInstanceOf[Accumulator]) + i += 1 + } + var reuse:Row = null --- End diff -- +space > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single >
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929806#comment-15929806 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106547467 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.Accumulator + +import java.lang.Iterable +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + + + //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction + +class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichAllWindowFunction[Row, Row, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i = i + 1 + } + } + + override def apply( + window: W, + records: Iterable[Row], + out: Collector[Row]): Unit = { + + + var i = 0 + //initialize the values of the aggregators by re-creating them + //the design of the Accumulator interface should be extended to enable + //a reset function for better performance + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i += 1 + } + var reuse:Row = null --- End diff -- `var reuse: Row` +space > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER 
RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929804#comment-15929804 ]

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106544854

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -106,6 +137,10 @@ class DataStreamOverAggregate(
         if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
           createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +} else if (overWindow.lowerBound.getOffset.getType.isInstanceOf[IntervalSqlType]
    --- End diff --

    We can explicitly check whether a window is a ROW or a RANGE window with `overWindow.isRows`.
    I'm also not sure about the `overWindow.upperBound.isPreceding` condition. I think the condition should rather be like this:
    ```
    else if (
      overWindow.lowerBound.isPreceding() &&
      !overWindow.lowerBound.isUnbounded() && // bounded preceding
      overWindow.upperBound.isCurrentRow() && // until current row
      !overWindow.isRows) // is RANGE window
    ```

> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on
> processing time streams to the SQL interface.
> Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
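
The condition proposed in the review comment above can be tried out in isolation. The sketch below is an assumption-laden model, not the Calcite API: `Bound` and `OverWindow` are hypothetical stand-ins that carry only the four flags the condition reads, and `isBoundedRangeUntilCurrentRow` is an illustrative name.

```scala
object OverWindowSketch {
  // Hypothetical stand-ins for the bound and group objects used in the review;
  // they are NOT the Calcite classes, only the flags the condition reads.
  final case class Bound(isPreceding: Boolean, isUnbounded: Boolean, isCurrentRow: Boolean)
  final case class OverWindow(lowerBound: Bound, upperBound: Bound, isRows: Boolean)

  // True only for "RANGE BETWEEN <bounded> PRECEDING AND CURRENT ROW".
  def isBoundedRangeUntilCurrentRow(w: OverWindow): Boolean =
    w.lowerBound.isPreceding &&
      !w.lowerBound.isUnbounded && // bounded preceding
      w.upperBound.isCurrentRow && // until current row
      !w.isRows                    // RANGE window, not ROWS

  // Sample bounds for experimenting with the predicate.
  val currentRow: Bound = Bound(isPreceding = false, isUnbounded = false, isCurrentRow = true)
  val boundedPreceding: Bound = Bound(isPreceding = true, isUnbounded = false, isCurrentRow = false)
  val unboundedPreceding: Bound = Bound(isPreceding = true, isUnbounded = true, isCurrentRow = false)
}
```

With these stand-ins, a bounded-preceding RANGE window passes the check, while the same bounds with `isRows = true` or an unbounded lower bound do not, which is the separation of cases the comment asks for.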
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929791#comment-15929791 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106547382 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.Accumulator + +import java.lang.Iterable +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + + + //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction + +class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichAllWindowFunction[Row, Row, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i = i + 1 + } + } + + override def apply( + window: W, + records: Iterable[Row], + out: Collector[Row]): Unit = { + + + var i = 0 + //initialize the values of the aggregators by re-creating them + //the design of the Accumulator interface should be extended to enable + //a reset function for better performance + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) --- End diff -- Actually, accumulators can be created once in `open()` and be reused after they have been reset with `AggregateFunction.resetAccumulator()`. > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian 
Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to
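
The suggestion in the review comment above (create accumulators once in `open()`, then reset them per window) can be illustrated with a self-contained sketch. Everything here is hypothetical: `SumAcc`, `SumAggregate`, and `WindowFn` only imitate the shape of the Table API's accumulator and aggregate classes, and the `allocations` counter exists purely to make the reuse visible.

```scala
object AccumulatorReuseSketch {
  // Hypothetical minimal accumulator and aggregate, standing in for the
  // Table API's Accumulator / AggregateFunction; only the reuse pattern matters.
  final class SumAcc { var sum: Long = 0L }

  final class SumAggregate {
    def createAccumulator(): SumAcc = new SumAcc
    def resetAccumulator(acc: SumAcc): Unit = acc.sum = 0L
    def accumulate(acc: SumAcc, value: Long): Unit = acc.sum += value
    def getValue(acc: SumAcc): Long = acc.sum
  }

  final class WindowFn(aggregates: Array[SumAggregate]) {
    private var accumulators: Array[SumAcc] = _
    var allocations = 0 // counts accumulator objects created, for illustration

    // Called once: allocate one accumulator per aggregate.
    def open(): Unit = {
      accumulators = new Array[SumAcc](aggregates.length)
      var i = 0
      while (i < aggregates.length) {
        accumulators(i) = aggregates(i).createAccumulator()
        allocations += 1
        i += 1
      }
    }

    // Called per window: reset in place instead of re-creating.
    def apply(records: Iterable[Long]): Array[Long] = {
      var i = 0
      while (i < aggregates.length) {
        aggregates(i).resetAccumulator(accumulators(i))
        i += 1
      }
      val it = records.iterator
      while (it.hasNext) {
        val v = it.next()
        i = 0
        while (i < aggregates.length) {
          aggregates(i).accumulate(accumulators(i), v)
          i += 1
        }
      }
      val out = new Array[Long](aggregates.length)
      i = 0
      while (i < aggregates.length) {
        out(i) = aggregates(i).getValue(accumulators(i))
        i += 1
      }
      out
    }
  }
}
```

Evaluating two windows in a row with one `WindowFn` leaves `allocations` at the number of aggregates, which is the saving the reviewer points at compared with calling `createAccumulator()` in every `apply()`.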
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929803#comment-15929803 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106629136 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,66 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType().getFieldCount() +val lowerboundIndex = index - count +var time_boundary = 0L + +logicWindow.constants.get(lowerboundIndex).getValue2 match { --- End diff -- This can be changed to ``` val timeBoundary = logicWindow.constants.get(lowerboundIndex).getValue2 match { case _: java.math.BigDecimal => logicWindow.constants.get(lowerboundIndex) .getValue2.asInstanceOf[java.math.BigDecimal].longValue() case _ => throw new TableException("OVER Window boundaries must be numeric") } ``` > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
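
As a possible variation on the pattern match suggested in the review comment above, the matched value can be bound to a name so that the constant is neither looked up nor cast a second time. This is only a sketch under assumptions: `toTimeBoundary` is a hypothetical helper, its `value` argument plays the role of the constant read from the logical window, and a plain `IllegalArgumentException` replaces Flink's `TableException` so that the snippet runs without Flink on the classpath.

```scala
object TimeBoundarySketch {
  // Hypothetical helper: `value` stands in for the window constant's value.
  // The typed pattern binds the BigDecimal directly, so no asInstanceOf is needed.
  def toTimeBoundary(value: Any): Long = value match {
    case bd: java.math.BigDecimal => bd.longValue()
    case _ => throw new IllegalArgumentException("OVER Window boundaries must be numeric")
  }
}
```

Because the result of the `match` is an expression, it can be assigned to a `val`, which also removes the need for the mutable `var time_boundary = 0L` seen in the diff.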
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929799#comment-15929799 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106548270 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DataStreamProcTimeBoundAggIntegrationTest.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{ TableEnvironment, TableException } +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.{ + StreamingWithStateTestBase, + StreamITCase, + StreamTestData +} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ +import scala.collection.mutable + +class DataStreamProcTimeBoundAggIntegrationTest extends StreamingWithStateTestBase { --- End diff -- Please see the comments on PR #3547 regarding the tests. 
> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929801#comment-15929801 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106631183 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -17,23 +17,54 @@ */ package org.apache.flink.table.plan.nodes.datastream -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import java.util.{ List => JList } --- End diff -- Can you revert the `import` changes? > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. 
If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929792#comment-15929792 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106545668 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,64 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// final long time_boundary = +// Long.parseLong(windowReference.getConstants().get(1).getValue().toString()); +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType().getFieldCount() +val lowerboundIndex = index - count +val time_boundary = logicWindow.constants.get(lowerboundIndex) + .getValue2.asInstanceOf[java.math.BigDecimal].longValue() + + +val (aggFields, aggregates) = AggregateUtil.transformToAggregateFunctions( + namedAggregates.map(_.getKey),inputType, needRetraction = false) + + +// As we it is not possible to operate neither on sliding count neither +// on sliding time we need to manage the eviction of the events that +// expire ourselves based on the proctime (system time). Therefore the +// current system time is assign as the timestamp of the event to be +// recognize by the evictor + +val inputDataStreamTimed = inputDS --- End diff -- We should not assign processing timestamps as event timestamps because it overrides existing timestamps. 
> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ---
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929802#comment-15929802 ]

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106545012

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -119,6 +154,64 @@ class DataStreamOverAggregate(

       }

    +  def createTimeBoundedProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row] = {
    +
    +    val overWindow: Group = logicWindow.groups.get(0)
    +    val partitionKeys: Array[Int] = overWindow.keys.toArray
    +    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
    +
    +    // final long time_boundary =
    +    // Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
    +    val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
    +    val count = input.getRowType().getFieldCount()
    +    val lowerboundIndex = index - count
    +    val time_boundary = logicWindow.constants.get(lowerboundIndex)
    --- End diff --

    Check this

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929794#comment-15929794 ]

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106630898

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DataStreamProcTimeBoundAggIntegrationITCase.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{ TableEnvironment, TableException }
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.utils.{
    +  StreamingWithStateTestBase,
    +  StreamITCase,
    +  StreamTestData
    +}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit._
    +import scala.collection.mutable
    +
    +class DataStreamProcTimeBoundAggIntegrationITCase extends StreamingWithStateTestBase {
    --- End diff --

    Please see the comment on PR #3547 regarding the tests.
    Thanks

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929805#comment-15929805 ]

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106546112

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -119,6 +154,64 @@ class DataStreamOverAggregate(

       }

    +  def createTimeBoundedProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row] = {
    +
    +    val overWindow: Group = logicWindow.groups.get(0)
    +    val partitionKeys: Array[Int] = overWindow.keys.toArray
    +    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
    +
    +    // final long time_boundary =
    +    // Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
    +    val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
    +    val count = input.getRowType().getFieldCount()
    +    val lowerboundIndex = index - count
    +    val time_boundary = logicWindow.constants.get(lowerboundIndex)
    +      .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
    +
    +
    +    val (aggFields, aggregates) = AggregateUtil.transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),inputType, needRetraction = false)
    +
    +
    +    // As we it is not possible to operate neither on sliding count neither
    +    // on sliding time we need to manage the eviction of the events that
    +    // expire ourselves based on the proctime (system time). Therefore the
    +    // current system time is assign as the timestamp of the event to be
    +    // recognize by the evictor
    +
    +    val inputDataStreamTimed = inputDS
    +      .assignTimestampsAndWatermarks(new ProcTimeTimestampExtractor())
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val result: DataStream[Row] =
    +      if (partitionKeys.nonEmpty) {
    +        inputDataStreamTimed.keyBy(partitionKeys:_*)
    +          .window(GlobalWindows.create())
    +          .trigger(CountTrigger.of(1))
    +          .evictor(TimeEvictor.of(Time.milliseconds(time_boundary)))
    +          .apply(new DataStreamIncrementalAggregateWindowFunction[GlobalWindow]
    --- End diff --

    Let's use a process function. We have to change the code anyway once we want to support `FOLLOWING`. Also, a `ProcessFunction` does not need to aggregate all rows for each row; it can remember the accumulators of the last row, add the new row, and retract the old ones.

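The add-and-retract idea from the review comment above can be illustrated with a small, Flink-free sketch in plain Java. The class `SlidingSum` and its method `add` are hypothetical names chosen for this illustration and are not part of the pull request; a real implementation would presumably keep the buffered rows and the accumulator in Flink state inside a `ProcessFunction`. The sketch assumes that timestamps arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical illustration: running SUM over the last rangeMillis of processing time. */
public class SlidingSum {

    /** One buffered row: the time it arrived and the value it contributes. */
    private static final class Entry {
        final long timestamp;
        final long value;

        Entry(long timestamp, long value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long rangeMillis;
    private final Deque<Entry> rows = new ArrayDeque<>();
    private long sum; // accumulator carried over from the previous row

    public SlidingSum(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /**
     * Adds the new row to the accumulator, retracts every row older than
     * (now - rangeMillis), and returns the aggregate for the current row.
     * Assumes that 'now' never decreases between calls.
     */
    public long add(long now, long value) {
        rows.addLast(new Entry(now, value));
        sum += value;
        while (!rows.isEmpty() && rows.peekFirst().timestamp < now - rangeMillis) {
            sum -= rows.removeFirst().value;
        }
        return sum;
    }
}
```

Each row is added once and retracted once, so the per-row cost does not grow with the number of rows currently inside the range, whereas re-aggregating the whole window content for every emitted row does.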
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929793#comment-15929793 ]

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106630541

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -785,7 +785,7 @@ object AggregateUtil {
         (propPos._1, propPos._2)
       }

    -  private def transformToAggregateFunctions(
    --- End diff --

    We could also move the logic to create the processing function into `AggregateUtil`, like all other operators that have to deal with aggregations.

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929797#comment-15929797 ]

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106629913

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -119,6 +154,66 @@ class DataStreamOverAggregate(

       }

    +  def createTimeBoundedProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row] = {
    +
    +    val overWindow: Group = logicWindow.groups.get(0)
    +    val partitionKeys: Array[Int] = overWindow.keys.toArray
    +    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
    +
    +    val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
    +    val count = input.getRowType().getFieldCount()
    +    val lowerboundIndex = index - count
    +    var time_boundary = 0L
    +
    +    logicWindow.constants.get(lowerboundIndex).getValue2 match {
    +      case _: java.math.BigDecimal => time_boundary = logicWindow.constants.get(lowerboundIndex)
    +        .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
    +      case _ => throw new TableException("OVER Window boundaries must be numeric")
    +    }
    +
    +    val (aggFields, aggregates) = AggregateUtil.transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),inputType, needRetraction = false)
    +
    +
    +    // As we it is not possible to operate neither on sliding count neither
    +    // on sliding time we need to manage the eviction of the events that
    +    // expire ourselves based on the proctime (system time). Therefore the
    +    // current system time is assign as the timestamp of the event to be
    +    // recognize by the evictor
    +
    +    val inputDataStreamTimed = inputDS
    +      .assignTimestampsAndWatermarks(new ProcTimeTimestampExtractor())
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val result: DataStream[Row] =
    +      if (partitionKeys.nonEmpty) {
    +        inputDataStreamTimed.keyBy(partitionKeys:_*)
    +          .window(GlobalWindows.create())
    +          .trigger(CountTrigger.of(1))
    +          .evictor(TimeEvictor.of(Time.milliseconds(time_boundary)))
    +          .apply(new DataStreamProcTimeAggregateWindowFunction[GlobalWindow]
    +            (aggregates,aggFields,inputType.getFieldCount))
    +          .returns(rowTypeInfo)
    +          .name(aggOpName)
    +          .asInstanceOf[DataStream[Row]]
    +
    +      } else {
    +        inputDataStreamTimed.windowAll(GlobalWindows.create()).trigger(CountTrigger.of(1))
    +          .evictor(TimeEvictor.of(Time.milliseconds(time_boundary)))
    +          .apply(new DataStreamProcTimeAggregateGlobalWindowFunction[GlobalWindow](
    --- End diff --

    I think a `ProcessFunction` would be more efficient for this use case. With a `WindowFunction` and a `GlobalWindow`, we have to aggregate all rows for each emitted row. With a `ProcessFunction` we can just add the new row and retract those rows which are not included anymore. Moreover, we have to change the logic to a `ProcessFunction` anyway when we extend this to support `FOLLOWING`.

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929796#comment-15929796 ]

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106546899

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -191,3 +287,31 @@ class DataStreamOverAggregate(
     }

    +object DataStreamProcTimeCase {
    +  class ProcTimeTimestampExtractor
    +    extends AssignerWithPunctuatedWatermarks[Row] {
    +
    +    override def checkAndGetNextWatermark(
    +      lastElement: Row,
    +      extractedTimestamp: Long): Watermark = {
    +      null
    +    }
    +
    +    override def extractTimestamp(
    +      element: Row,
    +      previousElementTimestamp: Long): Long = {
    +      System.currentTimeMillis()
    --- End diff --

    `System.currentTimeMillis()` is not strictly increasing. For instance, the time of a machine can be synced by a timeserver. So we should always remember the last emitted timestamp and return the `max` of the last and the current time.

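The suggestion in the comment above (remember the last emitted timestamp and return the `max` of it and the current time) can be sketched in plain Java as follows. The class `MonotonicClock` and its `nextTimestamp` method are hypothetical names for this illustration, not Flink API; the wall clock is injected as a `LongSupplier` so that the behaviour can be exercised without depending on the real system time. Note that the result is non-decreasing rather than strictly increasing, which is what taking the `max` gives.

```java
import java.util.function.LongSupplier;

/** Hypothetical illustration: timestamps that never go backwards. */
public class MonotonicClock {

    private final LongSupplier wallClock;
    private long lastEmitted = Long.MIN_VALUE;

    /** Uses the given time source; handy for tests. */
    public MonotonicClock(LongSupplier wallClock) {
        this.wallClock = wallClock;
    }

    /** Uses System.currentTimeMillis() as the time source. */
    public MonotonicClock() {
        this(System::currentTimeMillis);
    }

    /**
     * Returns the larger of the last emitted timestamp and the current
     * wall-clock reading, so a clock that was set back (for example by a
     * time server sync) cannot produce a smaller timestamp than before.
     */
    public long nextTimestamp() {
        lastEmitted = Math.max(lastEmitted, wallClock.getAsLong());
        return lastEmitted;
    }
}
```

The class is not thread-safe; that is an assumption of this sketch, on the basis that a single operator instance would call it from one thread.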
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929798#comment-15929798 ]

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3550#discussion_r106546529

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -191,3 +287,31 @@ class DataStreamOverAggregate(
     }

    +object DataStreamProcTimeCase {
    +  class ProcTimeTimestampExtractor
    --- End diff --

    We should not use a `TimestampExtractor` to assign processing time timestamps as event timestamps.
    I think the `ProcTimeTimestampExtractor` does not need to be wrapped in an `object`.

[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929776#comment-15929776 ]

ASF GitHub Bot commented on FLINK-5995:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3503

    Looks good, merging this...

> Get a Exception when creating the ListStateDescriptor with a TypeInformation
> ---
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, State Backends, Checkpointing
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> When using OperatorState and creating the ListStateDescriptor with a TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So, I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the `getOperatorState` method. I would appreciate it if anyone could give me some advice.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
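The failure mode in the stack trace above, and the fix proposed in the issue, can be modelled with a small Flink-free sketch. `LazyDescriptor` is a hypothetical stand-in for a state descriptor that was created from type information and therefore has no serializer yet; only the method name `initializeSerializerUnlessSet` and the exception message are taken from the report. The `Supplier` plays the role of whatever creates the serializer from the type information, and the signature of the real Flink method may differ from the no-argument form used here.

```java
import java.util.function.Supplier;

/** Hypothetical model of a descriptor whose serializer is created lazily. */
public class LazyDescriptor<S> {

    private final Supplier<S> serializerFactory; // stands in for creating a serializer from type information
    private S serializer;                        // null until initialized

    public LazyDescriptor(Supplier<S> serializerFactory) {
        this.serializerFactory = serializerFactory;
    }

    /** Creates the serializer once; later calls are no-ops. */
    public void initializeSerializerUnlessSet() {
        if (serializer == null) {
            serializer = serializerFactory.get();
        }
    }

    /** Fails, as in the reported stack trace, if nobody initialized the serializer first. */
    public S getSerializer() {
        if (serializer == null) {
            throw new IllegalStateException("Serializer not yet initialized.");
        }
        return serializer;
    }
}
```

In this model, the proposed fix corresponds to the caller (here, the counterpart of `getOperatorState`) invoking `initializeSerializerUnlessSet()` before it asks the descriptor for its serializer.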
[GitHub] flink issue #3503: [FLINK-5995][checkpoints] fix Get a Exception when creati...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3503 Looks good, merging this... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured
[ https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929775#comment-15929775 ]

ASF GitHub Bot commented on FLINK-5981:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3486

> SSL version and ciper suites cannot be constrained as configured
> ---
>
> Key: FLINK-5981
> URL: https://issues.apache.org/jira/browse/FLINK-5981
> Project: Flink
> Issue Type: Bug
> Components: Security
> Reporter: Tao Wang
> Assignee: Tao Wang
> Fix For: 1.3.0
>
> I configured SSL and started a Flink job, but found that the configured properties are not applied properly:
> akka port: only the cipher suites are applied correctly, the SSL version is not
> blob server/netty server: neither the SSL version nor the cipher suites match what I configured
> I've found out the reason why:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY (for akka ssl version, it's fixed in akka 2.4: https://github.com/akka/akka/pull/21078)
> I'll fix the issue on the blob server and netty server. On the akka side, it seems that only an akka upgrade can solve the issue (we'll consider that later, as an upgrade is not a small action).

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
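As a rough illustration of the blob server / netty server side of this issue: with the standard `javax.net.ssl` API, the enabled protocol versions and cipher suites are set on the engine (or socket) created from an `SSLContext`, via `setEnabledProtocols` and `setEnabledCipherSuites`. The sketch below shows only that mechanism; it is not the fix that was merged. The class `SslConstraints` and both of its methods are hypothetical names, and dropping unsupported values silently is an assumption of this sketch (a real implementation might prefer to fail loudly).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

/** Hypothetical helper: apply configured protocols and cipher suites to an SSLEngine. */
public class SslConstraints {

    /** Keeps only the requested values that are supported, preserving the requested order. */
    public static String[] supportedSubset(String[] requested, String[] supported) {
        List<String> supportedList = Arrays.asList(supported);
        List<String> result = new ArrayList<>();
        for (String value : requested) {
            if (supportedList.contains(value)) {
                result.add(value);
            }
        }
        return result.toArray(new String[0]);
    }

    /** Creates an engine from the context and restricts it to the configured values. */
    public static SSLEngine constrainedEngine(
            SSLContext context, String[] protocols, String[] cipherSuites) {
        SSLEngine engine = context.createSSLEngine();
        engine.setEnabledProtocols(
            supportedSubset(protocols, engine.getSupportedProtocols()));
        engine.setEnabledCipherSuites(
            supportedSubset(cipherSuites, engine.getSupportedCipherSuites()));
        return engine;
    }
}
```

A caller would pass the protocol and cipher suite names read from its configuration, for example `constrainedEngine(context, new String[] {"TLSv1.2"}, configuredSuites)`, where `configuredSuites` is a placeholder for the configured list.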
[jira] [Resolved] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured
[ https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-5981.
---
    Resolution: Fixed
    Fix Version/s: 1.3.0

Fixed via e0614f6551a232706b74963563694486fe2461b1

[jira] [Closed] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured
[ https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-5981.
---

[GitHub] flink pull request #3486: [FLINK-5981][SECURITY]make ssl version and cipher ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3486 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929768#comment-15929768 ] ASF GitHub Bot commented on FLINK-5991: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3508 @StefanRRichter I think I was faster ... > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}. 
[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3508 @StefanRRichter I think I was faster ...
[jira] [Created] (FLINK-6102) Update protobuf to latest version
Su Ralph created FLINK-6102: --- Summary: Update protobuf to latest version Key: FLINK-6102 URL: https://issues.apache.org/jira/browse/FLINK-6102 Project: Flink Issue Type: Task Components: Core Affects Versions: 1.2.0 Reporter: Su Ralph In Flink release 1.2.0, protobuf-java is at version 2.5.0 and is packaged into the Flink fat jar. This causes a conflict when a user application uses a newer version of protobuf-java, so it makes more sense to update to a later protobuf-java. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3508 I wonder if there could also exist a case for broadcasting operator state (non-keyed), where only one operator instance is selected as sender and all others receive on restore. Furthermore, the union aspect may (or may not) happen at restore time, but not at the time that a user requests this state. For what this currently does, I think `ReplicatingState` describes it pretty well. Broadcast would be a good description from the operator's perspective: it broadcasts the generated data to all peers on restore.
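To make the naming discussion concrete, here is a rough sketch of the two restore behaviours being contrasted: splitting list state across the new subtasks versus handing every subtask the union of all list state. It uses plain Java collections. The class and method names are made up for illustration, and the round-robin split is an assumption, not a description of Flink's actual redistribution code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not Flink's implementation.
public class OperatorStateRedistributionSketch {

    /** Split restore: every element lands on exactly one new subtask (round robin). */
    public static <T> List<List<T>> split(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> state : oldSubtaskStates) {
            for (T element : state) {
                result.get(next % newParallelism).add(element);
                next++;
            }
        }
        return result;
    }

    /** Union restore: every new subtask receives all elements of all old subtasks. */
    public static <T> List<List<T>> union(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> state : oldSubtaskStates) {
            all.addAll(state);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

Under this reading, "replicating" or "union" describes what each subtask sees after restore, while "broadcast" describes what each subtask does with its own elements.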
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929756#comment-15929756 ] ASF GitHub Bot commented on FLINK-5991: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3508 I wonder if there could also exist a case for broadcasting operator state (non-keyed), where only one operator instance is selected as sender and all others receive on restore. Furthermore, the union aspect may (or may not) happen at restore time, but not at the time that a user requests this state. For what this currently does, I think `ReplicatingState` describes it pretty well. Broadcast would be a good description from the operator's perspective: it broadcasts the generated data to all peers on restore. > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. 
For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929753#comment-15929753 ] ASF GitHub Bot commented on FLINK-5653: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106627390 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -17,34 +17,41 @@ */ package org.apache.flink.table.plan.nodes.datastream -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } --- End diff -- No worries. I know that IDEs tend to reformat code but it really makes reviews harder. Thanks! > Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > > > Key: FLINK-5653 > URL: https://issues.apache.org/jira/browse/FLINK-5653 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Stefano Bortoli > > The goal of this issue is to add support for OVER ROWS aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5656) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. 
If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
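As a worked illustration of the `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` frame in the query above, the sketch below computes SUM and MIN over the current row and up to N preceding rows. It assumes a single partition whose rows are already in processing-time (arrival) order. The class and method names are hypothetical, and this is not the DataStream operator proposed in the issue.

```java
// Hypothetical illustration of the frame semantics; not Flink code.
public class RowsPrecedingSketch {

    /** SUM over the current row and up to {@code preceding} rows before it. */
    public static long[] sumOver(long[] values, int preceding) {
        long[] out = new long[values.length];
        for (int i = 0; i < values.length; i++) {
            long sum = 0;
            // The frame starts at most 'preceding' rows back and ends at row i.
            for (int j = Math.max(0, i - preceding); j <= i; j++) {
                sum += values[j];
            }
            out[i] = sum;
        }
        return out;
    }

    /** MIN over the current row and up to {@code preceding} rows before it. */
    public static long[] minOver(long[] values, int preceding) {
        long[] out = new long[values.length];
        for (int i = 0; i < values.length; i++) {
            long min = values[i];
            for (int j = Math.max(0, i - preceding); j < i; j++) {
                min = Math.min(min, values[j]);
            }
            out[i] = min;
        }
        return out;
    }
}
```

For example, with `b` values 1, 2, 3, 4 in one partition and 2 preceding rows, `sumB` would be 1, 3, 6, 9.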
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106627390 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -17,34 +17,41 @@ */ package org.apache.flink.table.plan.nodes.datastream -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } --- End diff -- No worries. I know that IDEs tend to reformat code but it really makes reviews harder. Thanks!
[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929751#comment-15929751 ] ASF GitHub Bot commented on FLINK-5653: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106627282 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -106,9 +113,14 @@ class DataStreamOverAggregate( if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS) +} // lowerbound is a BasicType and upperbound is PRECEEDING or CURRENT ROW +else if (overWindow.lowerBound.getOffset.getType.isInstanceOf[BasicSqlType] --- End diff -- Yes, I realized that when looking at PR #3550 that `isInstanceOf[BasicSqlType]` and `.isInstanceOf[IntervalSqlType]` distinguishes ROW from RANGE windows. I think using `.isRows()` is more clear and might also be safer because it appears to be a more public API than the type of the offset. > Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > > > Key: FLINK-5653 > URL: https://issues.apache.org/jira/browse/FLINK-5653 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Stefano Bortoli > > The goal of this issue is to add support for OVER ROWS aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. 
> - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5656) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106627282 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -106,9 +113,14 @@ class DataStreamOverAggregate( if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS) +} // lowerbound is a BasicType and upperbound is PRECEEDING or CURRENT ROW +else if (overWindow.lowerBound.getOffset.getType.isInstanceOf[BasicSqlType] --- End diff -- Yes, I realized that when looking at PR #3550 that `isInstanceOf[BasicSqlType]` and `.isInstanceOf[IntervalSqlType]` distinguishes ROW from RANGE windows. I think using `.isRows()` is more clear and might also be safer because it appears to be a more public API than the type of the offset.
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929740#comment-15929740 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106621564 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -91,6 +91,35 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedProcessingOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventTimeOverProcessFunction( + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType): UnboundedEventTimeOverProcessFunction = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) + +val aggregationStateType: RowTypeInfo = --- End diff -- Also you can use `createAccumulatorRowType(inputType, aggregates)`. Btw. could you refactor the `createAccumulatorRowType(inputType, aggregates)` method and remove the `inputType` parameter? It is not used. Thanks! > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929742#comment-15929742 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106620612 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -159,6 +167,46 @@ class DataStreamOverAggregate( result } + def createUnboundedAndCurrentRowEventTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = + // partitioned aggregation + if (partitionKeys.nonEmpty) { +val keyedStream = inputDS.keyBy(partitionKeys: _*) +val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction( + namedAggregates, + inputType) + +keyedStream + .process(processFunction) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] + } + // global non-partitioned aggregation + else { +val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction( + namedAggregates, + inputType) + +inputDS.keyBy(new NullByteKeySelector[Row]) + .process(processFunction) + .setParallelism(1) --- End diff -- also `setMaxParallelism(1)` to prevent that this operator can be scaled out. 
> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
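The sketch below illustrates one way to read `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` on an event-time stream for a single key: rows are buffered until the watermark passes their timestamp, then emitted in timestamp order with a running SUM, and rows sharing a timestamp receive the same value. Rows behind the watermark are dropped, following the `ctx.timestamp() >= currentWatermark()` check quoted in the review diffs. The class and its methods are hypothetical plain Java, not the `UnboundedEventTimeOverProcessFunction` under review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical single-key illustration; not the Flink ProcessFunction.
public class UnboundedEventTimeSumSketch {

    // Buffered values grouped by event timestamp, kept sorted by timestamp.
    private final TreeMap<Long, List<Long>> buffered = new TreeMap<>();
    private long runningSum = 0L;
    private long watermark = Long.MIN_VALUE;

    /** Buffers a row. Returns false if the row is behind the watermark and was dropped. */
    public boolean add(long timestamp, long value) {
        if (timestamp < watermark) {
            return false;
        }
        buffered.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value);
        return true;
    }

    /** Advances the watermark and returns the SUM emitted for each row that became final. */
    public List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> emitted = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.firstKey() <= watermark) {
            List<Long> rows = buffered.pollFirstEntry().getValue();
            // RANGE semantics: all rows with the same timestamp are peers,
            // so accumulate the whole group before emitting.
            for (long v : rows) {
                runningSum += v;
            }
            for (int i = 0; i < rows.size(); i++) {
                emitted.add(runningSum);
            }
        }
        return emitted;
    }
}
```

In an actual operator, the buffer and the running sum would live in keyed state and the watermark advance would arrive through an event-time timer. That wiring is omitted here.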
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929738#comment-15929738 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106622479 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer +
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929741#comment-15929741 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623704 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer +
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929745#comment-15929745 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623012 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+ * A ProcessFunction to support unbounded event-time over-window
+ *
+ * @param aggregates the aggregate functions
+ * @param aggFields the field indexes which the aggregate functions use
+ * @param forwardedFieldCount the input fields count
+ * @param intermediateType the intermediate row type which the state saved
+ * @param inputType the input row type which the state saved
+ *
+ */
+class UnboundedEventTimeOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val intermediateType: TypeInformation[Row],
+    private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    val valueSerializer: TypeSerializer[Row] =
+      intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+    accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+    val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+      (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer(
+        getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]]
+    val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+      new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer)
+    rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+    * Processes one element from the input stream without emitting output
+    *
+    * @param input The input value.
+    * @param ctx The ctx to register timer or get current time
+    * @param out The collector for returning result values.
+    *
+    */
+  override def processElement(
+      input: Row,
+      ctx: ProcessFunction[Row, Row]#Context,
+      out: Collector[Row]): Unit = {
+
+    // discard late records
+    if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+      // ensure every key just register on timer
+
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929746#comment-15929746 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106622821 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929733#comment-15929733 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106625858 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(a) over (" + --- End diff -- Also, most groups have just a single record. The max is two records. With that we cannot really check if the sorting works correctly. Can you make less groups (less distinct `a` values) and add more rows for some groups with out-of-order timestamps? > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
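As an illustration of the semantics requested in the issue description, and of the reviewer's point about out-of-order timestamps, the following is a minimal sketch in plain Scala with no Flink dependency. It models SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) under standard SQL RANGE semantics, where rows with the same timestamp are peers and receive the same result. The `Event` type, the `runningSum` helper and the sample data are hypothetical and are not taken from the pull request; whether the operator under review treats ties this way is not stated in this thread.

```scala
// Plain-Scala model of an unbounded RANGE over-window on event time.
// Everything here is illustrative; none of it is Flink API.
object UnboundedRangeOverSketch {

  // Hypothetical event: event-time timestamp, partition key c, value b.
  final case class Event(ts: Long, c: String, b: Long)

  // For each partition (sorted by key for a stable output order),
  // sort rows by timestamp and emit a running sum of b.
  // Rows sharing a timestamp are peers and all see the same sum.
  def runningSum(events: Seq[Event]): Seq[(Event, Long)] =
    events.groupBy(_.c).toSeq.sortBy(_._1).flatMap { case (_, rows) =>
      val byTs = rows.groupBy(_.ts).toSeq.sortBy(_._1)
      var acc = 0L
      byTs.flatMap { case (_, peers) =>
        acc += peers.map(_.b).sum
        peers.map(e => (e, acc))
      }
    }

  def main(args: Array[String]): Unit = {
    // Few distinct keys, several rows per key, timestamps out of order.
    val input = Seq(
      Event(3L, "x", 30L),
      Event(1L, "x", 10L),
      Event(2L, "y", 5L),
      Event(3L, "x", 1L),
      Event(2L, "x", 20L),
      Event(1L, "y", 7L)
    )
    runningSum(input).foreach(println)
  }
}
```

Because the rows of partition x arrive out of order, the running sum is only correct if rows are sorted by timestamp before they are aggregated, which is the property that test data with few groups and shuffled timestamps is meant to exercise. In this sample both rows at timestamp 3 in partition x see the same sum, 61.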
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929743#comment-15929743 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623827 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929739#comment-15929739 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106624514 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929736#comment-15929736 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106626434 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { --- End diff -- Can you also add a few unit tests to `org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala` to verify that the query is correctly translated? Thanks!
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929744#comment-15929744 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106617971 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,14 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && + overWindow.upperBound.isCurrentRow) { --- End diff -- move this condition into the line above?
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929734#comment-15929734 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106624582 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929735#comment-15929735 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623601 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929737#comment-15929737 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106625475 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(a) over (" + --- End diff -- Computing `avg`, `max`, `min` on the partition key is not very meaningful. Can you compute those on `b`? > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. 
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a parameterless scalar function that just indicates event time mode.
> - Bounded PRECEDING is not supported (see FLINK-5655).
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
--
This message was sent by Atlassian JIRA (v6.3.15#6346)
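For readers unfamiliar with the frame in the query above, the following is a minimal plain-Scala sketch of what OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) is expected to compute. It is not Flink code and not the implementation from the pull request; `Event`, `Result`, and `overRangeUnbounded` are hypothetical names introduced only for illustration. The sketch assumes standard SQL RANGE semantics, in which rows sharing a timestamp are peers and receive the same aggregate value.

```scala
// Plain-Scala model of an unbounded event-time OVER RANGE aggregate.
// Event, Result and overRangeUnbounded are illustrative names, not Flink API.
object OverRangeSketch {
  final case class Event(rowtime: Long, c: String, a: Int, b: Long)
  final case class Result(a: Int, sumB: Long, minB: Long)

  def overRangeUnbounded(events: Seq[Event]): Seq[Result] =
    events
      .groupBy(_.c)                 // PARTITION BY c
      .toVector
      .sortBy(_._1)                 // fixed partition order, for a deterministic example only
      .flatMap { case (_, rows) =>
        // ORDER BY rowtime: rows with the same timestamp form one peer group
        val byTime = rows.groupBy(_.rowtime).toVector.sortBy(_._1)
        var sum = 0L
        var min = Long.MaxValue
        byTime.flatMap { case (_, peers) =>
          // Fold in every peer first, so that all peers see the same frame
          peers.foreach { e =>
            sum += e.b
            min = math.min(min, e.b)
          }
          val (s, m) = (sum, min)
          peers.map(e => Result(e.a, s, m))
        }
      }
}
```

With a ROWS frame, each of the two rows at the same timestamp would instead get its own running value; that difference is the reason the sketch groups peers before emitting.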
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106620612 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -159,6 +167,46 @@ class DataStreamOverAggregate( result } + def createUnboundedAndCurrentRowEventTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = + // partitioned aggregation + if (partitionKeys.nonEmpty) { +val keyedStream = inputDS.keyBy(partitionKeys: _*) +val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction( + namedAggregates, + inputType) + +keyedStream + .process(processFunction) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] + } + // global non-partitioned aggregation + else { +val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction( + namedAggregates, + inputType) + +inputDS.keyBy(new NullByteKeySelector[Row]) + .process(processFunction) + .setParallelism(1) --- End diff -- Please also call `setMaxParallelism(1)` so that this operator cannot be scaled out. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106625858 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(a) over (" + --- End diff -- Also, most groups have just a single record; the largest has two. With so few records per group we cannot really check whether the sorting works correctly. Can you use fewer groups (fewer distinct `a` values) and add more rows to some groups with out-of-order timestamps?
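To make the point about out-of-order timestamps concrete, here is a small plain-Scala sketch of the buffer-then-sort idea for a single key. It is an assumption-laden illustration, not the code under review: `RunningSum`, `processElement`, and `advanceWatermark` are hypothetical names, the late-record check merely mirrors the `timestamp >= currentWatermark` condition visible in the quoted diff, and rows are emitted one by one (ROWS-style) to keep the example short.

```scala
import scala.collection.mutable

// Single-key model: buffer rows, drop late ones, sort when the watermark advances.
// RunningSum and its methods are illustrative names, not the PR's classes.
object BufferAndSortSketch {
  final class RunningSum {
    private val buffer = mutable.ArrayBuffer.empty[(Long, Long)] // (timestamp, value)
    private var watermark = Long.MinValue
    private var sum = 0L

    /** Buffers a record unless its timestamp is below the current watermark. */
    def processElement(timestamp: Long, value: Long): Unit = {
      if (timestamp >= watermark) {
        buffer += ((timestamp, value))
      }
    }

    /** Advances the watermark and emits (timestamp, running sum) for all
      * buffered rows at or before it, in timestamp order. */
    def advanceWatermark(newWatermark: Long): Seq[(Long, Long)] = {
      watermark = newWatermark
      val (ready, pending) = buffer.partition(_._1 <= newWatermark)
      buffer.clear()
      buffer ++= pending
      ready.sortBy(_._1).map { case (ts, v) =>
        sum += v
        (ts, sum)
      }.toList
    }
  }
}
```

A group with only one or two rows would produce the same output whether or not the sort step ran, which is why a test needs several rows per key arriving out of order to exercise it.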
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623827 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106622821 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106621564 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -91,6 +91,35 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedProcessingOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventTimeOverProcessFunction( + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType): UnboundedEventTimeOverProcessFunction = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) + +val aggregationStateType: RowTypeInfo = --- End diff -- Also you can use `createAccumulatorRowType(inputType, aggregates)`. Btw. could you refactor the `createAccumulatorRowType(inputType, aggregates)` method and remove the `inputType` parameter? It is not used. Thanks!
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623601 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623012 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106617971 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,14 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && + overWindow.upperBound.isCurrentRow) { --- End diff -- move this condition into the line above?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106624514 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106626434 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { --- End diff -- Can you also add a few unit tests to `org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala` to verify that the query is correctly translated? Thanks!
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106624582 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106625475 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(a) over (" + --- End diff -- Computing `avg`, `max`, `min` on the partition key is not very meaningful. Can you compute those on `b`? ---
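To illustrate the review point above, here is a minimal plain-Java sketch (no Flink dependency) of a running average per partition over an unbounded-preceding window. The class and method names are hypothetical, and the model assumes rows arrive already ordered by event time with distinct timestamps, so it ignores the tie handling that `RANGE` semantics would require.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "AVG(x) OVER (PARTITION BY key ... UNBOUNDED PRECEDING)".
final class OverWindowSketch {

    /** Returns, for each row, the running average of values within the row's partition. */
    static List<Double> runningAvg(int[] keys, int[] values) {
        Map<Integer, long[]> acc = new HashMap<>(); // key -> {sum, count}
        List<Double> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            long[] a = acc.computeIfAbsent(keys[i], k -> new long[2]);
            a[0] += values[i]; // accumulate the sum
            a[1]++;            // accumulate the count
            out.add((double) a[0] / a[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] a = {1, 1, 2, 2, 2};   // partition key
        int[] b = {10, 20, 5, 7, 9}; // a non-key column
        // Aggregating the partition key just reproduces the key on every row.
        System.out.println(runningAvg(a, a));
        // Aggregating another column actually exercises the accumulator.
        System.out.println(runningAvg(a, b));
    }
}
```

Under these assumptions, averaging the key column can never reveal an accumulator bug that mixes rows across partitions or miscounts them, which is presumably why the test is asked to aggregate `b` instead.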
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106622479 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) --- End diff -- we should register the timer based on the record timestamp: `ctx.timerService.registerEventTimeTimer(ctx.timestamp + 1)` ---
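The difference between the two registration strategies can be sketched with a small plain-Java model (no Flink dependency). All names here are hypothetical; the model only assumes that an event-time timer fires once the watermark reaches the timer's timestamp, and that records older than the current watermark are dropped as in the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of event-time timer registration for buffered records.
final class TimerRegistrationSketch {

    /** Returns the distinct timer timestamps registered for the given records. */
    static TreeSet<Long> registerTimers(long[] recordTimestamps, long currentWatermark,
                                        boolean useRecordTimestamp) {
        TreeSet<Long> timers = new TreeSet<>();
        for (long ts : recordTimestamps) {
            if (ts >= currentWatermark) { // late records are discarded
                timers.add(useRecordTimestamp ? ts + 1 : currentWatermark + 1);
            }
        }
        return timers;
    }

    /** Returns the timers that have fired once the watermark has advanced to newWatermark. */
    static List<Long> firedTimers(TreeSet<Long> timers, long newWatermark) {
        return new ArrayList<>(timers.headSet(newWatermark, true));
    }

    public static void main(String[] args) {
        long[] records = {105L, 110L, 120L};
        // Watermark-based: a single timer just after the current watermark.
        System.out.println(registerTimers(records, 100L, false));
        // Timestamp-based: one timer per record, each tied to that record's time.
        TreeSet<Long> perRecord = registerTimers(records, 100L, true);
        System.out.println(perRecord);
        System.out.println(firedTimers(perRecord, 110L));
    }
}
```

In this model the watermark-based variant fires as soon as the watermark moves at all, while rows with larger timestamps are still buffered; the timestamp-based variant fires for a record only after the watermark has passed that record.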
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623704 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929731#comment-15929731 ] ASF GitHub Bot commented on FLINK-5991: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3508 +1 on not exposing the Java Serialisation shortcut, btw. I was very unhappy that we even have it for the normal operator state. > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor); > <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator.
This would break user code, but can be considered because it is > marked as {{PublicEvolving}}.
[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3508 +1 on not exposing the Java Serialisation shortcut, btw. I was very unhappy that we even have it for the normal operator state. ---
[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3508 I would like us to take some time and think about the name. We are about to introduce a thing called "broadcast state" somewhat soon in the effort to make streaming joins possible. This broadcast state will provide an interface very similar to the current keyed state (we'll probably reuse the descriptors and state interfaces) but be checkpointed only on one operator because we only allow modifications based on broadcast input. I propose to rename the state we're talking about here to `UnionState` (or something) similar because what it does is take the snapshotted state from all operators and when restoring sends the union of that to all operators. ---
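The restore behaviour described in the comment above (every operator instance receives the union of all snapshotted state) can be sketched in plain Java. This is only a model with hypothetical names, not Flink's implementation; the round-robin split variant is included purely for contrast with ordinary partitionable list state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of two ways to redistribute operator list state on restore.
final class UnionRedistributionSketch {

    /** Union: each restored subtask receives the concatenation of all snapshotted lists. */
    static <T> List<List<T>> restoreUnion(List<List<T>> snapshots, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            union.addAll(snapshot);
        }
        List<List<T>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union)); // every subtask gets its own full copy
        }
        return restored;
    }

    /** Split (for contrast): each element goes to exactly one subtask, round-robin. */
    static <T> List<List<T>> restoreSplit(List<List<T>> snapshots, int newParallelism) {
        List<List<T>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> snapshot : snapshots) {
            for (T element : snapshot) {
                restored.get(next++ % newParallelism).add(element);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // Two subtasks took snapshots; the job is restored with three subtasks.
        List<List<String>> snapshots =
            Arrays.asList(Arrays.asList("s0", "s1"), Arrays.asList("s2"));
        System.out.println(restoreUnion(snapshots, 3));
        System.out.println(restoreSplit(snapshots, 3));
    }
}
```

With union redistribution each subtask sees all entries and must decide itself which ones it is responsible for, which matches the shard / partition discovery use case mentioned in the JIRA description.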
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929730#comment-15929730 ] ASF GitHub Bot commented on FLINK-5991: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3508 I would like us to take some time and think about the name. We are about to introduce a thing called "broadcast state" somewhat soon in the effort to make streaming joins possible. This broadcast state will provide an interface very similar to the current keyed state (we'll probably reuse the descriptors and state interfaces) but be checkpointed only on one operator because we only allow modifications based on broadcast input. I propose to rename the state we're talking about here to `UnionState` (or something) similar because what it does is take the snapshotted state from all operators and when restoring sends the union of that to all operators. > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. 
For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor); > <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}.
[jira] [Created] (FLINK-6101) GroupBy fields with expression can not be selected either using original name or expression
lincoln.lee created FLINK-6101: -- Summary: GroupBy fields with expression can not be selected either using original name or expression Key: FLINK-6101 URL: https://issues.apache.org/jira/browse/FLINK-6101 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: lincoln.lee

Currently the Table API does not support selecting a groupBy field that is defined by an expression, either by the original field name or by the expression itself. Selecting by the original name:
```
val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
  .groupBy('e, 'b % 3)
  .select('b, 'c.min, 'e, 'a.avg, 'd.count)
```
and selecting by the expression:
```
val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
  .groupBy('e, 'b % 3)
  .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
```
will cause
```
org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
```
Adding an alias, "group(e, 'b%3 as 'b)", still doesn't work:
```
java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, b5, TMP_0, TMP_1, TMP_2]
```
The only way to get this to work is to compute the expression in a preceding select:
```
val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
  .select('a, 'b%3 as 'b, 'c, 'd, 'e)
  .groupBy('e, 'b)
  .select('b, 'c.min, 'e, 'a.avg, 'd.count)
```
I'm confused: should we add support for aliases in the groupBy clause? (It seems a bit odd compared with SQL, but the Table API has a different groupBy grammar.)
[jira] [Commented] (FLINK-6058) Don't read DEFAULT_PARALLELISM from GlobalConfiguration
[ https://issues.apache.org/jira/browse/FLINK-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929719#comment-15929719 ] ASF GitHub Bot commented on FLINK-6058: --- GitHub user fanyon opened a pull request: https://github.com/apache/flink/pull/3561 [FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment fix read DEFAULT_PARALLELISM from ContextEnvironment You can merge this pull request into a Git repository by running: $ git pull https://github.com/fanyon/flink FLINK-6058 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3561.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3561 commit 446cec49e1aa75c4258c07994d0c5ae18ea0c128 Author: mengji.fy Date: 2017-03-17T10:33:34Z [FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment > Don't read DEFAULT_PARALLELISM from GlobalConfiguration > --- > > Key: FLINK-6058 > URL: https://issues.apache.org/jira/browse/FLINK-6058 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0 > > > In the constructor of {{StreamContextEnvironment}} we read the > {{DEFAULT_PARALLELISM}} from the {{GlobalConfiguration}}. This assumes that > the environment variables are correctly set and can lead to problems. We > should read the default parallelism in the client and set it in the > {{ContextEnvironment}} that it creates. This can then be read by the > {{StreamContextEnvironment}}.
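The hand-off proposed in the issue description can be sketched in plain Java as follows. The classes below are hypothetical stand-ins, not Flink's `ContextEnvironment` or `StreamContextEnvironment`; the sketch only assumes that the client holds a parsed configuration map and that `parallelism.default` is the key carrying the default parallelism.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: the client resolves the default parallelism once and passes it on,
// instead of the streaming environment reading a global configuration in its constructor.
final class ParallelismHandoffSketch {

    /** Stand-in for the environment that the client creates. */
    static final class ClientContext {
        final int defaultParallelism;

        ClientContext(int defaultParallelism) {
            this.defaultParallelism = defaultParallelism;
        }
    }

    /** Stand-in for the streaming environment; it reads only from the context it is given. */
    static final class StreamingContext {
        final int parallelism;

        StreamingContext(ClientContext context) {
            this.parallelism = context.defaultParallelism;
        }
    }

    /** The client reads the setting from its own configuration, falling back to 1. */
    static ClientContext createInClient(Map<String, String> clientConfig) {
        int parallelism = Integer.parseInt(clientConfig.getOrDefault("parallelism.default", "1"));
        return new ClientContext(parallelism);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("parallelism.default", "4");
        StreamingContext env = new StreamingContext(createInClient(config));
        System.out.println(env.parallelism);
    }
}
```

The design point is that the streaming side no longer depends on process-wide state such as environment variables being set correctly; whatever the client resolved is the single source of truth.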
[GitHub] flink pull request #3561: [FLINK-6058] fix read DEFAULT_PARALLELISM from Con...
GitHub user fanyon opened a pull request: https://github.com/apache/flink/pull/3561 [FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment fix read DEFAULT_PARALLELISM from ContextEnvironment You can merge this pull request into a Git repository by running: $ git pull https://github.com/fanyon/flink FLINK-6058 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3561.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3561 commit 446cec49e1aa75c4258c07994d0c5ae18ea0c128 Author: mengji.fy Date: 2017-03-17T10:33:34Z [FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment ---