[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985209#comment-15985209 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli closed the pull request at:
https://github.com/apache/flink/pull/3771
> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: radu
> Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries:
> Q1.1. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT COUNT(b), SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985208#comment-15985208 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on the issue:
https://github.com/apache/flink/pull/3771
@fhueske I have created #3783 with just the code generation part. At least
the GROUP BY distinct can move ahead. I will close this PR and wait for the
merging of the Calcite fix.
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985015#comment-15985015 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on the issue:
https://github.com/apache/flink/pull/3771
So, what do you want me to keep for this PR? Just the code generation and its test?
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985003#comment-15985003 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3771
If we merge this change together with #3764, we would check it with tests
that use distinct grouped window aggregates.
But you are right, it might in fact make sense to test the `GeneratedAggregations` class generated by the `CodeGenerator` individually in a unit test. I wouldn't make that part of this PR, though.
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984997#comment-15984997 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113483437
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -335,14 +371,28 @@ class CodeGenerator(
j"""
| public final void accumulate(
|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
val accumulate: String = {
for (i <- aggs.indices) yield
- j"""
+ if(distinctAggsFlags(i)){
+ j"""
+ | Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
+ | if( distValCount$i == null){
--- End diff --
Ah, that makes sense. Scala `Long` is backed by Java `long`, i.e., it does
not support `null`.
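To make the boxing point concrete, here is a standalone sketch (a plain `java.util.HashMap` stands in for Flink's `MapState`; nothing here is code from the PR):

```scala
// Scala's Long is the JVM primitive long and can never be null; the generated
// Java code reads the boxed java.lang.Long, which IS null for an unseen key.
// A plain HashMap stands in for Flink's MapState here.
val state = new java.util.HashMap[String, java.lang.Long]()

def distValCount(key: String): Long = {
  val boxed: java.lang.Long = state.get(key) // null when the key was never seen
  if (boxed == null) 0L else boxed           // mirrors the generated null check
}

val before = distValCount("b") // key absent: boxed lookup was null, so 0L
state.put("b", 3L)
val after = distValCount("b")  // 3L
```

This is why the generated Java `accumulate()` must null-check the state lookup even though the equivalent Scala code would see `0L`.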
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984939#comment-15984939 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113472166
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
@@ -72,7 +73,15 @@ class ProcTimeBoundedRowsOver(
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
-
+
+var initialized = false
+for(i <- distinctAggFlags.indices){
+ if(distinctAggFlags(i) && !initialized){
+function.initialize(getRuntimeContext())
--- End diff --
right!
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984933#comment-15984933 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113471605
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -335,14 +371,28 @@ class CodeGenerator(
j"""
| public final void accumulate(
|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
val accumulate: String = {
for (i <- aggs.indices) yield
- j"""
+ if(distinctAggsFlags(i)){
+ j"""
+ | Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
+ | if( distValCount$i == null){
--- End diff --
In Scala it is 0L, in Java it is null. :-/
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984929#comment-15984929
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on the issue:
https://github.com/apache/flink/pull/3771
@fhueske @haohui I have no problem removing the DIST() part, it is just not possible to test it without that. Shall I push just the code generation and AggregateUtil changes?
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984926#comment-15984926 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink closed the pull request at:
https://github.com/apache/flink/pull/3732
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984937#comment-15984937 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113472029
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
@@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
import org.apache.flink.api.common.functions.Function
import org.apache.flink.types.Row
+import org.apache.flink.api.common.functions.RuntimeContext
/**
* Base class for code-generated aggregations.
*/
abstract class GeneratedAggregations extends Function {
+
+ /**
+* Initialize the state for the distinct aggregation check
+*
+* @param ctx the runtime context to retrieve and initialize the distinct states
+*/
+ def initialize(ctx: RuntimeContext)
/**
-* Sets the results of the aggregations (partial or final) to the output row.
--- End diff --
Probably an error in the merging, sorry about that.
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984931#comment-15984931 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113471045
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -296,6 +297,41 @@ class CodeGenerator(
fields.mkString(", ")
}
+def genInitialize(existDistinct : Boolean): String = {
+
+ val sig: String =
+j"""
+ | org.apache.flink.api.common.state.MapState[] distStateList =
+ | new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
+ |
+ | public void initialize(
+ |org.apache.flink.api.common.functions.RuntimeContext ctx
+ | )""".stripMargin
+ if(existDistinct){
--- End diff --
you are right
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984794#comment-15984794 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3732
Since PR #3771 is the follow-up of this PR, could you close this one? Thanks!
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984783#comment-15984783 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113446823
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
@@ -72,7 +73,15 @@ class ProcTimeBoundedRowsOver(
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
-
+
+var initialized = false
+for(i <- distinctAggFlags.indices){
+ if(distinctAggFlags(i) && !initialized){
+function.initialize(getRuntimeContext())
--- End diff --
I would always call `function.initialize()`. If there are no distinct aggregates, nothing will happen.
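A minimal sketch of that suggestion, using simplified stand-ins rather than Flink's actual `GeneratedAggregations` interface: when the query has no distinct aggregates, the generator would emit an empty `initialize()` body, so the operator can always call it without scanning the flag array first:

```scala
// Simplified stand-in for the generated class (not Flink's real interface).
trait SketchAggregations {
  def initialize(): Unit
}

var registeredStates = 0 // counts hypothetical distinct-state registrations

// What the generator would emit for a query without DISTINCT: an empty body,
// so an unconditional initialize() call is a harmless no-op.
val noDistinct: SketchAggregations = new SketchAggregations {
  def initialize(): Unit = () // nothing to register
}

noDistinct.initialize() // always safe to call
```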
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984782#comment-15984782 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113446166
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -290,6 +295,10 @@ object AggregateUtil {
val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
val outputArity = aggregates.length + groupings.length + 1
+// remove when distinct is supported
+val distinctAggregatesFlags = new Array[Boolean](aggregates.size)
--- End diff --
`DataSet` has its own way to deal with distinct aggregates, so those operators could simply not initialize the state. Hence it is safe to call `generateAggregations()` with an empty array (`Array[Boolean]()`), or we make the parameter an `Option[Array[Boolean]]` and pass `None`.
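A hedged sketch of the proposed signature change; `genInitializeSketch` and its string return value are illustrative placeholders, not the real `generateAggregations()`:

```scala
// Illustrative only: the real method generates code, here we return a label.
// With an Option parameter, the DataSet paths can pass None instead of a
// dummy all-false array.
def genInitializeSketch(distinctFlags: Option[Array[Boolean]]): String = {
  val flags = distinctFlags.getOrElse(Array.empty[Boolean])
  if (flags.contains(true)) "register distinct MapState"
  else "empty initialize() body"
}

val dataSetPath = genInitializeSketch(None)                     // no dummy array needed
val overAggPath = genInitializeSketch(Some(Array(true, false))) // e.g. SUM(DISTINCT b)
```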
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984788#comment-15984788 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113442315
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -296,6 +297,41 @@ class CodeGenerator(
fields.mkString(", ")
}
+def genInitialize(existDistinct : Boolean): String = {
+
+ val sig: String =
+j"""
+ | org.apache.flink.api.common.state.MapState[] distStateList =
--- End diff --
Reusable fields should be added with `reusableMemberStatements.add()`. (The same should be done for the Iterables in `genMergeList()`.)
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984787#comment-15984787 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113446347
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -290,6 +295,10 @@ object AggregateUtil {
val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
val outputArity = aggregates.length + groupings.length + 1
+// remove when distinct is supported
+val distinctAggregatesFlags = new Array[Boolean](aggregates.size)
--- End diff --
The same applies for all DataSet aggregations.
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984789#comment-15984789 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113446851
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory
*/
class ProcTimeBoundedRowsOver(
genAggregations: GeneratedAggregationsFunction,
+distinctAggFlags: Array[Boolean],
--- End diff --
remove this parameter.
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984784#comment-15984784 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113446943
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala ---
@@ -88,6 +88,12 @@ class BoundedProcessingOverRangeProcessFunctionTest {
|"mluZyRMb25nJOda0iCPo2ukAgAAeHA");
| }
|
+| public void initialize(
--- End diff --
can you make this a one-line change?
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984786#comment-15984786 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113446584
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
@@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
import org.apache.flink.api.common.functions.Function
import org.apache.flink.types.Row
+import org.apache.flink.api.common.functions.RuntimeContext
/**
* Base class for code-generated aggregations.
*/
abstract class GeneratedAggregations extends Function {
+
+ /**
+* Initialize the state for the distinct aggregation check
+*
+* @param ctx the runtime context to retrieve and initialize the distinct states
+*/
+ def initialize(ctx: RuntimeContext)
/**
-* Sets the results of the aggregations (partial or final) to the output row.
--- End diff --
Revert this change?
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984779#comment-15984779 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113442648
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -296,6 +297,41 @@ class CodeGenerator(
fields.mkString(", ")
}
+def genInitialize(existDistinct : Boolean): String = {
+
+ val sig: String =
+j"""
+ | org.apache.flink.api.common.state.MapState[] distStateList =
+ | new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
+ |
+ | public void initialize(
+ |org.apache.flink.api.common.functions.RuntimeContext ctx
+ | )""".stripMargin
+ if(existDistinct){
+ val initDist: String = {
+ for(i <- distinctAggsFlags.indices) yield
+if( distinctAggsFlags(i)) {
+ j"""
+ |
+ | org.apache.flink.api.common.state.MapStateDescriptor distDesc$i =
--- End diff --
We should use the right type of the field instead of `Object`. With
`Object`, we will use the GenericTypeSerializer instead of Flink's
type-specific serializers.
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984781#comment-15984781 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113443910
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -335,14 +371,28 @@ class CodeGenerator(
j"""
| public final void accumulate(
|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
val accumulate: String = {
for (i <- aggs.indices) yield
- j"""
+ if(distinctAggsFlags(i)){
+ j"""
+ | Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
+ | if( distValCount$i == null){
--- End diff --
Didn't you say that the return value is never `null` but `0` if not set?
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984777#comment-15984777 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113442944
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -335,14 +371,28 @@ class CodeGenerator(
j"""
| public final void accumulate(
|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
val accumulate: String = {
for (i <- aggs.indices) yield
- j"""
+ if(distinctAggsFlags(i)){
--- End diff --
Add spaces: `if (distinctAggsFlags(i)) {`
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984785#comment-15984785 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3771#discussion_r113443466
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -296,6 +297,41 @@ class CodeGenerator(
fields.mkString(", ")
}
+def genInitialize(existDistinct : Boolean): String = {
+
+ val sig: String =
+j"""
+ | org.apache.flink.api.common.state.MapState[] distStateList =
+ | new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
+ |
+ | public void initialize(
+ |org.apache.flink.api.common.functions.RuntimeContext ctx
+ | )""".stripMargin
+ if(existDistinct){
--- End diff --
I think this check and the `existDistinct` can be removed. If all fields
are `false`, we won't generate anything.
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984727#comment-15984727 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3732
Thanks @stefanobortoli! I'll have a look at #3771
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984392#comment-15984392 ]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli commented on the issue:
https://github.com/apache/flink/pull/3732
@fhueske @sunjincheng121 @shijinkui @hongyuhong I have created a PR against the latest master with the code-generated distinct, #3771, please have a look. If it is fine, we can basically support distinct for all the window types.
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983254#comment-15983254
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli commented on the issue:
https://github.com/apache/flink/pull/3771
@fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for
distinct in the code generator. Please have a look and let me know. I have
implemented and tested it only for the OverProcTimeRowBounded window, but if you
like it I can quickly implement and test the others as well.
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983252#comment-15983252
]
ASF GitHub Bot commented on FLINK-6250:
---
GitHub user stefanobortoli opened a pull request:
https://github.com/apache/flink/pull/3771
[FLINK-6250] Distinct procTime with Rows boundaries
Thanks for contributing to Apache Flink. Before you open your pull request,
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your
pull request. For more information and/or questions please refer to the [How To
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful
description of your changes.
- [x] General
- The pull request references the related JIRA issue ("[FLINK-XXX] Jira
title text")
- The pull request addresses only one issue
- Each commit in the PR has a meaningful commit message (including the
JIRA id)
- [x] Documentation
- Documentation has been added for new functionality
- Old documentation affected by the pull request has been updated
- JavaDoc for public methods has been added
- [x] Tests & Build
- Functionality added by the pull request is covered by tests
- `mvn clean verify` has been executed successfully locally or a Travis
build has passed
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/stefanobortoli/flink FLINK-6250b
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3771.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3771
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982492#comment-15982492
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user rtudoran commented on the issue:
https://github.com/apache/flink/pull/3732
@fhueske @stefanobortoli I suggest we merge this temporary solution into
Flink (using a special marker for distinct) until the Flink module is
upgraded to the next Calcite release. I have fixed the issue in Calcite.
The advantages of merging this already are:
1) we can reuse the code
2) once we have the distinct marker we can simply modify the check for
distinct aggregates in the DataStreamOver
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978996#comment-15978996
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112725482
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import java.nio.charset.Charset
+import java.util.List
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes,
SqlTypeFamily, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.LeafExpression
+import org.apache.calcite.sql.`type`.InferTypes
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.validate.SqlValidatorScope
+
+/**
+ * An SQL Function DISTINCT() used to mark the DISTINCT operator
+ * on aggregation input. This is temporary workaround waiting for
+ * https://issues.apache.org/jira/browse/CALCITE-1740 being solved
+ */
+object DistinctAggregatorExtractor extends SqlFunction("DIST",
SqlKind.OTHER_FUNCTION,
+ ReturnTypes.ARG0, InferTypes.RETURN_TYPE,
+ OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) {
--- End diff --
one never stops learning. :-)
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979007#comment-15979007
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli commented on the issue:
https://github.com/apache/flink/pull/3732
@fhueske I have just pushed a version working with code generation (without
modifying the code generation itself). There will be the need for some
refactoring in the AggregateUtil function, but if the overall concept is sound,
I will fix things.
@hongyuhong, @shijinkui you could also have a look if you have time.
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978999#comment-15978999
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112725820
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
---
@@ -91,6 +93,22 @@ class DataStreamOverAggregate(
val overWindow: org.apache.calcite.rel.core.Window.Group =
logicWindow.groups.get(0)
+val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
--- End diff --
This is a good point. The string trick is anyway a temporary workaround.
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978943#comment-15978943
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3732
Btw, the code generation is not so fancy. The best way to learn it would be
to debug a simple batch GROUP BY query (once batch aggregations are code-gen'd
as well).
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978936#comment-15978936
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3732
Sounds good to me. IMO, we can also add the runtime code to the code base
even if there is no API support yet, provided we cover it with test cases. Then
we could quickly enable it once DISTINCT in OVER becomes available.
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978934#comment-15978934
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112716483
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Array[Int]],
+ private val distinctAggsFlag: Array[Boolean],
+ private val precedingOffset: Long,
+ private val forwardedFieldCount: Int,
+ private val aggregatesTypeInfo: RowTypeInfo,
+ private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkNotNull(distinctAggsFlag)
+ Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkArgument(precedingOffset > 0)
+
+ private var accumulatorState: ValueState[Row] = _
+ private var rowMapState: MapState[Long, JList[Row]] = _
+ private var output: Row = _
+ private var counterState: ValueState[Long] = _
+ private var smallestTsState: ValueState[Long] = _
+ private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+ override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+ new
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState =
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("smallestTSState",
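The quoted ProcessFunction tracks, per distinct aggregate, how many rows in the window still carry each value (the `distinctValueStateList: Array[MapState[Any, Long]]` fields), and should only retract a value from the accumulator when its count drops to zero. A minimal sketch of that reference-counting idea, with an in-memory map standing in for `MapState[Any, Long]` (class and method names are hypothetical, not the PR's actual code):

```scala
import scala.collection.mutable

// Reference-counted distinct tracking: a value is accumulated only when it
// first enters the window, and retracted only when its last occurrence leaves.
class DistinctRefCounter {
  // stands in for Flink's MapState[Any, Long]
  private val counts = mutable.Map.empty[Any, Long]

  /** Returns true if the aggregate should accumulate this value. */
  def add(value: Any): Boolean = {
    val n = counts.getOrElse(value, 0L) + 1
    counts(value) = n
    n == 1
  }

  /** Returns true if the aggregate should retract this value. */
  def remove(value: Any): Boolean = {
    val n = counts.getOrElse(value, 0L) - 1
    if (n <= 0) { counts.remove(value); n == 0 }
    else { counts(value) = n; false }
  }
}
```

Under this scheme, `SUM(DISTINCT b)` would call the underlying aggregate's `accumulate` only when `add` returns `true`, and `retract` only when `remove` returns `true`, leaving non-distinct aggregates in the same window untouched.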
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978933#comment-15978933
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli commented on the issue:
https://github.com/apache/flink/pull/3732
@fhueske, I agree with you about the risk of the temporary DIST() ingestion.
Perhaps we could meanwhile just work on the "ProcessFunction + code
generation" approach, keeping the DIST function for test purposes. My concern is
that the code may change again and all the work would just be wasted. To be
honest, the code generation is quite new to me, and I will have to learn to
work on that. Meanwhile, I have almost completed a version that relies on the
current code generation, nesting the distinct logic. As it is almost done, I
will share this one as well and then, if necessary, move to the code generation.
What do you think?
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978917#comment-15978917
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112713096
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
(quoted diff hunk omitted; identical to the ProcTimeBoundedDistinctRowsOver.scala hunk quoted earlier in this thread)
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978916#comment-15978916
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user stefanobortoli commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112712717
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
(quoted diff hunk omitted; identical to the ProcTimeBoundedDistinctRowsOver.scala hunk quoted earlier in this thread)
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978895#comment-15978895
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112702342
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
(quoted diff hunk omitted; identical to the ProcTimeBoundedDistinctRowsOver.scala hunk quoted earlier in this thread)
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978891#comment-15978891
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112694911
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
---
@@ -152,7 +153,14 @@ object AggregateUtil {
val aggregationStateType: RowTypeInfo =
createAccumulatorRowType(aggregates)
val inputRowType =
FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
-
+
+var hasDistinct = false
--- End diff --
can be simplified to
```
val hasDistinct = distinctAggregatesFlags.exists(_)
```
`exists` returns `true` if the function (here `_`) returns `true` at least
once.
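One detail on the suggested one-liner: in Scala a bare `_` in that position does not become the identity predicate (`exists(_)` expands to a lambda over `exists` itself and does not type-check as a `Boolean`), so the compiling form of the simplification passes `identity`. A minimal, self-contained sketch:

```scala
// Equivalent of the suggested simplification; `exists` short-circuits
// as soon as the predicate returns true for some element.
val distinctAggregatesFlags = Array(false, true, false)

// `exists(_)` would not compile here, so the predicate is `identity`.
val hasDistinct = distinctAggregatesFlags.exists(identity)
```

With the flags above, `hasDistinct` is `true`; for an all-`false` array it is `false`.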
> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978894#comment-15978894
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112695257
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
---
@@ -186,11 +194,21 @@ object AggregateUtil {
}
} else {
if (isRowsClause) {
-new ProcTimeBoundedRowsOver(
- genFunction,
- precedingOffset,
- aggregationStateType,
- inputRowType)
+if (hasDistinct){
+ new ProcTimeBoundedDistinctRowsOver(aggregates,
--- End diff --
Put `aggregates` on a new line.
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978902#comment-15978902
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112708098
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978898#comment-15978898
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112706580
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,230 @@
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978904#comment-15978904
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112691071
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import java.nio.charset.Charset
+import java.util.List
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.LeafExpression
+import org.apache.calcite.sql.`type`.InferTypes
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.validate.SqlValidatorScope
+
+/**
+ * A SQL function DIST() used to mark the DISTINCT operator
+ * on an aggregation input. This is a temporary workaround until
+ * https://issues.apache.org/jira/browse/CALCITE-1740 is resolved.
+ */
+object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION,
+  ReturnTypes.ARG0, InferTypes.RETURN_TYPE,
+  OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) {
--- End diff --
An aggregation function can also return non-numeric types such as
MIN(String). So `SqlFunctionCategory.NUMERIC` might not be the right category.
OTOH, I don't know how Calcite uses this category.
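The point about non-numeric aggregates can be illustrated without Calcite: `MIN` is well-defined on strings (lexicographic order) and returns a `String`, so a marker function restricted to numeric operands could not wrap such an aggregation input. A plain Scala sketch:

```scala
// MIN over strings returns a String, not a number: a DIST marker declared
// with numeric-only operand types would be too restrictive for this case.
val names = Seq("flink", "calcite", "beam")
val minName = names.min // lexicographic minimum
```

Here `minName` is `"beam"`, since `"beam"` sorts before `"calcite"` and `"flink"`.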
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978890#comment-15978890
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112690943
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import java.nio.charset.Charset
+import java.util.List
+
+import org.apache.calcite.rel.`type`._
--- End diff --
Many unused imports
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978903#comment-15978903
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112693953
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
---
@@ -91,6 +93,22 @@ class DataStreamOverAggregate(
val overWindow: org.apache.calcite.rel.core.Window.Group =
logicWindow.groups.get(0)
+val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
+if (input.isInstanceOf[DataStreamCalc]) {
+ val dsCalc = input.asInstanceOf[DataStreamCalc]
+ val iter = dsCalc
+ .selectionToString(dsCalc.getProgram, dsCalc.getExpressionString)
+ .split(",")
+ .iterator
+ while (iter.hasNext) {
+val exp = iter.next
+if(exp.contains("DIST")){
+ val varName = exp.substring(exp.indexOf("$"))
+ distinctVarMap.put(varName,true)
--- End diff --
+space
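For context, the string-based detection in the diff above can be sketched in isolation. The select-list string below is a hypothetical stand-in for what `selectionToString` might produce; note that `substring(indexOf("$"))` keeps everything from the `$` onward, not just the field index:

```scala
import scala.collection.mutable

// Hypothetical select-list string; the real format comes from DataStreamCalc.
val expressions = "a, DIST($1), c".split(",").map(_.trim)

val distinctVarMap = mutable.Map[String, Boolean]()
for (exp <- expressions if exp.contains("DIST")) {
  val varName = exp.substring(exp.indexOf("$")) // yields "$1)", trailing ')' included
  distinctVarMap.put(varName, true)
}
```

With this input the map ends up with the single key `"$1)"`, which shows how sensitive the extraction is to the exact expression-string format.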
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978901#comment-15978901
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112694144
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
---
@@ -224,13 +244,21 @@ class DataStreamOverAggregate(
def createBoundedAndCurrentRowOverWindow(
generator: CodeGenerator,
inputDS: DataStream[Row],
+distinctVarMap : Map[String,Boolean],
isRowTimeType: Boolean,
isRowsClause: Boolean): DataStream[Row] = {
val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
val namedAggregates: Seq[CalcitePair[AggregateCall, String]] =
generateNamedAggregates
+val aggregateCalls = overWindow.getAggregateCalls(logicWindow)
+val distinctAggFlags: Array[Boolean] = new Array[Boolean](aggregateCalls.size)
+for (i <- 0 until aggregateCalls.size()){
--- End diff --
+space `.size()) {`
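The flag-filling loop above can also be written without preallocating a mutable array, e.g. with `Array.tabulate` over the Java list. The `isDistinctCall` check below is a hypothetical stand-in for whatever test the real loop body applies to each aggregate call:

```scala
import java.util.Arrays

// Stand-in for the real per-call distinct test (hypothetical).
def isDistinctCall(call: String): Boolean = call.startsWith("DIST")

// A Java list, mirroring overWindow.getAggregateCalls(logicWindow).
val aggregateCalls = Arrays.asList("DIST($1)", "SUM($2)")

// One flag per aggregate call, in call order.
val distinctAggFlags: Array[Boolean] =
  Array.tabulate(aggregateCalls.size())(i => isDistinctCall(aggregateCalls.get(i)))
```

This keeps the array sized to the call list by construction and avoids the separate index loop.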
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978893#comment-15978893
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112700998
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,230 @@
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978900#comment-15978900
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112702163
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Array[Int]],
+ private val distinctAggsFlag: Array[Boolean],
+ private val precedingOffset: Long,
+ private val forwardedFieldCount: Int,
+ private val aggregatesTypeInfo: RowTypeInfo,
+ private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkNotNull(distinctAggsFlag)
+ Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkArgument(precedingOffset > 0)
+
+ private var accumulatorState: ValueState[Row] = _
+ private var rowMapState: MapState[Long, JList[Row]] = _
+ private var output: Row = _
+ private var counterState: ValueState[Long] = _
+ private var smallestTsState: ValueState[Long] = _
+ private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+ override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+ new
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState =
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("smallestTSState",
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978899#comment-15978899
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112701470
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Array[Int]],
+ private val distinctAggsFlag: Array[Boolean],
+ private val precedingOffset: Long,
+ private val forwardedFieldCount: Int,
+ private val aggregatesTypeInfo: RowTypeInfo,
+ private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkNotNull(distinctAggsFlag)
+ Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkArgument(precedingOffset > 0)
+
+ private var accumulatorState: ValueState[Row] = _
+ private var rowMapState: MapState[Long, JList[Row]] = _
+ private var output: Row = _
+ private var counterState: ValueState[Long] = _
+ private var smallestTsState: ValueState[Long] = _
+ private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+ override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+    BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+    rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
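The `distinctValueStateList` field above holds one `MapState[Any, Long]` per distinct aggregate, mapping each value to its occurrence count inside the window, so that a value contributes to the accumulator only while its count is non-zero. A minimal pure-Scala sketch of that counting scheme, using a `mutable.Map` as a hypothetical stand-in for Flink's `MapState` and a plain `Long` for the `SUM(DISTINCT b)` accumulator:

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapState[Any, Long]: value -> occurrence count
// within the current window.
val distinctCounts = mutable.Map.empty[Any, Long]
var distinctSum = 0L // accumulator for SUM(DISTINCT b)

// A row enters the window: add to the sum only on the first occurrence.
def accumulate(b: Long): Unit = {
  val seen = distinctCounts.getOrElse(b, 0L)
  if (seen == 0L) distinctSum += b // first occurrence of this value
  distinctCounts(b) = seen + 1L    // always bump the count
}

// The oldest row leaves the window: retract only on the last occurrence.
def retract(b: Long): Unit = {
  val seen = distinctCounts(b)
  if (seen == 1L) { distinctSum -= b; distinctCounts.remove(b) }
  else distinctCounts(b) = seen - 1L
}

Seq(2L, 2L, 3L).foreach(accumulate) // window holds [2, 2, 3]
// distinctSum == 5: the duplicate 2 is counted once
retract(2L) // one 2 leaves, another remains, so the sum is unchanged
```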
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978892#comment-15978892
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112693618
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
---
@@ -91,6 +93,22 @@ class DataStreamOverAggregate(
val overWindow: org.apache.calcite.rel.core.Window.Group =
logicWindow.groups.get(0)
+val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
--- End diff --
I would do the extraction in the DataStreamOverAggregateRule. There we have
proper access to the input `Calc` and the `RexProgram`. Extracting the function
call as a String is quite fragile: the calc could, for instance, contain an
attribute called "DISTRIBUTION".
The rule would unnest the expression from the `DIST()` RexNode, remove `DIST`,
and add the distinct information to the DataStreamOverAggregate.
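The fragility being pointed out can be shown in a few lines of Scala. The `Expr` ADT below is a hypothetical stand-in for Calcite's `RexNode` hierarchy, not the actual API; it only illustrates why a structural check beats substring matching on the expression digest:

```scala
// Naive string check for the DIST() wrapper: also fires on an ordinary
// attribute whose name merely contains "DIST".
def looksDistinctByString(exprDigest: String): Boolean =
  exprDigest.contains("DIST")

// Hypothetical miniature expression tree (stand-in for RexNode).
sealed trait Expr
case class Call(op: String, operand: Expr) extends Expr
case class FieldRef(name: String) extends Expr

// Structural check: only a call whose operator is literally "DIST" matches.
def isDistinctCall(e: Expr): Boolean = e match {
  case Call("DIST", _) => true
  case _ => false
}

// Unnesting, as the rule would do: strip the DIST() wrapper, keep the operand.
def unnest(e: Expr): Expr = e match {
  case Call("DIST", inner) => inner
  case other => other
}

val falsePositive = looksDistinctByString("DISTRIBUTION") // true, wrongly
val structuralMiss = isDistinctCall(FieldRef("DISTRIBUTION")) // false, correctly
```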
> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
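The intended Q1.1 semantics above, `SUM(DISTINCT b)` over `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`, can be sketched in plain Scala. This is an illustration of the per-row output an OVER aggregate produces, assuming proctime order equals sequence order, not the operator implementation itself:

```scala
// For each input row, sum the distinct values among the last `preceding`
// rows plus the current row (a window of at most preceding + 1 rows).
def distinctSumOver(rows: Seq[Long], preceding: Int): Seq[Long] =
  rows.indices.map { i =>
    val window = rows.slice(math.max(0, i - preceding), i + 1)
    window.distinct.sum
  }

val out = distinctSumOver(Seq(1L, 2L, 2L, 5L), preceding = 2)
// row 0: [1]     -> 1
// row 1: [1,2]   -> 3
// row 2: [1,2,2] -> 3  (duplicate 2 counted once)
// row 3: [2,2,5] -> 7
```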
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978896#comment-15978896
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112706609
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,230 @@
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978897#comment-15978897
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112702313
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,230 @@
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974731#comment-15974731
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112210389
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Array[Int]],
+ private val distinctAggsFlag: Array[Boolean],
+ private val precedingOffset: Long,
+ private val forwardedFieldCount: Int,
+ private val aggregatesTypeInfo: RowTypeInfo,
+ private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkNotNull(distinctAggsFlag)
+ Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkArgument(precedingOffset > 0)
+
+ private var accumulatorState: ValueState[Row] = _
+ private var rowMapState: MapState[Long, JList[Row]] = _
+ private var output: Row = _
+ private var counterState: ValueState[Long] = _
+ private var smallestTsState: ValueState[Long] = _
+ private var distinctValueState: MapState[Any, Row] = _
+
+ override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+    BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+    rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState =
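The excerpt above keeps the buffered rows in `rowMapState` keyed by ingestion timestamp, alongside a processed-row counter (`counterState`) and the smallest retained timestamp (`smallestTsState`). A pure-Scala sketch of that eviction bookkeeping, with a `mutable.SortedMap` as a hypothetical stand-in for the timestamp-keyed `MapState` and `String` standing in for `Row`:

```scala
import scala.collection.mutable

// ts -> rows that arrived at that proctime (several rows can share one ts).
val rowBuffer = mutable.SortedMap.empty[Long, mutable.Buffer[String]]
var counter = 0L            // mirrors counterState
val precedingOffset = 3L    // window size in rows, as in the constructor arg

// Add a row; if the window is over capacity, evict and return the oldest row
// so the caller can retract it from the accumulators.
def addRow(ts: Long, row: String): Option[String] = {
  rowBuffer.getOrElseUpdate(ts, mutable.Buffer.empty) += row
  counter += 1
  if (counter > precedingOffset) {
    val smallestTs = rowBuffer.firstKey      // mirrors smallestTsState
    val rows = rowBuffer(smallestTs)
    val evicted = rows.remove(0)             // oldest row at the oldest ts
    if (rows.isEmpty) rowBuffer.remove(smallestTs)
    counter -= 1
    Some(evicted)
  } else None
}

val e1 = addRow(1L, "a") // window not yet full
val e2 = addRow(1L, "b")
val e3 = addRow(2L, "c")
val e4 = addRow(3L, "d") // fourth row: "a" is evicted
```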
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974726#comment-15974726
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112210097
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,238 @@
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974485#comment-15974485
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112180091
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,238 @@
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974357#comment-15974357
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112151646
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,238 @@
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974352#comment-15974352
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user huawei-flink commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112151222
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Array[Int]],
+ private val distinctAggsFlag: Array[Boolean],
+ private val precedingOffset: Long,
+ private val forwardedFieldCount: Int,
+ private val aggregatesTypeInfo: RowTypeInfo,
+ private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkNotNull(distinctAggsFlag)
+ Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkArgument(precedingOffset > 0)
+
+ private var accumulatorState: ValueState[Row] = _
+ private var rowMapState: MapState[Long, JList[Row]] = _
+ private var output: Row = _
+ private var counterState: ValueState[Long] = _
+ private var smallestTsState: ValueState[Long] = _
+ private var distinctValueState: MapState[Any, Row] = _
+
+ override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+    BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973954#comment-15973954
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user hongyuhong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112105763
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
---
@@ -1140,6 +1140,202 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testNonPartitionedProcTimeOverDistinctWindow(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.testResults = mutable.MutableList()
+
+val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  " SUM(DIST(e)) OVER (" +
+  " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumE " +
--- End diff --
I think we can add some tests for multiple aggregations with distinct.
> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973955#comment-15973955
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user hongyuhong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112103318
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973956#comment-15973956
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user hongyuhong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112105507
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973953#comment-15973953
]
ASF GitHub Bot commented on FLINK-6250:
---
Github user hongyuhong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3732#discussion_r112105029
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
---
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972582#comment-15972582
]
ASF GitHub Bot commented on FLINK-6250:
---
GitHub user huawei-flink opened a pull request:
https://github.com/apache/flink/pull/3732
[FLINK-6250] Distinct procTime with Rows boundaries
Thanks for contributing to Apache Flink. Before you open your pull request,
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your
pull request. For more information and/or questions please refer to the [How To
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful
description of your changes.
- [X] General
- The pull request references the related JIRA issue ("[FLINK-XXX] Jira
title text")
- The pull request addresses only one issue
- Each commit in the PR has a meaningful commit message (including the
JIRA id)
- [X] Documentation
- Documentation has been added for new functionality
- Old documentation affected by the pull request has been updated
- JavaDoc for public methods has been added
- [X] Tests & Build
- Functionality added by the pull request is covered by tests
- `mvn clean verify` has been executed successfully locally or a Travis
build has passed
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/huawei-flink/flink FLINK-6250
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3732.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3732
commit 4e3da4c9baebf48bfc47ef192287e7e17ab69efd
Author: Stefano Bortoli
Date: 2017-04-18T12:27:26Z
DIST() for aggregation on procTime row bounded windows
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972574#comment-15972574
]
Stefano Bortoli commented on FLINK-6250:
Hi [~fhueske],
we are working with Calcite to bring in support for DISTINCT. I think that
[~rtudoran] implemented a first version and it is under code review. Still,
the time to get that support through Calcite is far away (release + inclusion
in Flink).
I agree with you that it is not optimal, but at least we are moving ahead. I
will push the branch without the code generation. Then you can have a look at
the implementation, and we can keep it for when DISTINCT is officially supported.
The code generation aggregation interfaces do not include specific parameters for
distinct aggregation and retraction, so those will have to be reworked
anyway. Should I open a JIRA for those?
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972486#comment-15972486
]
Fabian Hueske commented on FLINK-6250:
--
Sorry for the inconvenience [~stefano.bortoli]. Code generation is required to
bring the interface of the user-defined aggregates into its final shape for the
1.3 release (e.g., to allow multiple input values).
I'm not sure we should work around the missing support for `DISTINCT`
aggregates in Calcite by adding non-standard-compliant behavior that would
need to be removed afterwards (breaking existing queries that use the feature).
Calling a method `DIST()` on an aggregation function usually means computing
the aggregate and applying the function to its result.
Did you check at which level Calcite is missing support for DISTINCT? Is it
already in the parser, or later in the validator?
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972387#comment-15972387
]
Stefano Bortoli commented on FLINK-6250:
[~fhueske], I just saw that you merged FLINK-6240... I was about to push
the FLINK-6250 branch, but the function has now been replaced with code generation.
This makes adding the DISTINCT function even more complicated. Was it
really necessary while we are still developing the functions?
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967477#comment-15967477
]
Stefano Bortoli commented on FLINK-6250:
Ok, as a workaround for the missing distinct aggregation, I have implemented a
new {{SqlFunction}} called {{DIST()}} that is intercepted in
{{DataStreamOverAggregate}} and used to create the {{distinctFlag}} for the
aggregators. That part will be removed as soon as Calcite supports DISTINCT
aggregations. I think that eventually this task should be the subject of a JIRA
issue of its own.
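In spirit, that interception amounts to mapping each over-window aggregate call to a boolean flag. A minimal sketch under assumed, simplified names (the real check happens on Calcite call objects inside {{DataStreamOverAggregate}}, not on plain strings):

```scala
// Hypothetical, simplified model of an aggregate call in the OVER window.
case class AggCall(functionName: String, fieldIndex: Int)

// Derive the per-aggregate distinct flags: DIST(x) marks the workaround's
// distinct aggregation, everything else is a regular aggregate.
def distinctFlags(calls: Seq[AggCall]): Array[Boolean] =
  calls.map(_.functionName.equalsIgnoreCase("DIST")).toArray
```

For a query like `COUNT(b), SUM(DIST(b))` this would yield `Array(false, true)`, which is then handed to the ProcessFunction as the {{distinctAggsFlag}} array.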
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964250#comment-15964250
]
Stefano Bortoli commented on FLINK-6250:
According to the JIRA issue opened against Calcite, there is no clear plan to solve
the DISTINCT aggregation problem. You mentioned that we could work around it in
Flink and then remove the workaround once Calcite supports it. I am not sure where I
should start from. Is it something like for the LogicalAggregation?
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955200#comment-15955200
]
Stefano Bortoli commented on FLINK-6250:
I am also fine with implementing it by extending the existing function. In practice it
would be the same behavior, just in another file. Currently I am trying
to understand where the DISTINCT marker gets lost, as it does not arrive at the
OVER aggregation function. Probably it is intercepted in an early stage of the
query processing.
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955117#comment-15955117
]
Fabian Hueske commented on FLINK-6250:
--
I'm not sure about implementing this as a separate {{ProcessFunction}}.
I think we should adapt the existing ones because we will need support for
distinct and non-distinct aggregates in the same {{ProcessFunction}}.
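A self-contained sketch of what mixing distinct and non-distinct aggregates in one function could look like (plain Scala collections stand in for Flink state; `FlaggedAggregator` and its members are hypothetical names, not the actual Flink API):

```scala
import scala.collection.mutable

// One SUM accumulator per aggregate; distinctAggsFlag(i) decides whether
// aggregate i sees every input row or only the first occurrence of a value.
class FlaggedAggregator(distinctAggsFlag: Array[Boolean]) {
  // a "seen values" set per aggregate (sketch of a distinctValueState)
  private val seen = distinctAggsFlag.map(_ => mutable.Set.empty[Long])
  private val sums = Array.fill(distinctAggsFlag.length)(0L)

  def accumulate(values: Array[Long]): Unit =
    for (i <- values.indices)
      // Set.add returns true only when the value was not present yet.
      if (!distinctAggsFlag(i) || seen(i).add(values(i)))
        sums(i) += values(i)

  def results: Array[Long] = sums.clone()
}
```

Feeding rows `(1,1)`, `(1,1)`, `(2,2)` with flags `Array(false, true)` yields `Array(4, 3)`: the plain sum sees all rows, the distinct sum counts each value once.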
[
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955041#comment-15955041
]
Stefano Bortoli commented on FLINK-6250:
I will approach this by implementing a processing function starting from the
simple aggregation:
1. Add a "distinctValue" MapState counting aggregated unique values in the window.
2. Aggregate only when a value has not been seen before.
3. Decrease the counter when a value goes out of the boundaries.
4. Retract from the aggregator and remove the value from state when its counter reaches zero.
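A minimal sketch of these four steps, with a queue and a reference-count map standing in for the row MapState and the "distinctValue" state (all names here are illustrative, not the actual Flink state API):

```scala
import scala.collection.mutable

// Distinct SUM over a row-bounded window of `windowSize` rows.
class DistinctSumWindow(windowSize: Int) {
  private val counts = mutable.Map.empty[Long, Long] // value -> occurrences in window (step 1)
  private val buffer = mutable.Queue.empty[Long]     // rows currently inside the boundaries
  private var sum: Long = 0L

  def add(value: Long): Long = {
    if (buffer.size == windowSize) {           // oldest row leaves the window
      val evicted = buffer.dequeue()
      val remaining = counts(evicted) - 1      // step 3: decrease the counter
      if (remaining == 0) {
        counts.remove(evicted)                 // step 4: counter hit zero...
        sum -= evicted                         // ...retract and drop from state
      } else counts(evicted) = remaining
    }
    buffer.enqueue(value)
    val firstOccurrence = !counts.contains(value)
    counts(value) = counts.getOrElse(value, 0L) + 1
    if (firstOccurrence) sum += value          // step 2: aggregate only unseen values
    sum
  }
}
```

With `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` (window of 3) and inputs 1, 1, 2, 3, 4, the emitted distinct sums are 1, 1, 3, 6, 9.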