[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708927#comment-16708927 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238713688

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##
@@ -533,45 +618,49 @@ class MatchCodeGenerator(
       reusablePerRecordStatements += codeForAgg

       val defaultValue = primitiveDefaultValue(singleResultType)
-      val codeForSingleAgg =
+      val codeForSingleAgg = if (nullCheck) {
         j"""
            |boolean $singleResultNullTerm;
-           |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
-           |  .getField(${aggregates.size});
-           |if ($singleResultTerm != null) {
+           |$primitiveSingleResultTypeTerm $singleResultTerm;
+           |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+           |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+           |.getField(${aggregates.size});
            |  $singleResultNullTerm = false;
            |} else {
            |  $singleResultNullTerm = true;
            |  $singleResultTerm = $defaultValue;
            |}
            |""".stripMargin
+      } else {
+j"""
+           |boolean $singleResultNullTerm = false;

Review comment:
This line can be dropped. The null term is not evaluated if nullCheck is disabled.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Support aggregation functions in the define and measures clause of MatchRecognize
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7599
>                 URL: https://issues.apache.org/jira/browse/FLINK-7599
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API SQL
>            Reporter: Dian Fu
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
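The review comment above can be illustrated with a minimal sketch. It assumes a simplified, hypothetical helper (`NullCheckSketch.fieldAccess`, not part of Flink) that builds the Java snippet with plain string interpolation; the parameter names are illustrative only. With `nullCheck` disabled, no null term is declared at all, which is what the reviewer suggests.

```scala
// Hypothetical, simplified stand-in for the code generator; not Flink's actual API.
object NullCheckSketch {

  /** Returns Java source that reads field `index` of `rowTerm` into `resultTerm`. */
  def fieldAccess(
      nullCheck: Boolean,
      rowTerm: String,
      index: Int,
      primitiveType: String,
      boxedType: String,
      resultTerm: String,
      nullTerm: String,
      defaultValue: String): String = {
    if (nullCheck) {
      // The null term is declared and assigned in both branches.
      s"""
         |boolean $nullTerm;
         |$primitiveType $resultTerm;
         |if ($rowTerm.getField($index) != null) {
         |  $resultTerm = ($boxedType) $rowTerm.getField($index);
         |  $nullTerm = false;
         |} else {
         |  $nullTerm = true;
         |  $resultTerm = $defaultValue;
         |}
         |""".stripMargin
    } else {
      // No null term is declared, following the review comment that it is
      // not evaluated when nullCheck is disabled.
      s"""
         |$primitiveType $resultTerm = ($boxedType) $rowTerm.getField($index);
         |""".stripMargin
    }
  }
}
```

Whether the surrounding generator tolerates an undeclared null term in every code path is not shown in the quoted diff, so treat this only as an illustration of the suggestion.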
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708929#comment-16708929 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238706596

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
##
@@ -726,13 +727,14 @@ object AggregateUtil {
       tableConfig: TableConfig): MapPartitionFunction[Row, Row] = {

     val needRetract = false
-    val (aggFieldIndexes, aggregates, isDistinctAggs, accTypes, _) = transformToAggregateFunctions(
+    val aggregateMetadata = extractAggregateMetadata(
       namedAggregates.map(_.getKey),
       physicalInputRowType,
       needRetract,
       tableConfig)

-    val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
+    val aggMapping = (0 until aggregateMetadata.getAggregateCallsCount).map(_ + groupings.length)

Review comment:
This occurs quite often (I counted 10 times). Maybe create helper function in `AggregateMetadata` that takes the grouping length such as `getAdjustedMapping()`?
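A rough sketch of the helper proposed in the review comment above. `AggregateMetadataSketch` is a hypothetical stand-in that models only the aggregate count; the real `AggregateMetadata` class in the PR holds more, and the signature of `getAdjustedMapping` (an `offset` parameter returning `Array[Int]`) is an assumption.

```scala
// Hypothetical, simplified stand-in for the AggregateMetadata class in the PR.
final case class AggregateMetadataSketch(aggregateCallsCount: Int) {

  def getAggregateCallsCount: Int = aggregateCallsCount

  /** Output positions of the aggregates when they follow `offset` grouping fields. */
  def getAdjustedMapping(offset: Int): Array[Int] =
    (0 until aggregateCallsCount).map(_ + offset).toArray
}

object AdjustedMappingExample {
  def main(args: Array[String]): Unit = {
    val groupings = Array(0, 2) // two grouping keys
    val metadata = AggregateMetadataSketch(aggregateCallsCount = 3)
    // Replaces the repeated `(0 until count).map(_ + groupings.length)` expression.
    println(metadata.getAdjustedMapping(groupings.length).mkString(", ")) // 2, 3, 4
  }
}
```

Centralizing the offset arithmetic in one method would let each of the call sites mentioned by the reviewer shrink to a single call.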
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708926#comment-16708926 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238711587

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##
@@ -504,24 +588,25 @@ class MatchCodeGenerator(

     private val rowTypeTerm = "org.apache.flink.types.Row"

-    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
-      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
         case Some(expr) =>
           expr

         case None =>
-          val exp: GeneratedExpression = generateAggAccess(call)
-          aggregates += call
-          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)

Review comment:
The naming is still not perfect here. What is the difference between `generateAggAccess` and `doGenerateAggAccess`? How about `generateDeduplicatedAggAccess` and `generateAggAccess`?
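To make the naming proposal concrete, here is a minimal sketch of the two roles under the suggested names. `AggAccessSketch` is hypothetical: plain strings stand in for `RexCall` and `GeneratedExpression`, and the cache is keyed by the call's string form, as in the quoted diff.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for AggBuilder; strings replace
// RexCall and GeneratedExpression.
class AggAccessSketch {
  private val aggregates = mutable.ListBuffer[String]()
  private val reusable = mutable.LinkedHashMap[String, String]()

  /** Public entry point: returns the cached access term for a call seen before. */
  def generateDeduplicatedAggAccess(aggCall: String): String =
    reusable.getOrElseUpdate(aggCall, {
      val term = generateAggAccess(aggCall)
      aggregates += aggCall // register the call only the first time it is seen
      term
    })

  /** Generates the access only; the index is the call's position in the result row. */
  private def generateAggAccess(aggCall: String): String =
    s"aggRow.getField(${aggregates.size})"

  def registeredAggregates: Seq[String] = aggregates.toList
}
```

With these names the public method states that it deduplicates, while the private one only produces the field access, which is the distinction the reviewer asks for.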
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708924#comment-16708924 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238718548

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##
@@ -691,12 +795,14 @@ class MatchCodeGenerator(

   class PatternVariableFinder extends RexDefaultVisitor[Option[String]] {

+    val ALL_PATTERN_VARIABLE = "*"

Review comment:
We could even move this constant definition to `MatchUtil`?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708928#comment-16708928 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r238718086 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); 
+ |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { + val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)] + + val aggs = aggregates.map(rexAggCall => { +val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => { + inputRows.get(innerCall.toString) match { +case Some(x) => + x + +case None => + val callWithIndex = (innerCall, inputRows.size) + inputRows(innerCall.toString) = callWithIndex + callWithIndex + } +}).toList + +val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction] +(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray) + 
}).zipWithIndex.map { +case (agg, index) => + val result = AggregateUtil +.transformToAggregateFunction(agg._1, + isDistinct = false, + agg._2.map(_.getType), +
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708925#comment-16708925 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238716344

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##
@@ -582,22 +671,33 @@ class MatchCodeGenerator(
       val transformFuncName = s"transformRowForAgg_$variableUID"
       val inputTransform: String = generateAggInputExprEvaluation(
-        exprs,
+        matchAgg.inputExprs,
         transformFuncName)

       generateAggCalculation(aggFunc, transformFuncName, inputTransform)
     }

+    private case class LogicalMatchAggCall(

Review comment:
nit: move all classes to the bottom to logically separate them from methods.
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708923#comment-16708923 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r238709945 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -49,6 +50,90 @@ import scala.collection.mutable /** * A code generator for generating CEP related functions. * + * Aggregates are generated as follows: + * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable + * 2. even if the same aggregation is used multiple times in an expression + *(e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To do so [[AggBuilder]] + *keeps set of already seen different aggregation calls, and reuses the code to access + *appropriate field of aggregation result + * 3. after translating every expression (either in [[generateCondition]] or in + *[[generateOneRowPerMatchExpression]]) there will be generated code for + * * [[GeneratedFunction]], which will be an inner class + * * said [[GeneratedFunction]] will be instantiated in the ctor and opened/closed + * in corresponding methods of top level generated classes + * * function that transforms input rows (row by row) into aggregate input rows + * * function that calculates aggregates for variable, that uses the previous method + *The generated code will look similar to this: + * + * + * {{{ + * + * public class MatchRecognizePatternSelectFunction$175 extends RichPatternSelectFunction { + * + * // Class used to calculate aggregates for a single pattern variable + * public final class AggFunction_variable$115$151 extends GeneratedAggregations { + * ... 
+ * } + * + * private final AggFunction_variable$115$151 aggregator_variable$115; + * + * public MatchRecognizePatternSelectFunction$175() { + * aggregator_variable$115 = new AggFunction_variable$115$151(); + * } + * + * public void open() { + * aggregator_variable$115.open(); + * ... + * } + * + * // Function to transform incoming row into aggregate specific row. It can e.g calculate + * // inner expression of said aggregate + * private Row transformRowForAgg_variable$115(Row inAgg) { + * ... + * } + * + * // Function to calculate all aggregates for a single pattern variable + * private Row calculateAgg_variable$115(List input) { + * Acc accumulator = aggregator_variable$115.createAccumulator(); + * for (Row row : input) { + * aggregator_variable$115.accumulate(accumulator, transformRowForAgg_variable$115(row)); + * } + * + * return aggregator_variable$115.getResult(accumulator); + * } + * + * @Override + * public Object select(Map> in1) throws Exception { + * + * // Extract list of rows assigned to a single pattern variable + * java.util.List patternEvents$130 = (java.util.List) in1.get("A"); + * ... + * + * // Calculate aggregates + * Row aggRow_variable$110$111 = calculateAgg_variable$110(patternEvents$114); + * + * // Every aggregation (e.g SUM(A.price) and AVG(A.price)) will be extracted to a variable + * double result$135 = aggRow_variable$126$127.getField(0); + * long result$137 = aggRow_variable$126$127.getField(1); + * + * // Result of aggregation will be used in expression evaluation + * out.setField(0, result$135) + * + * long result$140 = result$137 * 2; + * out.setField(1, result$140); + * + * double result$144 = $result135 + result$137; + * out.setField(2, result$144); + * } + * + * public void close() { + * aggregator_variable$115.close(); + * ... + * } + * + * } + * }}} + * Review comment: Awesome documentation. Very helpful. This is an automated message from the Apache Git Service. 
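The `calculateAgg_variable$...` function described in the quoted Scaladoc follows a create/accumulate/getResult cycle over all rows assigned to one pattern variable. The sketch below mirrors that shape in plain Scala. Every name in it (`SumAndCountAccumulator`, `Event`, the tuple result) is a hypothetical simplification and not Flink's `GeneratedAggregations` API.

```scala
// Hypothetical, simplified stand-in for the generated aggregator.
final class SumAndCountAccumulator {
  var sum: Double = 0.0
  var count: Long = 0L
}

object CalculateAggSketch {
  final case class Event(price: Double)

  def createAccumulator(): SumAndCountAccumulator = new SumAndCountAccumulator

  // Plays the role of transformRowForAgg: evaluates the aggregate's inner expression.
  def transformRowForAgg(event: Event): Double = event.price

  def accumulate(acc: SumAndCountAccumulator, value: Double): Unit = {
    acc.sum += value
    acc.count += 1
  }

  // Result "row": field 0 is SUM(A.price), field 1 is COUNT(A.price).
  def getResult(acc: SumAndCountAccumulator): (Double, Long) = (acc.sum, acc.count)

  // Calculates all aggregates for the events matched to a single pattern variable.
  def calculateAgg(events: Seq[Event]): (Double, Long) = {
    val acc = createAccumulator()
    events.foreach(e => accumulate(acc, transformRowForAgg(e)))
    getResult(acc)
  }
}
```

Each field of the result is then read once into a local variable and reused by every expression that refers to the same aggregate, as the Scaladoc describes.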
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707016#comment-16707016 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on issue #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#issuecomment-443675670

Hi @dianfu, @twalthr. Thank you for your review. I've updated the PR according to your comments. There are still two main discussion points from @twalthr's review:

1. Moving `PatternVariableFinder`, which also validates that there is only a single pattern variable in an aggregation, to e.g. `DataStreamMatchRule`. The main role of this class is to extract that single variable, though. If I performed the validation at an earlier stage, I would have to traverse the expression with the same code in `MatchCodeGenerator` anyway, but I am OK with it if you think it is worth it.
2. Using `aggCall.toString` as a key for reusing the code of an aggregation for a single variable that was used multiple times in an expression. I could not think of a better way to identify that it is the same expression. Do you have an idea what I could use instead?

I would appreciate it if you had another look.
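The first discussion point concerns a visitor that both extracts and validates the single pattern variable of an aggregation. The sketch below shows that combined traversal on a hypothetical miniature expression tree (`Expr`, `FieldRef`, `Literal`, `Call`); the PR itself works on Calcite `RexNode`s. Returning `"*"` when no variable is referenced is an assumption based on the `ALL_PATTERN_VARIABLE` constant in the quoted diff.

```scala
// Hypothetical miniature expression tree; the PR works on Calcite RexNode instead.
sealed trait Expr
final case class FieldRef(patternVariable: String, field: String) extends Expr
final case class Literal(value: Any) extends Expr
final case class Call(operator: String, operands: Seq[Expr]) extends Expr

object PatternVariableFinderSketch {
  val AllPatternVariable = "*"

  /** Returns the single pattern variable referenced by `expr`, or "*" if there is none. */
  def find(expr: Expr): String = {
    // One traversal collects every referenced pattern variable.
    def collect(e: Expr): Set[String] = e match {
      case FieldRef(variable, _) => Set(variable)
      case Literal(_)            => Set.empty
      case Call(_, operands)     => operands.flatMap(collect).toSet
    }

    collect(expr).toList match {
      case Nil             => AllPatternVariable
      case variable :: Nil => variable
      case many =>
        // Validation falls out of the same traversal that extracts the variable.
        throw new IllegalArgumentException(
          "Aggregation must be applied to a single pattern variable, found: " +
            many.sorted.mkString(", "))
    }
  }
}
```

Because extraction and validation share the traversal, validating earlier (for example in a planner rule) would still leave the code generator walking the same expression to obtain the variable, which is the trade-off described in the comment.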
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707001#comment-16707001 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238226741

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##
@@ -442,4 +498,223 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = generateAggAccess(call)
+          aggregates += call
+          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def generateAggAccess(call: RexCall) = {

Review comment:
It actually generates only the access. It does not generate the code that computes the aggregate. The name might not be perfect, but I would like to differentiate the two somehow.
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706998#comment-16706998 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238226363

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##
@@ -442,4 +582,245 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+          aggregates += aggCall
+          reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def doGenerateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")

Review comment:
I think the name makes sense here, as it is the term for a single aggregation out of the set of all aggregations calculated for a pattern variable. It makes even more sense now that I moved the variable holding the term name for the result containing all aggregates into this function.
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704253#comment-16704253 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237724165

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##
@@ -42,6 +50,90 @@ import scala.collection.mutable
 /**
   * A code generator for generating CEP related functions.
   *
+  * Aggregates are generated as follows:
+  * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable
+  * 2. even if the same aggregation is used multiple times in an expression
+  *    (e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To do so [[AggBuilder]]
+  *    keeps set of already seen different aggregation calls, and reuses the code to access
+  *    appropriate field of aggregation result
+  * 3. after translating every expression (either in [[generateCondition]] or in
+  *    [[generateOneRowPerMatchExpression]]) there will be generated code for
+  *     * [[GeneratedFunction]], which will be an inner class

Review comment:
code format
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704251#comment-16704251 ] ASF GitHub Bot commented on FLINK-7599: --- dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237731718 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -442,4 +582,245 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def generateAggAccess(aggCall: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((aggCall.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = doGenerateAggAccess(aggCall) + aggregates += aggCall + reusableInputUnboxingExprs((aggCall.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def doGenerateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val primitiveSingleResultTypeTerm = primitiveTypeTermForTypeInfo(singleResultType) + val boxedSingleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = if (nullCheck) { +j""" + |boolean 
$singleResultNullTerm; + |$primitiveSingleResultTypeTerm $singleResultTerm; + |if ($resultRowTerm.getField(${aggregates.size}) != null) { + | $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm + |.getField(${aggregates.size}); + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + } else { +j""" + |boolean $singleResultNullTerm = false; + |$primitiveSingleResultTypeTerm $singleResultTerm = + |($boxedSingleResultTypeTerm) $resultRowTerm.getField(${aggregates.size}); + |""".stripMargin + } + + reusablePerRecordStatements += codeForSingleAgg + + GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, singleResultType) +} + +def generateAggFunction() : Unit = { + val matchAgg = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +matchAgg.inputExprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +matchAgg.aggregations.map(_.aggFunction).toArray, +matchAgg.aggregations.map(_.inputIndices).toArray, +matchAgg.aggregations.indices.toArray, +Array.fill(matchAgg.aggregations.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +matchAgg.aggregations.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +matchAgg.inputExprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class LogicalMatchAggCall( + function: SqlAggFunction, + inputTypes: Seq[RelDataType], + exprIndices: Seq[Int] +) + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: 
Seq[DataViewSpec[_]] +) + +private case class MatchAgg( + aggregations: Seq[MatchAggCall], + inputExprs: Seq[RexNode] +) + +private def extractAggregatesAndExpressions: MatchAgg = { + val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)] + + val logicalAggregates =
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704248#comment-16704248 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237739790

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -442,4 +582,245 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+          aggregates += aggCall
+          reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def doGenerateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val primitiveSingleResultTypeTerm = primitiveTypeTermForTypeInfo(singleResultType)
+      val boxedSingleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)
+
+      val codeForAgg =
+        j"""
+           |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForAgg
+
+      val defaultValue = primitiveDefaultValue(singleResultType)
+      val codeForSingleAgg = if (nullCheck) {
+        j"""
+           |boolean $singleResultNullTerm;
+           |$primitiveSingleResultTypeTerm $singleResultTerm;
+           |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+           |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+           |    .getField(${aggregates.size});
+           |  $singleResultNullTerm = false;
+           |} else {
+           |  $singleResultNullTerm = true;
+           |  $singleResultTerm = $defaultValue;
+           |}
+           |""".stripMargin
+      } else {
+        j"""
+           |boolean $singleResultNullTerm = false;
+           |$primitiveSingleResultTypeTerm $singleResultTerm =
+           |  ($boxedSingleResultTypeTerm) $resultRowTerm.getField(${aggregates.size});
+           |""".stripMargin
+      }
+
+      reusablePerRecordStatements += codeForSingleAgg
+
+      GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, singleResultType)
+    }
+
+    def generateAggFunction() : Unit = {
+      val matchAgg = extractAggregatesAndExpressions
+
+      val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+      val aggFunc = aggGenerator.generateAggregations(
+        s"AggFunction_$variableUID",
+        matchAgg.inputExprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+        matchAgg.aggregations.map(_.aggFunction).toArray,
+        matchAgg.aggregations.map(_.inputIndices).toArray,
+        matchAgg.aggregations.indices.toArray,
+        Array.fill(matchAgg.aggregations.size)(false),
+        isStateBackedDataViews = false,
+        partialResults = false,
+        Array.emptyIntArray,
+        None,
+        matchAgg.aggregations.size,
+        needRetract = false,
+        needMerge = false,
+        needReset = false,
+        None
+      )
+
+      reusableMemberStatements.add(aggFunc.code)
+
+      val transformFuncName = s"transformRowForAgg_$variableUID"
+      val inputTransform: String = generateAggInputExprEvaluation(
+        matchAgg.inputExprs,
+        transformFuncName)
+
+      generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+    }
+
+    private case class LogicalMatchAggCall(
+      function: SqlAggFunction,
+      inputTypes: Seq[RelDataType],
+      exprIndices: Seq[Int]
+    )
+
+    private case class MatchAggCall(
+      aggFunction: TableAggregateFunction[_, _],
+      inputIndices: Array[Int],
+      dataViews: Seq[DataViewSpec[_]]
+    )
+
+    private case class MatchAgg(
+      aggregations: Seq[MatchAggCall],
+      inputExprs: Seq[RexNode]
+    )
+
+    private def extractAggregatesAndExpressions: MatchAgg = {
+      val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+      val logicalAggregates =
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704249#comment-16704249 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237737335

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -442,4 +582,245 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+          aggregates += aggCall
+          reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def doGenerateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")

Review comment:
What about removing the prefix `single`?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704244#comment-16704244 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237737176

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -442,4 +582,245 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+          aggregates += aggCall
+          reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def doGenerateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val primitiveSingleResultTypeTerm = primitiveTypeTermForTypeInfo(singleResultType)
+      val boxedSingleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)
+
+      val codeForAgg =
+        j"""
+           |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});

Review comment:
Define `calculateAgg_$variableUID` as a variable such as `val calculateAggFuncName = calculateAgg_$variableUID`
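For readers following the thread, a minimal sketch of what this suggestion could look like. It is not code from the PR: the object name, the method signature and the sample arguments are made up for illustration, and the standard `s` interpolator stands in for the `j` interpolator used in the quoted diff.

```scala
object CalculateAggNameSketch {

  /** Builds the generated Java statement; all parameter names are illustrative. */
  def codeForAgg(variableUID: String, resultRowTerm: String, eventsTerm: String): String = {
    val rowTypeTerm = "org.apache.flink.types.Row"
    // Name the generated function once, so every reference uses the same string.
    val calculateAggFuncName = s"calculateAgg_$variableUID"
    s"$rowTypeTerm $resultRowTerm = $calculateAggFuncName($eventsTerm);"
  }
}
```

With the name held in a `val`, the place that defines the generated function and the place that calls it cannot drift apart.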
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704254#comment-16704254 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237726170

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -442,4 +582,245 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+          aggregates += aggCall
+          reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def doGenerateAggAccess(call: RexCall) = {

Review comment:
rename to aggCall
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704252#comment-16704252 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237472659

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -442,4 +498,223 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = generateAggAccess(call)
+          aggregates += call
+          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def generateAggAccess(call: RexCall) = {

Review comment:
What about renaming to generateAggregation?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704247#comment-16704247 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237472868

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -442,4 +498,223 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {

Review comment:
getOrComputeAggregation?
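As background for the naming question, the method under discussion follows a get-or-compute pattern: the string form of the aggregate call is the cache key, and the expression is generated only on the first request. The sketch below illustrates that pattern in isolation. `Expr`, `Cache` and `generated` are hypothetical stand-ins, not Flink classes, and the real method additionally records the call in `aggregates`.

```scala
import scala.collection.mutable

object AggAccessCacheSketch {

  // Hypothetical stand-in for GeneratedExpression: a result term plus generated code.
  final case class Expr(resultTerm: String, code: String)

  class Cache {
    private val cache = mutable.Map.empty[String, Expr]
    private var counter = 0

    /** Number of expressions generated so far, i.e. the number of cache misses. */
    def generated: Int = counter

    /** Returns the cached expression for `call`, generating it on first use. */
    def getOrCompute(call: String): Expr = cache.get(call) match {
      case Some(expr) =>
        expr

      case None =>
        counter += 1
        val exp = Expr(s"result_$counter", s"/* code for $call */")
        cache(call) = exp
        // Callers only need the result term; the code itself is emitted elsewhere.
        exp.copy(code = "")
    }
  }
}
```

Requesting the same aggregate twice yields the same result term and generates code only once, which is the behaviour both candidate names try to describe.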
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704246#comment-16704246 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237727039

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -442,4 +582,245 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+          aggregates += aggCall
+          reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def doGenerateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val primitiveSingleResultTypeTerm = primitiveTypeTermForTypeInfo(singleResultType)
+      val boxedSingleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)

Review comment:
`patternName` doesn't reflect what this variable holds. What about renaming `patternName` to `eventsExpr` or something else?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704245#comment-16704245 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237466962

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -71,12 +78,28 @@ class MatchCodeGenerator(
   private var offset: Int = 0
   private var first : Boolean = false
+  /**
+    * Flags that tells if we generate expressions inside an aggregate. It tells how to access input

Review comment:
Flags -> Flag
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704250#comment-16704250 ]

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237738914

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -442,4 +582,245 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+          aggregates += aggCall
+          reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def doGenerateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val primitiveSingleResultTypeTerm = primitiveTypeTermForTypeInfo(singleResultType)
+      val boxedSingleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)
+
+      val codeForAgg =
+        j"""
+           |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForAgg
+
+      val defaultValue = primitiveDefaultValue(singleResultType)
+      val codeForSingleAgg = if (nullCheck) {
+        j"""
+           |boolean $singleResultNullTerm;
+           |$primitiveSingleResultTypeTerm $singleResultTerm;
+           |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+           |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+           |    .getField(${aggregates.size});
+           |  $singleResultNullTerm = false;
+           |} else {
+           |  $singleResultNullTerm = true;
+           |  $singleResultTerm = $defaultValue;
+           |}
+           |""".stripMargin
+      } else {
+        j"""
+           |boolean $singleResultNullTerm = false;
+           |$primitiveSingleResultTypeTerm $singleResultTerm =
+           |  ($boxedSingleResultTypeTerm) $resultRowTerm.getField(${aggregates.size});
+           |""".stripMargin
+      }
+
+      reusablePerRecordStatements += codeForSingleAgg
+
+      GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, singleResultType)
+    }
+
+    def generateAggFunction() : Unit = {

Review comment:
Remove the whitespace before the colon
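The two branches of `codeForSingleAgg` quoted above emit different Java text depending on `nullCheck`. The following is a simplified, self-contained sketch of that idea, not the PR's code: the object, method and parameter names are invented for illustration, and the standard `s` interpolator replaces the `j` interpolator used in the quoted diff.

```scala
object NullCheckCodegenSketch {

  /** Emits Java source that reads field `index` of `rowTerm` into `resultTerm`. */
  def fieldAccess(
      nullCheck: Boolean,
      rowTerm: String,
      index: Int,
      primitiveType: String,
      boxedType: String,
      defaultValue: String,
      resultTerm: String,
      nullTerm: String): String =
    if (nullCheck) {
      // Guard the unboxing cast: a null field would otherwise fail at runtime.
      s"""
         |boolean $nullTerm;
         |$primitiveType $resultTerm;
         |if ($rowTerm.getField($index) != null) {
         |  $resultTerm = ($boxedType) $rowTerm.getField($index);
         |  $nullTerm = false;
         |} else {
         |  $nullTerm = true;
         |  $resultTerm = $defaultValue;
         |}
         |""".stripMargin
    } else {
      // Without null checks the field is assumed to be non-null and is unboxed directly.
      s"""
         |boolean $nullTerm = false;
         |$primitiveType $resultTerm = ($boxedType) $rowTerm.getField($index);
         |""".stripMargin
    }
}
```

The checked variant tests the field before the cast because assigning a null boxed value to a primitive variable would throw; the unchecked variant skips the test entirely.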
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703484#comment-16703484 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237569734

## File path: docs/dev/table/streaming/match_recognize.md ##
@@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section.
+
+With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+    MATCH_RECOGNIZE (
+        PARTITION BY symbol
+        ORDER BY rowtime
+        MEASURES
+            FIRST(A.rowtime) AS start_tstamp,
+            LAST(A.rowtime) AS end_tstamp,
+            AVG(A.price) AS avgPrice
+        ONE ROW PER MATCH
+        AFTER MATCH SKIP TO FIRST B
+        PATTERN (A+ B)
+        DEFINE
+            A AS AVG(A.price) < 15
+    ) MR;
+{% endhighlight %}
+
+Given this query and following input values:
+
+{% highlight text %}
+symbol         rowtime         price    tax
+======  ====================  =======  =====
+'ACME'  '01-Apr-11 10:00:00'    12       1
+'ACME'  '01-Apr-11 10:00:01'    17       2
+'ACME'  '01-Apr-11 10:00:02'    13       1
+'ACME'  '01-Apr-11 10:00:03'    16       3
+'ACME'  '01-Apr-11 10:00:04'    25       2
+'ACME'  '01-Apr-11 10:00:05'     2       1
+'ACME'  '01-Apr-11 10:00:06'     4       1
+'ACME'  '01-Apr-11 10:00:07'    10       2
+'ACME'  '01-Apr-11 10:00:08'    15       2
+'ACME'  '01-Apr-11 10:00:09'    25       2
+'ACME'  '01-Apr-11 10:00:10'    30       1
+{% endhighlight %}
+
+The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will
+exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said query will be:
+
+{% highlight text %}
+ symbol       start_tstamp         end_tstamp        avgPrice
+========  ==================  ==================  ==========
+ACME      01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
+ACME      01-APR-11 10:00:04  01-APR-11 10:00:09     13.5
+{% endhighlight %}
+
+An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will

Review comment:
You are right, I didn't think of UDAGs. Do you think it is worth adding such a disclaimer at all? I added it in the first place because there was something similar in the SQL standard, but there are no UDAGs there.
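The first row of the expected result in the quoted documentation can be checked by hand. The sketch below is plain Scala and makes no claim about how Flink evaluates the pattern; the object and method names are made up. It computes the running average over the first five input prices and reports where the `DEFINE` condition `AVG(A.price) < 15` first fails.

```scala
object RunningAverageSketch {

  // Prices of the input rows from 10:00:00 to 10:00:04, taken from the quoted table.
  val prices: Seq[Double] = Seq(12.0, 17.0, 13.0, 16.0, 25.0)

  /** Average of the first n values, for n = 1 to values.size. */
  def runningAverages(values: Seq[Double]): Seq[Double] =
    values.scanLeft(0.0)(_ + _).tail.zipWithIndex.map {
      case (sum, i) => sum / (i + 1)
    }

  /** Index of the first row whose inclusion lifts the running average to the threshold or above. */
  def firstViolation(values: Seq[Double], threshold: Double): Option[Int] = {
    val idx = runningAverages(values).indexWhere(_ >= threshold)
    if (idx >= 0) Some(idx) else None
  }
}
```

Here `firstViolation(prices, 15.0)` returns `Some(4)`, the row at `10:00:04`, and the average of the four preceding rows is 58 / 4 = 14.5, which agrees with the first result row.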
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703386#comment-16703386 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237544327

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = generateAggAccess(call)
+          aggregates += call
+          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def generateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)
+
+      val codeForAgg =
+        j"""
+           |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForAgg
+
+      val defaultValue = primitiveDefaultValue(singleResultType)
+      val codeForSingleAgg =
+        j"""
+           |boolean $singleResultNullTerm;
+           |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm

Review comment:
Sorry about this comment I totally missed the following line.
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703344#comment-16703344 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237529902

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
       generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = generateAggAccess(call)
+          aggregates += call
+          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def generateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)
+
+      val codeForAgg =
+        j"""
+           |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForAgg
+
+      val defaultValue = primitiveDefaultValue(singleResultType)
+      val codeForSingleAgg =
+        j"""
+           |boolean $singleResultNullTerm;
+           |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+           |  .getField(${aggregates.size});
+           |if ($singleResultTerm != null) {
+           |  $singleResultNullTerm = false;
+           |} else {
+           |  $singleResultNullTerm = true;
+           |  $singleResultTerm = $defaultValue;
+           |}
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForSingleAgg
+
+      val exp = GeneratedExpression(singleResultTerm,
+        singleResultNullTerm,
+        NO_CODE,
+        singleResultType)
+      exp
+    }
+
+    def generateAggFunction() : Unit = {
+      val (aggs, exprs) = extractAggregatesAndExpressions
+
+      val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+      val aggFunc = aggGenerator.generateAggregations(
+        s"AggFunction_$variableUID",
+        exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+        aggs.map(_.aggFunction).toArray,
+        aggs.map(_.inputIndices).toArray,
+        aggs.indices.toArray,
+        Array.fill(aggs.size)(false),
+        isStateBackedDataViews = false,
+        partialResults = false,
+        Array.emptyIntArray,
+        None,
+        aggs.size,
+        needRetract = false,
+        needMerge = false,
+        needReset = false,
+        None
+      )
+
+      reusableMemberStatements.add(aggFunc.code)
+
+      val transformFuncName = s"transformRowForAgg_$variableUID"
+      val inputTransform: String = generateAggInputExprEvaluation(
+        exprs,
+        transformFuncName)
+
+      generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+    }
+
+    private case class MatchAggCall(
+      aggFunction: TableAggregateFunction[_, _],
+      inputIndices: Array[Int],
+      dataViews: Seq[DataViewSpec[_]]
+    )
+
+    private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = {
+      val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+      val aggs = aggregates.map(rexAggCall => {
+        val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+          inputRows.get(innerCall.toString) match {
+            case Some(x) =>
+              x
+
+            case None =>
+              val callWithIndex = (innerCall, inputRows.size)
+              inputRows(innerCall.toString) = callWithIndex
+              callWithIndex
+          }
+        }).toList
+
+        val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+        (agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+      }).zipWithIndex.map {
+        case (agg, index) =>
+          val result = AggregateUtil
+            .transformToAggregateFunction(agg._1,
+              isDistinct = false,
+              agg._2.map(_.getType),
+
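The quoted `extractAggregatesAndExpressions` assigns each distinct operand expression a position in the aggregation input row and reuses that position when the same expression shows up in another aggregate. A minimal Java sketch of that bookkeeping follows. `OperandIndexer` and its methods are hypothetical names introduced only for illustration, and plain strings stand in for `RexNode.toString`; this is not code from the pull request.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: gives every distinct expression a stable input index.
final class OperandIndexer {

    // Insertion-ordered, so iteration order equals index order.
    private final Map<String, Integer> indexByExpression = new LinkedHashMap<>();

    /** Returns the existing index of the expression, or assigns the next free one. */
    int indexOf(String expression) {
        Integer existing = indexByExpression.get(expression);
        if (existing != null) {
            return existing;
        }
        int next = indexByExpression.size();
        indexByExpression.put(expression, next);
        return next;
    }

    /** Distinct expressions in the order their indices were assigned. */
    List<String> distinctExpressions() {
        return new ArrayList<>(indexByExpression.keySet());
    }

    public static void main(String[] args) {
        OperandIndexer indexer = new OperandIndexer();
        // AVG(A.price), SUM(A.tax) and MAX(A.price) share the operand A.price.
        System.out.println(indexer.indexOf("A.price"));
        System.out.println(indexer.indexOf("A.tax"));
        System.out.println(indexer.indexOf("A.price"));
        System.out.println(indexer.distinctExpressions());
    }
}
```

Keeping the map insertion-ordered means the list of distinct expressions can be turned directly into the input row layout, which appears to be why the Scala code uses a `LinkedHashMap`.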
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703355#comment-16703355 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237533224

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703342#comment-16703342 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237529378

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703343#comment-16703343 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237529378

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703330#comment-16703330 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237526604

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
       generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = generateAggAccess(call)
+          aggregates += call
+          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def generateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)
+
+      val codeForAgg =
+        j"""
+           |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForAgg
+
+      val defaultValue = primitiveDefaultValue(singleResultType)
+      val codeForSingleAgg =
+        j"""
+           |boolean $singleResultNullTerm;
+           |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm

Review comment:
Hmmm, no I am not. Have you maybe missed the `.getField()` call?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Support aggregation functions in the define and measures clause of MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API SQL
> Reporter: Dian Fu
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
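For readers following the `.getField()` exchange: the template casts the object returned by `getField` to the boxed result type, checks it for null, and substitutes a primitive default when the field is null. A rough Java sketch of that pattern is below. It assumes an `Object[]` as a stand-in for `org.apache.flink.types.Row` and a `long` result type; `AggResultAccess`, `LongResult` and `readLongField` are hypothetical names, not output of the code generator.

```java
// Hypothetical stand-in for the generated per-record code; not Flink's actual output.
final class AggResultAccess {

    /** Pairs the unboxed value with its null flag, like the result term and null term. */
    static final class LongResult {
        final long value;
        final boolean isNull;

        LongResult(long value, boolean isNull) {
            this.value = value;
            this.isNull = isNull;
        }
    }

    /**
     * Reads the field at the given index from an aggregate result row (modelled
     * here as Object[]), checks the boxed value for null before unboxing, and
     * falls back to the primitive default 0L when the field is null.
     */
    static LongResult readLongField(Object[] aggRow, int index) {
        Object field = aggRow[index];
        if (field != null) {
            return new LongResult((Long) field, false);
        }
        return new LongResult(0L, true);
    }

    public static void main(String[] args) {
        Object[] aggRow = {42L, null};
        LongResult first = readLongField(aggRow, 0);
        LongResult second = readLongField(aggRow, 1);
        System.out.println(first.value + " null=" + first.isNull);
        System.out.println(second.value + " null=" + second.isNull);
    }
}
```

The null check has to happen on the boxed object; unboxing first would throw a `NullPointerException` for an aggregate that produced no value.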
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703327#comment-16703327 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237526604

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(

Review comment:
Hmmm, no I am not. Have you maybe missed the `.getField()`?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703202#comment-16703202 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237485540

## File path: docs/dev/table/streaming/match_recognize.md ##
@@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.

Review comment:
Rephrase:
```
Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported.
```
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703200#comment-16703200 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237484391

## File path: docs/dev/table/streaming/match_recognize.md ##
@@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions

Review comment:
Just call it `Aggregations`?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703209#comment-16703209 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237494549

## File path: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java ##
@@ -0,0 +1,6337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.validate;
+
+import org.apache.calcite.config.NullCollation;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.function.Function2;
+import org.apache.calcite.linq4j.function.Functions;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.DynamicRecordType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.runtime.CalciteException;
+import org.apache.calcite.runtime.Feature;
+import org.apache.calcite.runtime.Resources;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.ModifiableViewTable;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAccessEnum;
+import org.apache.calcite.sql.SqlAccessType;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlMerge;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlUnresolvedFunction;
+import org.apache.calcite.sql.SqlUpdate;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.AssignableOperandTypeChecker;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql2rel.InitializerContext;
+import
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703205#comment-16703205 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237485908

## File path: docs/dev/table/streaming/match_recognize.md ##
@@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section.

Review comment:
`to each subset`
`In order to understand how those subsets are evaluated, `
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703199#comment-16703199 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237491268 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. 
+This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime pricetax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will Review comment: ``` The query will accumulate events as part of pattern variable `A` as long as the average price of those events does not exceed `15`. For example, such a limit exceeding happens at `01-Apr-11 10:00:04`. The following period exceeds the average price of `15` again at `01-Apr-11 10:00:10`. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
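The statement in the quoted documentation that the limit is first exceeded at `01-Apr-11 10:00:04` can be checked by hand. The following is a minimal Scala sketch of that arithmetic, assuming that the aggregate in `DEFINE` includes the row currently being tested. The object and the helper `longestPrefixBelow` are made up for illustration and are not part of Flink; the sketch covers only the first period and does not model `AFTER MATCH SKIP`.

```scala
// Hypothetical helper, not Flink API.
object RunningAverageSketch {

  // Counts how many leading prices can be accumulated while the running
  // average (including the candidate row) stays strictly below `limit`.
  def longestPrefixBelow(prices: Seq[Double], limit: Double): Int = {
    // Running sums: p0, p0 + p1, p0 + p1 + p2, ...
    val runningSums = prices.scanLeft(0.0)(_ + _).tail
    val runningAvgs = runningSums.zipWithIndex.map { case (sum, i) => sum / (i + 1) }
    runningAvgs.takeWhile(_ < limit).size
  }

  def main(args: Array[String]): Unit = {
    // Prices of the first five input rows (10:00:00 through 10:00:04).
    val firstRows = Seq(12.0, 17.0, 13.0, 16.0, 25.0)
    val accepted = longestPrefixBelow(firstRows, 15.0)
    val average = firstRows.take(accepted).sum / accepted
    println(s"rows accepted: $accepted, average: $average")
  }
}
```

For these five rows the running averages are 12, 14.5, 14, 14.5 and 16.6, so four rows are accepted with an average of 14.5, and the row at `10:00:04` is the first one that pushes the average to 15 or above. This agrees with the first result row in the quoted documentation.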
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703206#comment-16703206 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237491775 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. 
+This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime pricetax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will +exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said query will be: + +{% highlight text %} + symbol start_tstamp end_tstamp avgPrice += == == +ACME 01-APR-11 10:00:00 01-APR-11 10:00:03 14.5 +ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5 +{% endhighlight %} + +An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will Review comment: ``` An important thing to have in mind is how aggregates behave in situation when no rows were mapped to a certain pattern variable. Every aggregate (except for `COUNT`) will produce `null` in those cases. `COUNT` on the other hand will produce 0. ``` This is an automated message from the Apache Git Service. 
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703201#comment-16703201 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237493263 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. 
+This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime pricetax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will +exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said query will be: + +{% highlight text %} + symbol start_tstamp end_tstamp avgPrice += == == +ACME 01-APR-11 10:00:00 01-APR-11 10:00:03 14.5 +ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5 +{% endhighlight %} + +An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will Review comment: But is this statement true? Doesn't the empty case depend on the aggregate function that you implemented? If you are using a UDF it must not be null right? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
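The question raised in this comment is whether "every aggregate except `COUNT` returns `null`" can hold for user-defined aggregates. The Scala sketch below illustrates the distinction under simple assumptions: SQL `NULL` is modelled as `None`, and the functions are plain stand-ins rather than Flink's built-in or user-defined aggregate classes. `concatOrEmpty` is a hypothetical user-defined aggregate that chooses a non-null result for an empty input.

```scala
// Plain Scala stand-ins; none of these are Flink classes.
object EmptyAggregateSketch {

  // COUNT over no rows is 0.
  def count(values: Seq[Long]): Long = values.size.toLong

  // SUM over no rows is SQL NULL, modelled here as None.
  def sum(values: Seq[Long]): Option[Long] =
    if (values.isEmpty) None else Some(values.sum)

  // AVG over no rows is SQL NULL as well.
  def avg(values: Seq[Long]): Option[Double] =
    if (values.isEmpty) None else Some(values.sum.toDouble / values.size)

  // Hypothetical user-defined aggregate: its author decided that an
  // empty input yields an empty string rather than NULL.
  def concatOrEmpty(values: Seq[String]): String = values.mkString(",")
}
```

The point of the sketch is that the empty-input result is a property of each aggregate's implementation, so a general statement in the documentation can only be made about the built-in functions.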
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703204#comment-16703204 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237486274 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. 
+This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime pricetax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will +exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said query will be: + +{% highlight text %} + symbol start_tstamp end_tstamp avgPrice += == == +ACME 01-APR-11 10:00:00 01-APR-11 10:00:03 14.5 +ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5 +{% endhighlight %} + +An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will +produce 0. + +Attention `DISTINCT` aggregations are not supported. Moreover the `DISTINCT` modifier will be silently dropped if specified for aggregation! Review comment: Mention that you can not mix pattern variables. Mention that you can use expressions as parameters? This is an automated message from the Apache Git Service. 
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703203#comment-16703203 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237488275

## File path: docs/dev/table/streaming/match_recognize.md ##
@@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section.
+
+With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations.

Review comment:
Introduce a subsection `Example` as done in other sections? Rephrase: `The task of the following example is to find... threshold. It shows how...`

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703156#comment-16703156 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237476877 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) Review comment: You are creating a primitive value for a boxed type term? This is an automated message from the Apache Git Service. 
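The concern in this comment is that a primitive default value is assigned to a variable declared with a boxed type. One way to keep the two apart is to test the boxed field for `null` first and only then unbox it into a primitive. The Scala sketch below shows that shape under stated assumptions: `BoxedRow` is a plain array standing in for a row of aggregate results (a real `org.apache.flink.types.Row` is not used), and `readLong` is a hypothetical helper, not generated code.

```scala
// Illustration only; not the code emitted by MatchCodeGenerator.
object NullCheckedUnboxingSketch {

  // Stand-in for a row holding boxed aggregate results.
  type BoxedRow = Array[AnyRef]

  // Returns (isNull, value). The value is a primitive long; when the boxed
  // field is null, the primitive default 0L is used and isNull is true.
  def readLong(row: BoxedRow, index: Int): (Boolean, Long) = {
    val boxed = row(index)
    if (boxed != null) {
      (false, boxed.asInstanceOf[java.lang.Long].longValue())
    } else {
      (true, 0L)
    }
  }
}
```

Keeping the null flag separate from the primitive value means the default is never observed as a real result, as long as callers consult the flag first.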
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703163#comment-16703163 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237479598 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm Review comment: Something is wrong 
here. You are casting a `Row` into the result type of the aggregation?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703170#comment-16703170 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237464842 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); 
+ |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { Review comment: Can you add comments to this method or break the logic into more variables in the method? It is very difficult to understand the Scala magic here. You call the method `extractAggregatesAndExpressions`, but what do the expressions that come out of this method describe? Also, you use overly generic variable names `aggs` and `agg` for values with different data types and meanings. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
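To make the question about `extractAggregatesAndExpressions` more concrete, the Scala sketch below shows the general technique that the method appears to apply: every distinct operand expression is given one position in a shared input row, and each aggregate call records the positions of its own operands. This is a simplified illustration under stated assumptions. Expressions are plain strings instead of `RexNode`s, and `AggCall` and `indexOperands` are hypothetical names that do not exist in Flink.

```scala
import scala.collection.mutable

// Simplified illustration; not the code from the pull request.
object OperandIndexingSketch {

  // Hypothetical aggregate call: function name plus operand expressions.
  final case class AggCall(function: String, operands: List[String])

  // Returns the distinct operand expressions in first-seen order and, for
  // each call, the positions of its operands within that list.
  def indexOperands(calls: List[AggCall]): (List[String], List[List[Int]]) = {
    // LinkedHashMap keeps insertion order, so a position equals the map size
    // at the moment an expression is seen for the first time.
    val positions = mutable.LinkedHashMap.empty[String, Int]
    val inputIndices = calls.map { call =>
      call.operands.map(op => positions.getOrElseUpdate(op, positions.size))
    }
    (positions.keys.toList, inputIndices)
  }
}
```

With descriptive names such as `positions` and `inputIndices`, the two results are easier to tell apart: the first describes the columns of the row fed to the aggregates, the second describes which of those columns each aggregate reads.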
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703168#comment-16703168 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237442180 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long, Double, Int)] +data.+=((1, "a", 1, 0.8, 1)) +data.+=((2, "z", 2, 0.8, 3)) +data.+=((3, "b", 1, 0.8, 2)) +data.+=((4, "c", 1, 0.8, 5)) +data.+=((5, "d", 4, 0.1, 5)) +data.+=((6, "a", 2, 1.5, 2)) +data.+=((7, "b", 2, 0.8, 3)) +data.+=((8, "c", 1, 0.8, 2)) +data.+=((9, "h", 2, 0.8, 3)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("weightedAvg", new WeightedAvg) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as startId, + |SUM(A.price) AS sumA, + |COUNT(DISTINCT D.price) AS countD, + |SUM(D.price) as sumD, + |weightedAvg(price, weight) as wAvg, + |AVG(B.price) AS avgB, + |SUM(B.price * 
B.rate) as sumExprB, + |LAST(id) as endId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ B+ C D? E ) + | DEFINE + |A AS SUM(A.price) < 6, + |B AS SUM(B.price * B.rate) < SUM(A.price) AND + | SUM(B.price * B.rate) > 0.2 AND + | SUM(B.price) >= 1 AND + | AVG(B.price) >= 1 AND + | weightedAvg(price, weight) > 1 + |) AS T + |""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8") Review comment: Produce a second row to be on the safe side? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
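The test above registers a two-parameter aggregate under the name `weightedAvg`, but the implementation of `WeightedAvg` is not part of the quoted diff. The Scala sketch below shows what an accumulator-based weighted average could look like in plain Scala. It does not extend any Flink class, the names `Acc`, `accumulate` and `result` are chosen for illustration, and the integer division is an assumption made to keep the example short.

```scala
// Plain Scala sketch; the WeightedAvg class used by the test is not shown
// in the thread and may differ.
object WeightedAvgSketch {

  // Mutable accumulator holding the weighted sum and the total weight.
  final class Acc {
    var weightedSum: Long = 0L
    var totalWeight: Long = 0L
  }

  // Adds one (value, weight) pair to the accumulator.
  def accumulate(acc: Acc, value: Long, weight: Int): Unit = {
    acc.weightedSum += value * weight
    acc.totalWeight += weight
  }

  // None stands for SQL NULL when nothing was accumulated; otherwise the
  // result uses integer division.
  def result(acc: Acc): Option[Long] =
    if (acc.totalWeight == 0L) None else Some(acc.weightedSum / acc.totalWeight)
}
```

An aggregate of this kind exercises two of the points listed in the test comment: it takes more than one parameter, and its result for an empty set of rows is decided by its own implementation.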
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703153#comment-16703153 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237436464 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); 
+ |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { + val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)] + + val aggs = aggregates.map(rexAggCall => { +val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => { + inputRows.get(innerCall.toString) match { +case Some(x) => + x + +case None => + val callWithIndex = (innerCall, inputRows.size) + inputRows(innerCall.toString) = callWithIndex + callWithIndex + } +}).toList + +val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction] +(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray) + 
}).zipWithIndex.map { +case (agg, index) => + val result = AggregateUtil +.transformToAggregateFunction(agg._1, + isDistinct = false, + agg._2.map(_.getType), +
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703155#comment-16703155 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237434654 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { Review comment: nit: We are already in the `MatchRecognizeITCase` so we don't need to prefix every method with CEP. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
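The first two checks in the test's doc comment quoted above rely on standard SQL aggregate behaviour: COUNT over no rows yields 0, while SUM over no rows yields NULL. A minimal sketch of that difference in plain Scala follows. The helpers `sqlCount` and `sqlSum` are hypothetical names for illustration, not Flink APIs, and NULL is modelled as `None`.

```scala
object EmptyAggregateSketch {
  // Hypothetical helpers for illustration only; NULL is modelled as None.

  /** COUNT(col): number of non-null inputs, 0 when nothing matched. */
  def sqlCount(values: Seq[Option[Long]]): Long =
    values.count(_.isDefined).toLong

  /** SUM(col): sum of the non-null inputs, NULL (None) when there are none. */
  def sqlSum(values: Seq[Option[Long]]): Option[Long] = {
    val present = values.flatten
    if (present.isEmpty) None else Some(present.sum)
  }

  def main(args: Array[String]): Unit = {
    // No rows were mapped to pattern variable D.
    val pricesForD = Seq.empty[Option[Long]]
    println(s"count = ${sqlCount(pricesForD)}, sum = ${sqlSum(pricesForD)}")
  }
}
```

The same helpers also cover null inputs: a `None` element is skipped by both functions, which is the behaviour the second test (`testCepAggregatesWithNullInputs`) appears to exercise.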
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703160#comment-16703160 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237437779

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala ##
@@ -21,8 +21,8 @@ package org.apache.flink.table.`match`
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._

Review comment:
Can you rename this class to `MatchRecognizeValidationTest`? Otherwise it doesn't match our current naming scheme.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support aggregation functions in the define and measures clause of MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API SQL
> Reporter: Dian Fu
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703154#comment-16703154 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237439450 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala ## @@ -145,22 +139,21 @@ class MatchOperatorValidationTest extends TableTestBase { |MATCH_RECOGNIZE ( | ORDER BY proctime | MEASURES - |A.symbol AS aSymbol - | ALL ROWS PER MATCH + |SUM(A.price + B.tax) AS taxedPrice | PATTERN (A B) | DEFINE - |A AS symbol = 'a' + |A AS A.symbol = 'a' |) AS T |""".stripMargin streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] } @Test - def testGreedyQuantifierAtTheEndIsNotSupported(): Unit = { -thrown.expectMessage("Greedy quantifiers are not allowed as the last element of a " + - "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier.") -thrown.expect(classOf[TableException]) + def testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs(): Unit = { +thrown.expect(classOf[ValidationException]) Review comment: Add part of the message here as well? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
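The review comment above asks for part of the error message to be asserted in addition to the exception type. The reason is that a type-only expectation also passes when an unrelated validation error is thrown. The sketch below illustrates this in plain Scala without JUnit. `failsWith`, the local `ValidationException` class, and the message texts are all hypothetical and are not taken from the pull request.

```scala
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

object ExpectedFailureSketch {
  // Local stand-in for the exception type used in the test; hypothetical.
  final class ValidationException(msg: String) extends RuntimeException(msg)

  /**
    * Returns true only if `body` throws an exception of type E whose
    * message contains `messagePart`.
    */
  def failsWith[E <: Throwable](messagePart: String)(body: => Any)(
      implicit tag: ClassTag[E]): Boolean =
    Try(body) match {
      case Failure(e) =>
        tag.runtimeClass.isInstance(e) &&
          Option(e.getMessage).exists(_.contains(messagePart))
      case Success(_) => false
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical messages, for illustration only.
    val expected = failsWith[ValidationException]("single pattern variable") {
      throw new ValidationException("aggregate needs a single pattern variable")
    }
    val unrelated = failsWith[ValidationException]("single pattern variable") {
      throw new ValidationException("some other validation problem")
    }
    println(s"expected failure matched: $expected, unrelated failure matched: $unrelated")
  }
}
```

Checking only the exception class would accept both failures here; adding the message fragment rejects the unrelated one.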
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703164#comment-16703164 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237468069 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { Review comment: This looks hacky to me. Why do we add the string representation of a rex call to input expressions? It could lead to wrong results if the string representation looks the same. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
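The concern raised above is that a cache keyed by `call.toString` returns the wrong entry whenever two different calls print the same way. The sketch below shows that failure mode with a hypothetical `Call` class whose `toString` omits the result type. It is not Calcite's `RexCall`, and whether real `RexCall` strings can collide like this is not established here.

```scala
import scala.collection.mutable

object StringKeySketch {
  // Hypothetical expression node. toString deliberately leaves out the
  // result type, so two distinct nodes can print identically.
  final case class Call(op: String, operand: String, resultType: String) {
    override def toString: String = s"$op($operand)"
  }

  /** Looks up each call in a cache keyed by its string representation. */
  def resolveByString(calls: Seq[Call]): Seq[String] = {
    val cache = mutable.Map.empty[String, String]
    calls.map(c => cache.getOrElseUpdate(c.toString, c.resultType))
  }

  /** Looks up each call in a cache keyed by the node itself (structural equality). */
  def resolveByNode(calls: Seq[Call]): Seq[String] = {
    val cache = mutable.Map.empty[Call, String]
    calls.map(c => cache.getOrElseUpdate(c, c.resultType))
  }

  def main(args: Array[String]): Unit = {
    val calls = Seq(
      Call("SUM", "A.price", "BIGINT"),
      Call("SUM", "A.price", "DOUBLE"))
    println(resolveByString(calls)) // second call reuses the first entry
    println(resolveByNode(calls))   // each call gets its own entry
  }
}
```

Keying by the node, or by a dedicated key type, avoids the collision as long as the node's equality takes every relevant field into account.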
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703159#comment-16703159 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237439056 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala ## @@ -128,15 +128,9 @@ class MatchOperatorValidationTest extends TableTestBase { streamUtils.tableEnv.sqlQuery(sqlQuery).toRetractStream[Row] } - // *** - // * Those validations are temporary. We should remove those tests once we support those * - // * features. * - // *** - @Test - def testAllRowsPerMatch(): Unit = { -thrown.expectMessage("All rows per match mode is not supported yet.") -thrown.expect(classOf[TableException]) + def testAggregatesOnMultiplePatternVariablesNotSupported(): Unit = { +thrown.expect(classOf[ValidationException]) Review comment: add a part of the message as well? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703158#comment-16703158 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237450352 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long, Double, Int)] +data.+=((1, "a", 1, 0.8, 1)) +data.+=((2, "z", 2, 0.8, 3)) +data.+=((3, "b", 1, 0.8, 2)) +data.+=((4, "c", 1, 0.8, 5)) +data.+=((5, "d", 4, 0.1, 5)) +data.+=((6, "a", 2, 1.5, 2)) +data.+=((7, "b", 2, 0.8, 3)) +data.+=((8, "c", 1, 0.8, 2)) +data.+=((9, "h", 2, 0.8, 3)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("weightedAvg", new WeightedAvg) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as startId, + |SUM(A.price) AS sumA, + |COUNT(DISTINCT D.price) AS countD, + |SUM(D.price) as sumD, + |weightedAvg(price, weight) as wAvg, + |AVG(B.price) AS avgB, + |SUM(B.price * 
B.rate) as sumExprB, + |LAST(id) as endId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ B+ C D? E ) + | DEFINE + |A AS SUM(A.price) < 6, + |B AS SUM(B.price * B.rate) < SUM(A.price) AND + | SUM(B.price * B.rate) > 0.2 AND + | SUM(B.price) >= 1 AND + | AVG(B.price) >= 1 AND + | weightedAvg(price, weight) > 1 + |) AS T + |""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testCepAggregatesWithNullInputs(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[Row] +data.+=(Row.of(1:java.lang.Integer, "a", 10:java.lang.Integer)) +data.+=(Row.of(2:java.lang.Integer, "z", 10:java.lang.Integer)) +data.+=(Row.of(3:java.lang.Integer, "b", null)) +data.+=(Row.of(4:java.lang.Integer, "c", null)) +data.+=(Row.of(5:java.lang.Integer, "d", 3:java.lang.Integer)) +data.+=(Row.of(6:java.lang.Integer, "c", 3:java.lang.Integer)) +data.+=(Row.of(7:java.lang.Integer, "c", 3:java.lang.Integer)) +data.+=(Row.of(8:java.lang.Integer, "c", 3:java.lang.Integer)) +data.+=(Row.of(9:java.lang.Integer, "c", 2:java.lang.Integer)) + +val t = env.fromCollection(data)(new RowTypeInfo( Review comment: nit: use the `Types` class This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703171#comment-16703171 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237450569 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long, Double, Int)] +data.+=((1, "a", 1, 0.8, 1)) +data.+=((2, "z", 2, 0.8, 3)) +data.+=((3, "b", 1, 0.8, 2)) +data.+=((4, "c", 1, 0.8, 5)) +data.+=((5, "d", 4, 0.1, 5)) +data.+=((6, "a", 2, 1.5, 2)) +data.+=((7, "b", 2, 0.8, 3)) +data.+=((8, "c", 1, 0.8, 2)) +data.+=((9, "h", 2, 0.8, 3)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("weightedAvg", new WeightedAvg) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as startId, + |SUM(A.price) AS sumA, + |COUNT(DISTINCT D.price) AS countD, + |SUM(D.price) as sumD, + |weightedAvg(price, weight) as wAvg, + |AVG(B.price) AS avgB, + |SUM(B.price * 
B.rate) as sumExprB, + |LAST(id) as endId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ B+ C D? E ) + | DEFINE + |A AS SUM(A.price) < 6, + |B AS SUM(B.price * B.rate) < SUM(A.price) AND + | SUM(B.price * B.rate) > 0.2 AND + | SUM(B.price) >= 1 AND + | AVG(B.price) >= 1 AND + | weightedAvg(price, weight) > 1 + |) AS T + |""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testCepAggregatesWithNullInputs(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[Row] +data.+=(Row.of(1:java.lang.Integer, "a", 10:java.lang.Integer)) Review comment: nit: hint you can also use `Int.box()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
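As a small illustration of the hint above: `Int.box` is part of the Scala standard library and returns a `java.lang.Integer`, so it can replace the `1:java.lang.Integer` type ascriptions used in the test data. The `rowOf` helper below is hypothetical and builds a plain array rather than a Flink `Row`, so that the sketch has no Flink dependency.

```scala
object BoxingSketch {
  /**
    * Hypothetical helper: builds a row-like array of boxed values.
    * A missing price is stored as null, as in the test data above.
    */
  def rowOf(id: Int, name: String, price: Option[Int]): Array[AnyRef] = {
    val boxedPrice: java.lang.Integer = price.map(p => Int.box(p)).orNull
    Array[AnyRef](Int.box(id), name, boxedPrice)
  }

  def main(args: Array[String]): Unit = {
    // Type ascription (relies on an implicit conversion) versus explicit boxing.
    val ascribed: java.lang.Integer = 1: java.lang.Integer
    val boxed: java.lang.Integer = Int.box(1)
    println(ascribed == boxed)

    println(rowOf(3, "b", None).mkString(","))
  }
}
```

Both spellings produce an equal `java.lang.Integer`; `Int.box` simply makes the boxing explicit.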
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703152#comment-16703152 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237441497 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long, Double, Int)] +data.+=((1, "a", 1, 0.8, 1)) +data.+=((2, "z", 2, 0.8, 3)) +data.+=((3, "b", 1, 0.8, 2)) +data.+=((4, "c", 1, 0.8, 5)) +data.+=((5, "d", 4, 0.1, 5)) +data.+=((6, "a", 2, 1.5, 2)) +data.+=((7, "b", 2, 0.8, 3)) +data.+=((8, "c", 1, 0.8, 2)) +data.+=((9, "h", 2, 0.8, 3)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("weightedAvg", new WeightedAvg) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as startId, + |SUM(A.price) AS sumA, + |COUNT(DISTINCT D.price) AS countD, Review comment: remove this when we don't support it This is an automated message from the Apache Git 
Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703161#comment-16703161 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237479817 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); 
+ |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, Review comment: Unnecessary variable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
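The generated snippet quoted above reads a possibly-null boxed field, sets a separate null flag, and falls back to a primitive default when the field is null. A minimal sketch of the same pattern in plain Scala (rather than generated Java) is given below. `Unboxed` and `readLongField` are hypothetical names, and a plain array stands in for `org.apache.flink.types.Row`.

```scala
object NullableFieldSketch {
  // Hypothetical result holder: the value plus a separate null flag,
  // mirroring the (result term, null term) pair in the quoted snippet.
  final case class Unboxed(value: Long, isNull: Boolean)

  /** Reads a boxed Long from a row-like array, using the primitive default 0L for null. */
  def readLongField(row: Array[AnyRef], index: Int): Unboxed =
    row(index) match {
      case null =>
        Unboxed(0L, isNull = true)
      case l: java.lang.Long =>
        Unboxed(l.longValue(), isNull = false)
      case other =>
        throw new IllegalArgumentException(
          s"Expected a Long but found ${other.getClass.getName}")
    }

  def main(args: Array[String]): Unit = {
    val row = Array[AnyRef](Long.box(5L), null)
    println(readLongField(row, 0))
    println(readLongField(row, 1))
  }
}
```

The null check happens before any unboxing, so a null field never triggers a `NullPointerException`.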
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703169#comment-16703169 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237460915

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -72,12 +78,28 @@ class MatchCodeGenerator(
   private var offset: Int = 0
   private var first : Boolean = false
+  /**
+    * Flags that tells if we generate expressions inside an aggregate. It tells how to access input
+    * row.
+    */
+  private var innerAggExpr: Boolean = false
+
+  /**
+    * Name of term in function used to transform input row into aggregate input row.
+    */
+  private val inputAggRowTerm = newName("inAgg")

Review comment:
All final variable terms in this class must not be created with `newName`; they can be constants.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support aggregation functions in the define and measures clause of MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API SQL
> Reporter: Dian Fu
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
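To illustrate the distinction drawn in the comment above: a counter-based name generator is useful when the same snippet may be emitted several times into one scope, whereas a term that is declared exactly once can be a fixed string. The sketch below assumes a simple counter-based `newName`; it is a hypothetical stand-in and not necessarily how Flink's helper is implemented.

```scala
import java.util.concurrent.atomic.AtomicInteger

object TermNameSketch {
  private val counter = new AtomicInteger(0)

  /**
    * Hypothetical stand-in for a code generator's name helper:
    * appends a fresh counter value so repeated calls never clash.
    */
  def newName(prefix: String): String =
    s"$prefix$$${counter.getAndIncrement()}"

  // A term that is declared only once per generated class needs no
  // counter suffix and can be a plain constant.
  val InputAggRowTerm: String = "inAgg"

  def main(args: Array[String]): Unit = {
    // Two per-expression result terms must differ from each other.
    println(newName("result"))
    println(newName("result"))
    // The fixed term is the same on every use.
    println(InputAggRowTerm)
  }
}
```

Using a constant for single-use terms also keeps the generated code stable between runs, which makes it easier to read and compare.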
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703167#comment-16703167 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237465668 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); 
+ |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { + val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)] + + val aggs = aggregates.map(rexAggCall => { +val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => { + inputRows.get(innerCall.toString) match { +case Some(x) => + x + +case None => + val callWithIndex = (innerCall, inputRows.size) + inputRows(innerCall.toString) = callWithIndex + callWithIndex + } +}).toList + +val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction] +(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray) + 
}).zipWithIndex.map { +case (agg, index) => + val result = AggregateUtil +.transformToAggregateFunction(agg._1, + isDistinct = false, + agg._2.map(_.getType), +
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703172#comment-16703172 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237470829

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {

Review comment:
Give the parameter of this method a better name, because the method assumes that it receives an aggregate function call.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support aggregation functions in the define and measures clause of MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API SQL
> Reporter: Dian Fu
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703157#comment-16703157 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237436990 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); 
+ |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { + val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)] + + val aggs = aggregates.map(rexAggCall => { +val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => { + inputRows.get(innerCall.toString) match { +case Some(x) => + x + +case None => + val callWithIndex = (innerCall, inputRows.size) + inputRows(innerCall.toString) = callWithIndex + callWithIndex + } +}).toList + +val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction] +(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray) + 
}).zipWithIndex.map { +case (agg, index) => + val result = AggregateUtil +.transformToAggregateFunction(agg._1, + isDistinct = false, + agg._2.map(_.getType), +
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703166#comment-16703166 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237471737

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -304,10 +349,14 @@ class MatchCodeGenerator(
   }

   override def visitPatternFieldRef(fieldRef: RexPatternFieldRef): GeneratedExpression = {
-    if (fieldRef.getAlpha.equals("*") && currentPattern.isDefined && offset == 0 && !first) {
-      generateInputAccess(input, input1Term, fieldRef.getIndex)
+    if (innerAggExpr) {

Review comment:
Call this variable `isWithinAggExprState` or similar?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Support aggregation functions in the define and measures clause of MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API SQL
> Reporter: Dian Fu
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703162#comment-16703162 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237474378

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -429,6 +464,24 @@ class MatchCodeGenerator(
     GeneratedExpression(rowNameTerm, isRowNull, funcCode, input)
   }

+  private def findEventsByPatternName(
+      patternFieldAlpha: String)
+    : GeneratedPatternList = {
+    reusablePatternLists.get(patternFieldAlpha) match {
+      // input access and unboxing has already been generated

Review comment:
this comment has nothing to do with input access
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703165#comment-16703165 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237466297

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {

Review comment:
Can you add a high level description how aggregates work in this code generator? Maybe here or even for the top-level class. Otherwise we don't remember it when we have to touch this class again. Maybe a little example code that explains which methods, classes, fields are generated and why? Not super detailed but at least to understand the concept. Actually this is also missing in the AggregationCodeGenerator.
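As a rough illustration of the concept this review comment asks to have documented, the plain-Scala sketch below mimics what the quoted diff for `AggBuilder` suggests happens per pattern variable: one call computes a row holding all aggregates for the matched events, and each aggregate is then read from that row by index together with a null flag. `AggAccessSketch`, `calculateAgg`, and `readInt` are hypothetical names chosen for this sketch. The real generator emits Java source that works on `org.apache.flink.types.Row`, so this is an analogy under those assumptions, not the actual implementation.

```scala
// Plain-Scala analogy of the generated per-match logic; every name is illustrative.
object AggAccessSketch {
  // Stand-in for org.apache.flink.types.Row: one slot per aggregate, null when undefined.
  type AggRow = Array[Any]

  // Stand-in for a generated calculateAgg_<variable> method: evaluates all
  // aggregates registered for one pattern variable over the events matched to it.
  def calculateAgg(prices: Seq[Int]): AggRow = {
    val sum: Any = if (prices.isEmpty) null else prices.sum
    val count: Any = prices.size.toLong
    Array(sum, count)
  }

  // Mirrors the per-aggregate access code in the diff: read field `index`,
  // report whether it was null, and fall back to a primitive default if so.
  def readInt(row: AggRow, index: Int, defaultValue: Int): (Int, Boolean) =
    row(index) match {
      case null   => (defaultValue, true)
      case v: Int => (v, false)
    }
}
```

Reading slot 0 of `calculateAgg(Seq(10, 20, 30))` yields the sum with the null flag unset, while an empty match yields the default value with the flag set. That is the same null-term and result-term pair the generated expression carries.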
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702975#comment-16702975 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422016

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ##
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }

+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,

Review comment:
Remove the default here and at other places. Implementers should explicitly state what they want.
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702972#comment-16702972 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422016

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ##
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }

+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,

Review comment:
Remove the default. Implementers should explicitly state what they want.
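The request to drop default values can be pictured with a small, self-contained sketch. `describeAggregate` below is a hypothetical helper invented for illustration, not part of `AggregateUtil`. Only the flag names are borrowed from the quoted signature. Without defaults, every caller has to state each switch explicitly, and named arguments keep such calls readable.

```scala
object ExplicitFlagsSketch {
  // Hypothetical, simplified helper with boolean switches and no default values:
  // a caller cannot silently inherit a setting it never thought about.
  def describeAggregate(
      name: String,
      isDistinct: Boolean,
      needRetraction: Boolean,
      isStateBackedDataViews: Boolean): String = {
    // Keep only the labels of the switches that are turned on.
    val flags = Seq(
      "distinct" -> isDistinct,
      "retract" -> needRetraction,
      "stateBacked" -> isStateBackedDataViews
    ).collect { case (label, true) => label }
    if (flags.isEmpty) name else s"$name[${flags.mkString(",")}]"
  }

  // Named arguments document the decision at the call site.
  val matchAgg: String = describeAggregate(
    name = "SUM",
    isDistinct = false,
    needRetraction = false,
    isStateBackedDataViews = false)
}
```

The trade-off is slightly longer call sites in exchange for making each choice visible during review.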
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702971#comment-16702971 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422674

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ##
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }

+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,
+      index: Int = 0)

Review comment:
Add more documentation to this function? What is `index` good for?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702967#comment-16702967 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237420010

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ##
@@ -1185,331 +1273,281 @@ object AggregateUtil {
     val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size)
     val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: Any]](aggregateCalls.size)
     val accTypes = new Array[TypeInformation[_]](aggregateCalls.size)
+    val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+    val isDistinctAggs = new Array[Boolean](aggregateCalls.size)

-    // create aggregate function instances by function type and aggregate field data type.

Review comment:
Restore this and the following line?
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702969#comment-16702969 ]

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237424181

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ##
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }

+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,
+      index: Int = 0)
+    : (TableAggregateFunction[_, _],

Review comment:
Introduce a case class here as well.
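A minimal sketch of what a case class in place of the tuple return type could look like follows. The name `AggregateInfo` and its `String` fields are placeholders made up for this illustration. The method in the diff returns Flink types (`TableAggregateFunction[_, _]`, `TypeInformation[_]`, `Seq[DataViewSpec[_]]`), which are replaced here so that the example runs without Flink on the classpath.

```scala
object NamedResultSketch {
  // Hypothetical result type: each component gets a name instead of _1, _2, _3.
  // String fields are placeholders for the Flink types used by the real method.
  final case class AggregateInfo(
      function: String,
      accumulatorType: String,
      dataViewSpecs: Seq[String])

  // Tuple-returning variant, shown only for contrast.
  def asTuple(function: String, accType: String): (String, String, Seq[String]) =
    (function, accType, Seq.empty)

  // Case-class variant: the fields document themselves at every use site.
  def asCaseClass(function: String, accType: String): AggregateInfo =
    AggregateInfo(function, accType, Seq.empty)

  val viaTuple: String = asTuple("CountAggFunction", "placeholderAccType")._2
  val viaCaseClass: String =
    asCaseClass("CountAggFunction", "placeholderAccType").accumulatorType
}
```

Both variants carry the same data. The difference is that `result.accumulatorType` states intent where `result._2` does not, which matters more as the number of components grows.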
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702970#comment-16702970 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237417018 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ## @@ -1185,331 +1273,281 @@ object AggregateUtil { val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size) val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: Any]](aggregateCalls.size) val accTypes = new Array[TypeInformation[_]](aggregateCalls.size) +val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) +val isDistinctAggs = new Array[Boolean](aggregateCalls.size) -// create aggregate function instances by function type and aggregate field data type. -aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) => - val argList: util.List[Integer] = aggregateCall.getArgList +aggregateCalls + .zipWithIndex.foreach { + case (aggregateCall, index) => +val argList: util.List[Integer] = aggregateCall.getArgList - if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { -aggregates(index) = new CountAggFunction if (argList.isEmpty) { - aggFieldIndexes(index) = Array[Int](-1) + if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { +aggFieldIndexes(index) = Array[Int](-1) + } else { +throw new TableException("Aggregate fields should not be empty.") + } } else { aggFieldIndexes(index) = argList.asScala.map(i => i.intValue).toArray } - } else { -if (argList.isEmpty) { - throw new TableException("Aggregate fields should not be empty.") + +val inputTypes = argList.map(aggregateInputType.getFieldList.get(_).getType) +val result = transformToAggregateFunction(aggregateCall.getAggregation, + aggregateCall.isDistinct, + 
inputTypes, + needRetraction, + tableConfig, + isStateBackedDataViews, + index) + +aggregates(index) = result._1 +accTypes(index) = result._2 +accSpecs(index) = result._3 +isDistinctAggs(index) = aggregateCall.isDistinct +} + +(aggFieldIndexes, aggregates, isDistinctAggs, accTypes, accSpecs) + } + + private def createFlinkAggFunction( Review comment: Can you add a comment to every method in this class? This class is quite big every comment helps in understanding what is going on. For example, `Converts Calcite's [[SqlAggFunction]] to a Flink UDF [[TableAggregationFunction]]`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702968#comment-16702968 ] ASF GitHub Bot commented on FLINK-7599: --- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237419538 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ## @@ -1169,6 +1169,94 @@ object AggregateUtil { (propPos._1, propPos._2, propPos._3) } + private[flink] def transformToAggregateFunction( + aggregateCall: SqlAggFunction, + isDistinct: Boolean, + aggregateInputType: Seq[RelDataType], + needRetraction: Boolean, + tableConfig: TableConfig, + isStateBackedDataViews: Boolean = false, + index: Int = 0) +: (TableAggregateFunction[_, _], + TypeInformation[_], + Seq[DataViewSpec[_]]) = { +// store the aggregate fields of each aggregate function, by the same order of aggregates. +// create aggregate function instances by function type and aggregate field data type. 
+ +var accumulatorType: TypeInformation[_] = null +val inputType = () => if (aggregateInputType.isEmpty) { + throw new TableException("Aggregate fields should not be empty.") +} else { + aggregateInputType.get(0) +} + +val aggregate: TableAggregateFunction[_, _] = aggregateCall match { + + case _: SqlCountAggFunction => +new CountAggFunction + + case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT => +val agg = new CollectAggFunction(FlinkTypeFactory.toTypeInfo(inputType())) +accumulatorType = agg.getAccumulatorType +agg + + case udagg: AggSqlFunction => +accumulatorType = udagg.accType +udagg.getFunction + + case aggFunction: SqlAggFunction => +createFlinkAggFunction(aggFunction, + needRetraction, + inputType(), + tableConfig) +} + +val accSpecs = + if (accumulatorType != null) { +val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index, + aggregate, + accumulatorType, + isStateBackedDataViews) +if (specs.isDefined) { + accumulatorType = accType + specs.get +} else { + Seq() +} + } else { +accumulatorType = getAccumulatorTypeOfAggregateFunction(aggregate) +Seq() + } + +// create distinct accumulator filter argument +if (isDistinct) { + // Using Pojo fields for the real underlying accumulator + val pojoFields = new util.ArrayList[PojoField]() + pojoFields.add(new PojoField( +classOf[DistinctAccumulator[_]].getDeclaredField("realAcc"), +accumulatorType) + ) + // If StateBackend is not enabled, the distinct mapping also needs + // to be added to the Pojo fields. 
+ if (!isStateBackedDataViews) { + +val argTypes: Array[TypeInformation[_]] = aggregateInputType + .map(FlinkTypeFactory.toTypeInfo).toArray + +val mapViewTypeInfo = new MapViewTypeInfo( + new RowTypeInfo(argTypes: _*), + BasicTypeInfo.LONG_TYPE_INFO) +pojoFields.add(new PojoField( + classOf[DistinctAccumulator[_]].getDeclaredField("distinctValueMap"), + mapViewTypeInfo) +) + } + accumulatorType = new PojoTypeInfo(classOf[DistinctAccumulator[_]], pojoFields) +} + +(aggregate, accumulatorType, accSpecs) + } + private def transformToAggregateFunctions( Review comment: We definitely need more documentation for this function. Can you create case classes for the return type of this function for proper naming because `Array[Int]` or `Array[Boolean]` is not very helpful. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > - > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dian Fu >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699166#comment-16699166 ]

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys opened a new pull request #7177: [FLINK-7599] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177

## What is the purpose of the change

This change allows using aggregates in the MEASURES and DEFINE clauses of MATCH_RECOGNIZE in SQL.

## Brief change log

- Split `AggregateUtil#transformToAggregateFunctions`
- Generate code for aggregates in `org.apache.flink.table.codegen.MatchCodeGenerator.AggBuilder`
- Extend the verification of aggregates' operands in `org.apache.flink.table.codegen.PatternVariableFinder` so that it also checks operands in UDAGs

Still missing:
- support for `open/close` methods (depends on #7110)
- support for distinct aggregates (depends on https://issues.apache.org/jira/browse/CALCITE-2707)
- updated documentation

## Verifying this change

Added tests:
* `org.apache.flink.table.match.MatchOperatorValidationTest#testAggregatesOnMultiplePatternVariablesNotSupported`
* `org.apache.flink.table.runtime.stream.sql.MatchRecognizeITCase#testCepAggregates`
* `org.apache.flink.table.match.MatchOperatorValidationTest#testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
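To make the purpose of the pull request concrete, here is a sketch of the kind of query it is meant to enable, with an aggregate in both the MEASURES and the DEFINE clause. The table `Ticker` and its columns `symbol`, `price`, and `rowtime` are assumptions made for this example, and the query is only held as a string; it would still have to be submitted through a table environment to be executed.

```scala
object MatchRecognizeAggSketch {
  // Hypothetical table Ticker(symbol, price, rowtime); all names are assumptions.
  // AVG appears in MEASURES (reported per match) and in DEFINE (as a running
  // condition that decides whether a row can still be mapped to A).
  val query: String =
    """
      |SELECT *
      |FROM Ticker
      |MATCH_RECOGNIZE (
      |  PARTITION BY symbol
      |  ORDER BY rowtime
      |  MEASURES
      |    AVG(A.price) AS avgPrice,
      |    COUNT(A.price) AS matchedRows
      |  ONE ROW PER MATCH
      |  AFTER MATCH SKIP PAST LAST ROW
      |  PATTERN (A+ B)
      |  DEFINE
      |    A AS AVG(A.price) < 15
      |) AS T
      |""".stripMargin
}
```

Each aggregate here refers to a single pattern variable (`A`), in line with the validation tests listed in the pull request, which cover aggregates over multiple pattern variables as an unsupported case.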
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692723#comment-16692723 ]

Dian Fu commented on FLINK-7599:

Sure, go ahead.

> Support aggregation functions in the define and measures clause of MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API SQL
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Fix For: 1.8.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16691696#comment-16691696 ]

Dawid Wysakowicz commented on FLINK-7599:
-

Hi [~dian.fu], would you be OK if I take over this issue?

> Support aggregation functions in the define and measures clause of MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API SQL
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Fix For: 1.8.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)