[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r241787811

## File path: docs/dev/table/streaming/match_recognize.md ##
@@ -211,6 +211,65 @@ If a condition is not defined for a pattern variable, a default condition will b
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
+### Aggregations
+
+Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported.
+
+Aggregate functions are applied to each subset of rows mapped to a match. In order to understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section.
+
+The task of the following example is to find the longest period of time for which the average price of a ticker did not go below certain threshold. It shows how expressible `MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+    MATCH_RECOGNIZE (
+        PARTITION BY symbol
+        ORDER BY rowtime
+        MEASURES
+            FIRST(A.rowtime) AS start_tstamp,
+            LAST(A.rowtime) AS end_tstamp,
+            AVG(A.price) AS avgPrice
+        ONE ROW PER MATCH
+        AFTER MATCH SKIP TO FIRST B
+        PATTERN (A+ B)
+        DEFINE
+            A AS AVG(A.price) < 15
+    ) MR;

Review comment:
@dawidwys what is `MR` actually doing? Do we have to call `SELECT MR.start_tstamp`?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r241741665

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ##
@@ -168,27 +170,23 @@ object AggregateUtil {
       generateRetraction: Boolean,
       consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
-    val (aggFields, aggregates, isDistinctAggs, accTypes, accSpecs) =
-      transformToAggregateFunctions(
+    val aggregateMetadata = extractAggregateMetadata(
         namedAggregates.map(_.getKey),
         inputRowType,
         consumeRetraction,
         tableConfig,
         isStateBackedDataViews = true)
-    val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
-
-    val outputArity = groupings.length + aggregates.length
-
-    val aggregationStateType: RowTypeInfo = new RowTypeInfo(accTypes: _*)
+    val aggMapping = getAdjustedMapping(aggregateMetadata.getAggregateCallsCount, groupings.length)

Review comment:
Actually, I thought this function could be part of `AggregateMetadata`, since you always apply it to the calls count.
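The suggestion can be sketched as follows. This is a minimal Java sketch, not the PR's Scala code. `AggregateMetadataSketch` is a hypothetical stand-in for `AggregateMetadata`. It only assumes what the diff shows: the mapping equals `aggregates.indices.map(_ + groupings.length)` and depends solely on the aggregate calls count and the grouping length.

```java
import java.util.Arrays;

/** Hypothetical stand-in for AggregateMetadata, with the suggested helper as a method. */
final class AggregateMetadataSketch {
    private final int aggregateCallsCount;

    AggregateMetadataSketch(int aggregateCallsCount) {
        this.aggregateCallsCount = aggregateCallsCount;
    }

    int getAggregateCallsCount() {
        return aggregateCallsCount;
    }

    /**
     * Output positions of the aggregate results when the grouping keys occupy the first
     * groupingsLength fields: aggregate i is written to field i + groupingsLength.
     */
    int[] getAdjustedMapping(int groupingsLength) {
        int[] mapping = new int[aggregateCallsCount];
        for (int i = 0; i < aggregateCallsCount; i++) {
            mapping[i] = i + groupingsLength;
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Two grouping keys and three aggregate calls.
        AggregateMetadataSketch metadata = new AggregateMetadataSketch(3);
        System.out.println(Arrays.toString(metadata.getAdjustedMapping(2))); // [2, 3, 4]
    }
}
```

With such a method, the call sites could shrink to `aggregateMetadata.getAdjustedMapping(groupings.length)`, and the index arithmetic would live in one place.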
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r241742837

## File path: docs/dev/table/streaming/match_recognize.md ##
@@ -211,6 +211,65 @@ If a condition is not defined for a pattern variable, a default condition will b
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
+### Aggregations
+
+Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported.
+
+Aggregate functions are applied to each subset of rows mapped to a match. In order to understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section.
+
+The task of the following example is to find the longest period of time for which the average price of a ticker did not go below certain threshold. It shows how expressible `MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+    MATCH_RECOGNIZE (
+        PARTITION BY symbol
+        ORDER BY rowtime
+        MEASURES
+            FIRST(A.rowtime) AS start_tstamp,
+            LAST(A.rowtime) AS end_tstamp,
+            AVG(A.price) AS avgPrice
+        ONE ROW PER MATCH
+        AFTER MATCH SKIP TO FIRST B
+        PATTERN (A+ B)
+        DEFINE
+            A AS AVG(A.price) < 15
+    ) MR;
+{% endhighlight %}
+
+Given this query and following input values:
+
+{% highlight text %}
+symbol        rowtime          price   tax
+======  ====================  =======  =======
+'ACME'  '01-Apr-11 10:00:00'   12      1
+'ACME'  '01-Apr-11 10:00:01'   17      2
+'ACME'  '01-Apr-11 10:00:02'   13      1
+'ACME'  '01-Apr-11 10:00:03'   16      3
+'ACME'  '01-Apr-11 10:00:04'   25      2
+'ACME'  '01-Apr-11 10:00:05'   2       1
+'ACME'  '01-Apr-11 10:00:06'   4       1
+'ACME'  '01-Apr-11 10:00:07'   10      2
+'ACME'  '01-Apr-11 10:00:08'   15      2
+'ACME'  '01-Apr-11 10:00:09'   25      2
+'ACME'  '01-Apr-11 10:00:10'   30      1
+{% endhighlight %}
+
+The query will accumulate events as part of the pattern variable `A` as long as the average price of them does not exceed `15`. For example, such a limit exceeding happens at `01-Apr-11 10:00:04`.
+The following period exceeds the average price of `15` again at `01-Apr-11 10:00:10`. Thus the results for said query will be:
+
+{% highlight text %}
+ symbol       start_tstamp         end_tstamp        avgPrice
+=========  ==================  ==================  ============
+ACME       01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
+ACME       01-APR-11 10:00:04  01-APR-11 10:00:09     13.5
+{% endhighlight %}
+
+Note Aggregations can be applied to expressions, but only if they reference a single pattern variable. Thus `SUM(A.price * A.tax)` is a valid one, but `AVG(A.price * B.tax)` is not.
+
+Attention `DISTINCT` aggregations are not supported.

Review comment:
Can you open a follow-up issue for this if it does not exist yet?
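The example's matching can be approximated with a small greedy simulation for a quick sanity check. This is a simplified Java sketch under stated assumptions. It is not Flink's NFA-based CEP runtime. The assumptions are:

* `A+` is taken greedily.
* `A` accepts a row only if `AVG(A.price)`, including that row, stays below 15.
* `B` has no condition, so it takes the first row that would break this.
* `AFTER MATCH SKIP TO FIRST B` restarts matching at the `B` row.

`AvgPatternSketch` and `findMatches` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Greedy approximation of PATTERN (A+ B), DEFINE A AS AVG(A.price) < threshold, SKIP TO FIRST B. */
final class AvgPatternSketch {

    /** Indices of the first and last row mapped to A, and AVG(A.price) over those rows. */
    static final class Match {
        final int firstA;
        final int lastA;
        final double avgPrice;

        Match(int firstA, int lastA, double avgPrice) {
            this.firstA = firstA;
            this.lastA = lastA;
            this.avgPrice = avgPrice;
        }

        @Override
        public String toString() {
            return "A=[" + firstA + ".." + lastA + "], avgPrice=" + avgPrice;
        }
    }

    static List<Match> findMatches(double[] prices, double threshold) {
        List<Match> matches = new ArrayList<>();
        int start = 0;
        while (start < prices.length) {
            double sum = 0;
            int count = 0;
            int i = start;
            // Extend A while the average including the candidate row stays below the threshold.
            while (i < prices.length && (sum + prices[i]) / (count + 1) < threshold) {
                sum += prices[i];
                count++;
                i++;
            }
            if (count == 0) {
                start++; // A+ needs at least one row: try the next row as a new start
            } else if (i == prices.length) {
                break; // input ended before a B row arrived: no complete match
            } else {
                matches.add(new Match(start, i - 1, sum / count));
                start = i; // SKIP TO FIRST B: the B row starts the next attempt
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Prices of the 'ACME' rows from 10:00:00 to 10:00:10 in the example table.
        double[] prices = {12, 17, 13, 16, 25, 2, 4, 10, 15, 25, 30};
        findMatches(prices, 15).forEach(System.out::println); // A=[0..3], avgPrice=14.5
    }
}
```

With the prices exactly as listed, this sketch reports only the first result row: 10:00:00 to 10:00:03, with `avgPrice` 14.5. Under these assumptions, no match can start at 10:00:04, because the average of the single price 25 is already above 15. From 10:00:05 onward, the running average stays below 15 up to and including 10:00:10 (86 / 6 ≈ 14.33), so no `B` row arrives within the listed input. The second expected result row may therefore be worth double-checking against the actual runtime.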
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238706596

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ##
@@ -726,13 +727,14 @@ object AggregateUtil {
       tableConfig: TableConfig): MapPartitionFunction[Row, Row] = {
     val needRetract = false
-    val (aggFieldIndexes, aggregates, isDistinctAggs, accTypes, _) = transformToAggregateFunctions(
+    val aggregateMetadata = extractAggregateMetadata(
       namedAggregates.map(_.getKey),
       physicalInputRowType,
       needRetract,
       tableConfig)
-    val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
+    val aggMapping = (0 until aggregateMetadata.getAggregateCallsCount).map(_ + groupings.length)

Review comment:
This occurs quite often (I counted 10 times). Maybe create a helper function in `AggregateMetadata` that takes the grouping length, such as `getAdjustedMapping()`?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238718548

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -691,12 +795,14 @@ class MatchCodeGenerator(
   class PatternVariableFinder extends RexDefaultVisitor[Option[String]] {
+    val ALL_PATTERN_VARIABLE = "*"

Review comment:
We could even move this constant definition to `MatchUtil`?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238718086

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = generateAggAccess(call)
+          aggregates += call
+          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def generateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)
+
+      val codeForAgg =
+        j"""
+           |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForAgg
+
+      val defaultValue = primitiveDefaultValue(singleResultType)
+      val codeForSingleAgg =
+        j"""
+           |boolean $singleResultNullTerm;
+           |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+           |  .getField(${aggregates.size});
+           |if ($singleResultTerm != null) {
+           |  $singleResultNullTerm = false;
+           |} else {
+           |  $singleResultNullTerm = true;
+           |  $singleResultTerm = $defaultValue;
+           |}
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForSingleAgg
+
+      val exp = GeneratedExpression(singleResultTerm,
+        singleResultNullTerm,
+        NO_CODE,
+        singleResultType)
+      exp
+    }
+
+    def generateAggFunction() : Unit = {
+      val (aggs, exprs) = extractAggregatesAndExpressions
+
+      val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+      val aggFunc = aggGenerator.generateAggregations(
+        s"AggFunction_$variableUID",
+        exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+        aggs.map(_.aggFunction).toArray,
+        aggs.map(_.inputIndices).toArray,
+        aggs.indices.toArray,
+        Array.fill(aggs.size)(false),
+        isStateBackedDataViews = false,
+        partialResults = false,
+        Array.emptyIntArray,
+        None,
+        aggs.size,
+        needRetract = false,
+        needMerge = false,
+        needReset = false,
+        None
+      )
+
+      reusableMemberStatements.add(aggFunc.code)
+
+      val transformFuncName = s"transformRowForAgg_$variableUID"
+      val inputTransform: String = generateAggInputExprEvaluation(
+        exprs,
+        transformFuncName)
+
+      generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+    }
+
+    private case class MatchAggCall(
+      aggFunction: TableAggregateFunction[_, _],
+      inputIndices: Array[Int],
+      dataViews: Seq[DataViewSpec[_]]
+    )
+
+    private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = {
+      val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+      val aggs = aggregates.map(rexAggCall => {
+        val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+          inputRows.get(innerCall.toString) match {
+            case Some(x) =>
+              x
+
+            case None =>
+              val callWithIndex = (innerCall, inputRows.size)
+              inputRows(innerCall.toString) = callWithIndex
+              callWithIndex
+          }
+        }).toList
+
+        val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+        (agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+      }).zipWithIndex.map {
+        case (agg, index) =>
+          val result = AggregateUtil
+            .transformToAggregateFunction(agg._1,
+              isDistinct = false,
+              agg._2.map(_.getType),
+              needRetraction = false,
+              config,
+              isStateBackedDataViews = false,
+              index)
+
+          MatchAggCall(result._1, agg._3, result._3)
+      }
+
+      (aggs, inputRows.values.map(_._1).toSeq)
+    }
+
+
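`AggBuilder` deduplicates at two levels:

* Aggregate calls are keyed by `call.toString` in `getOrAddAggregation`.
* Operand expressions are keyed by `innerCall.toString` in `extractAggregatesAndExpressions`.

Both levels can be illustrated with a small Java sketch. The sketch is hypothetical: plain strings stand in for `RexCall` and `RexNode`, and `AggDedupSketch` is not part of the PR. In both cases, a new key receives the next free field index, so repeated uses share one field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of AggBuilder's deduplication; strings stand in for RexCall/RexNode. */
final class AggDedupSketch {
    // Insertion-ordered maps, so field indices follow first use (like inputRows in the PR).
    private final Map<String, Integer> aggregateFields = new LinkedHashMap<>();
    private final Map<String, Integer> inputFields = new LinkedHashMap<>();

    /** Field of the per-variable result row that holds this aggregate call. */
    int aggregateField(String aggCall) {
        return indexOf(aggregateFields, aggCall);
    }

    /** Field of the transformed input row that holds this operand expression. */
    int inputField(String operand) {
        return indexOf(inputFields, operand);
    }

    /** Operand expressions in field order, i.e. what the row transformation has to compute. */
    List<String> inputExpressions() {
        return new ArrayList<>(inputFields.keySet());
    }

    private static int indexOf(Map<String, Integer> fields, String key) {
        Integer existing = fields.get(key);
        if (existing != null) {
            return existing; // seen before: reuse the same field
        }
        int index = fields.size(); // next free field, taken before the key is added
        fields.put(key, index);
        return index;
    }

    public static void main(String[] args) {
        AggDedupSketch builder = new AggDedupSketch();
        // SUM(A.price) > SUM(A.price) + 1 together with AVG(A.price):
        System.out.println(builder.aggregateField("SUM(A.price)")); // 0
        System.out.println(builder.aggregateField("SUM(A.price)")); // 0, calculated once
        System.out.println(builder.aggregateField("AVG(A.price)")); // 1
        // Both aggregates read the same operand, so it is evaluated into a single input field.
        System.out.println(builder.inputField("A.price")); // 0
        System.out.println(builder.inputField("A.price")); // 0
    }
}
```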
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238709945

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -49,6 +50,90 @@ import scala.collection.mutable
 /**
   * A code generator for generating CEP related functions.
   *
+  * Aggregates are generated as follows:
+  * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable
+  * 2. even if the same aggregation is used multiple times in an expression
+  *    (e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To do so [[AggBuilder]]
+  *    keeps set of already seen different aggregation calls, and reuses the code to access
+  *    appropriate field of aggregation result
+  * 3. after translating every expression (either in [[generateCondition]] or in
+  *    [[generateOneRowPerMatchExpression]]) there will be generated code for
+  *       * [[GeneratedFunction]], which will be an inner class
+  *       * said [[GeneratedFunction]] will be instantiated in the ctor and opened/closed
+  *         in corresponding methods of top level generated classes
+  *       * function that transforms input rows (row by row) into aggregate input rows
+  *       * function that calculates aggregates for variable, that uses the previous method
+  *    The generated code will look similar to this:
+  *
+  *
+  * {{{
+  *
+  * public class MatchRecognizePatternSelectFunction$175 extends RichPatternSelectFunction {
+  *
+  *     // Class used to calculate aggregates for a single pattern variable
+  *     public final class AggFunction_variable$115$151 extends GeneratedAggregations {
+  *       ...
+  *     }
+  *
+  *     private final AggFunction_variable$115$151 aggregator_variable$115;
+  *
+  *     public MatchRecognizePatternSelectFunction$175() {
+  *       aggregator_variable$115 = new AggFunction_variable$115$151();
+  *     }
+  *
+  *     public void open() {
+  *       aggregator_variable$115.open();
+  *       ...
+  *     }
+  *
+  *     // Function to transform incoming row into aggregate specific row. It can e.g calculate
+  *     // inner expression of said aggregate
+  *     private Row transformRowForAgg_variable$115(Row inAgg) {
+  *         ...
+  *     }
+  *
+  *     // Function to calculate all aggregates for a single pattern variable
+  *     private Row calculateAgg_variable$115(List input) {
+  *       Acc accumulator = aggregator_variable$115.createAccumulator();
+  *       for (Row row : input) {
+  *         aggregator_variable$115.accumulate(accumulator, transformRowForAgg_variable$115(row));
+  *       }
+  *
+  *       return aggregator_variable$115.getResult(accumulator);
+  *     }
+  *
+  *     @Override
+  *     public Object select(Map<String, List<Row>> in1) throws Exception {
+  *
+  *       // Extract list of rows assigned to a single pattern variable
+  *       java.util.List patternEvents$130 = (java.util.List) in1.get("A");
+  *       ...
+  *
+  *       // Calculate aggregates
+  *       Row aggRow_variable$110$111 = calculateAgg_variable$110(patternEvents$114);
+  *
+  *       // Every aggregation (e.g SUM(A.price) and AVG(A.price)) will be extracted to a variable
+  *       double result$135 = aggRow_variable$126$127.getField(0);
+  *       long result$137 = aggRow_variable$126$127.getField(1);
+  *
+  *       // Result of aggregation will be used in expression evaluation
+  *       out.setField(0, result$135)
+  *
+  *       long result$140 = result$137 * 2;
+  *       out.setField(1, result$140);
+  *
+  *       double result$144 = $result135 + result$137;
+  *       out.setField(2, result$144);
+  *     }
+  *
+  *     public void close() {
+  *       aggregator_variable$115.close();
+  *       ...
+  *     }
+  *
+  * }
+  * }}}
+  *

Review comment:
Awesome documentation. Very helpful.
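The `calculateAgg_variable$115` and `transformRowForAgg_variable$115` pair in the Scaladoc reduces to an accumulate loop over the rows mapped to one pattern variable. Below is a hypothetical, self-contained Java sketch of that loop. The `Aggregation` interface only mimics the `createAccumulator`, `accumulate`, and `getResult` calls shown in the comment; it is not Flink's `GeneratedAggregations` API. Rows are modelled as `{price, tax}` arrays. Unlike the generated code, which returns all aggregates of a variable in one `Row`, this sketch computes a single aggregate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration of the per-variable aggregation loop; not Flink's API. */
final class PerVariableAggSketch {

    /** Accumulator-style aggregate in the createAccumulator / accumulate / getResult shape. */
    interface Aggregation<ACC, R> {
        ACC createAccumulator();

        void accumulate(ACC accumulator, double value);

        R getResult(ACC accumulator);
    }

    /** SUM over doubles; the accumulator is a one-element array so it can be updated in place. */
    static final class Sum implements Aggregation<double[], Double> {
        @Override
        public double[] createAccumulator() {
            return new double[] {0.0};
        }

        @Override
        public void accumulate(double[] accumulator, double value) {
            accumulator[0] += value;
        }

        @Override
        public Double getResult(double[] accumulator) {
            return accumulator[0];
        }
    }

    /** Mirrors calculateAgg_variable: transform each row into the aggregate input, then accumulate. */
    static <ACC, R> R calculateAgg(
            List<double[]> rows,
            Function<double[], Double> transformRowForAgg,
            Aggregation<ACC, R> aggregation) {
        ACC accumulator = aggregation.createAccumulator();
        for (double[] row : rows) {
            aggregation.accumulate(accumulator, transformRowForAgg.apply(row));
        }
        return aggregation.getResult(accumulator);
    }

    public static void main(String[] args) {
        // Rows mapped to A as {price, tax}; the transform computes the inner expression A.price * A.tax.
        List<double[]> rowsOfA = Arrays.asList(
                new double[] {12, 1}, new double[] {17, 2}, new double[] {13, 1});
        double result = calculateAgg(rowsOfA, row -> row[0] * row[1], new Sum());
        System.out.println(result); // SUM(A.price * A.tax) = 12 + 34 + 13 = 59.0
    }
}
```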
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238711587

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -504,24 +588,25 @@ class MatchCodeGenerator(
     private val rowTypeTerm = "org.apache.flink.types.Row"
-    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
-      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+    def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((aggCall.toString, 0)) match {
         case Some(expr) =>
           expr
         case None =>
-          val exp: GeneratedExpression = generateAggAccess(call)
-          aggregates += call
-          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          val exp: GeneratedExpression = doGenerateAggAccess(aggCall)

Review comment:
The naming is still not perfect here. What is the difference between `generateAggAccess` and `doGenerateAggAccess`? How about `generateDeduplicatedAggAccess` and `generateAggAccess`?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238713688

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -533,45 +618,49 @@ class MatchCodeGenerator(
       reusablePerRecordStatements += codeForAgg
       val defaultValue = primitiveDefaultValue(singleResultType)
-      val codeForSingleAgg =
+      val codeForSingleAgg = if (nullCheck) {
         j"""
            |boolean $singleResultNullTerm;
-           |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
-           |  .getField(${aggregates.size});
-           |if ($singleResultTerm != null) {
+           |$primitiveSingleResultTypeTerm $singleResultTerm;
+           |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+           |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+           |    .getField(${aggregates.size});
            |  $singleResultNullTerm = false;
            |} else {
            |  $singleResultNullTerm = true;
            |  $singleResultTerm = $defaultValue;
            |}
            |""".stripMargin
+      } else {
+        j"""
+           |boolean $singleResultNullTerm = false;

Review comment:
This line can be dropped. The null term is not evaluated if `nullCheck` is disabled.
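The following plain-Java sketch shows what the `nullCheck` branch does for one aggregate field. It is a hypothetical illustration that assumes a `DOUBLE` result whose primitive default is `0.0`. An `Object[]` stands in for `org.apache.flink.types.Row` so the sketch stays self-contained. Without null checking there is no null flag at all, which is consistent with the remark that `boolean $singleResultNullTerm = false;` is never read in that case.

```java
/** Hypothetical sketch; Object[] stands in for org.apache.flink.types.Row. */
final class AggFieldAccessSketch {

    /** A primitive value plus a separate null flag, like the result/nullTerm pair in generated code. */
    static final class DoubleField {
        final double value;
        final boolean isNull;

        DoubleField(double value, boolean isNull) {
            this.value = value;
            this.isNull = isNull;
        }
    }

    /** nullCheck enabled: a null field becomes the primitive default plus isNull = true. */
    static DoubleField readWithNullCheck(Object[] aggRow, int index) {
        Object field = aggRow[index];
        if (field != null) {
            return new DoubleField((Double) field, false);
        }
        return new DoubleField(0.0, true);
    }

    /** nullCheck disabled: no null flag; unboxing a null field throws NullPointerException. */
    static double readWithoutNullCheck(Object[] aggRow, int index) {
        return (Double) aggRow[index];
    }

    public static void main(String[] args) {
        Object[] aggRow = {14.5, null}; // e.g. AVG(A.price) and an aggregate over no rows
        DoubleField avg = readWithNullCheck(aggRow, 0);
        DoubleField empty = readWithNullCheck(aggRow, 1);
        System.out.println(avg.value + " " + avg.isNull);     // 14.5 false
        System.out.println(empty.value + " " + empty.isNull); // 0.0 true
        System.out.println(readWithoutNullCheck(aggRow, 0));  // 14.5
    }
}
```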
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238716344

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -582,22 +671,33 @@ class MatchCodeGenerator(
       val transformFuncName = s"transformRowForAgg_$variableUID"
       val inputTransform: String = generateAggInputExprEvaluation(
-        exprs,
+        matchAgg.inputExprs,
         transformFuncName)
       generateAggCalculation(aggFunc, transformFuncName, inputTransform)
     }
+    private case class LogicalMatchAggCall(

Review comment:
nit: move all classes to the bottom to logically separate them from methods.
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237544327

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ##
@@ -445,4 +498,223 @@ class MatchCodeGenerator(
     generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+    private val aggregates = new mutable.ListBuffer[RexCall]()
+
+    private val variableUID = newName("variable")
+
+    private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+    private val rowTypeTerm = "org.apache.flink.types.Row"
+
+    def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+      reusableInputUnboxingExprs.get((call.toString, 0)) match {
+        case Some(expr) =>
+          expr
+
+        case None =>
+          val exp: GeneratedExpression = generateAggAccess(call)
+          aggregates += call
+          reusableInputUnboxingExprs((call.toString, 0)) = exp
+          exp.copy(code = NO_CODE)
+      }
+    }
+
+    private def generateAggAccess(call: RexCall) = {
+      val singleResultTerm = newName("result")
+      val singleResultNullTerm = newName("nullTerm")
+      val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+      val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+      val patternName = findEventsByPatternName(variable)
+
+      val codeForAgg =
+        j"""
+           |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+           |""".stripMargin
+
+      reusablePerRecordStatements += codeForAgg
+
+      val defaultValue = primitiveDefaultValue(singleResultType)
+      val codeForSingleAgg =
+        j"""
+           |boolean $singleResultNullTerm;
+           |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm

Review comment:
Sorry about this comment, I totally missed the following line.
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237494549

## File path: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java ##
@@ -0,0 +1,6337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.validate;
+
+import org.apache.calcite.config.NullCollation;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.function.Function2;
+import org.apache.calcite.linq4j.function.Functions;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.DynamicRecordType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.runtime.CalciteException;
+import org.apache.calcite.runtime.Feature;
+import org.apache.calcite.runtime.Resources;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.ModifiableViewTable;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAccessEnum;
+import org.apache.calcite.sql.SqlAccessType;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlMerge;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlUnresolvedFunction;
+import org.apache.calcite.sql.SqlUpdate;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.AssignableOperandTypeChecker;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql2rel.InitializerContext;
+import org.apache.calcite.util.BitString;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.Litmus;
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237491775 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. 
+This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime price tax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will +exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said query will be: + +{% highlight text %} + symbol start_tstamp end_tstamp avgPrice += == == +ACME 01-APR-11 10:00:00 01-APR-11 10:00:03 14.5 +ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5 +{% endhighlight %} + +An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will Review comment: ``` An important thing to have in mind is how aggregates behave in situations where no rows were mapped to a certain pattern variable. Every aggregate (except for `COUNT`) will produce `null` in those cases. `COUNT` on the other hand will produce 0. ```
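As a rough cross-check of the expected output in this example, the matching can be approximated in plain Java. This is a simplified model under stated assumptions, not Flink's NFA: one partition already ordered by `rowtime`, a greedy `A+` whose running `AVG(A.price)` includes the candidate row, `B` (which has no condition) taking the first row that fails `A`, and a new attempt at the next row only when the current start row cannot satisfy `A` on its own. `AvgThresholdMatcher` and `findMatches` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical model of
 *   PATTERN (A+ B) DEFINE A AS AVG(A.price) < threshold
 *   AFTER MATCH SKIP TO FIRST B
 * over a single partition ordered by time. B has no condition, so it accepts any row.
 */
final class AvgThresholdMatcher {

    /** One completed match: positions of the first and last A row plus AVG(A.price). */
    static final class Match {
        final int firstA;
        final int lastA;
        final double avgPrice;

        Match(int firstA, int lastA, double avgPrice) {
            this.firstA = firstA;
            this.lastA = lastA;
            this.avgPrice = avgPrice;
        }
    }

    static List<Match> findMatches(double[] prices, double threshold) {
        List<Match> matches = new ArrayList<>();
        int start = 0;
        while (start < prices.length) {
            double sum = 0;
            int count = 0;
            int next = start;
            // Greedy A+: keep taking rows while the running average,
            // including the candidate row, stays below the threshold.
            while (next < prices.length && (sum + prices[next]) / (count + 1) < threshold) {
                sum += prices[next];
                count++;
                next++;
            }
            if (count == 0) {
                // The candidate row alone violates A, so no match can start here.
                start++;
            } else if (next < prices.length) {
                // Row `next` fails A and is mapped to B, which completes the match.
                matches.add(new Match(start, next - 1, sum / count));
                // AFTER MATCH SKIP TO FIRST B: the next attempt starts at the B row.
                start = next;
            } else {
                // Input ended before any row could be mapped to B: the match stays open.
                break;
            }
        }
        return matches;
    }
}
```

Applied to the eleven sample prices with a threshold of 15, this model completes one period: 10:00:00 to 10:00:03 with an average of 14.5. Under the same model the 25 at 10:00:04 already fails `A` on its own, so the next candidate period starts at 10:00:05 and is still waiting for a `B` row when the sample input ends.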
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237491268 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. 
+This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime price tax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will Review comment: ``` The query will accumulate events as part of pattern variable `A` as long as the average price of those events stays below `15`. For example, the limit is exceeded at `01-Apr-11 10:00:04`. The following period exceeds the average price of `15` again at `01-Apr-11 10:00:10`. ```
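Written out for the first five sample rows (10:00:00 to 10:00:04), with p_i the price of the i-th row, the running average that `AVG(A.price) < 15` tests for each candidate row is:

```latex
\begin{aligned}
\bar{p}_k &= \frac{1}{k}\sum_{i=1}^{k} p_i \\
\bar{p}_1 &= 12, \quad \bar{p}_2 = \frac{12 + 17}{2} = 14.5, \quad \bar{p}_3 = \frac{29 + 13}{3} = 14, \\
\bar{p}_4 &= \frac{42 + 16}{4} = 14.5, \quad \bar{p}_5 = \frac{58 + 25}{5} = 16.6 \geq 15
\end{aligned}
```

The fifth row (10:00:04) is the first whose inclusion lifts the average to 15 or more, so it is the first row that cannot be mapped to `A`.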
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237485908 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. Review comment: `to each subset` `In order to understand how those subsets are evaluated, `
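A minimal plain-Java sketch of what applying an aggregate "to each subset" means within a single match: `SUM(A.price)` only sees the rows mapped to `A`, and `SUM(B.price)` only those mapped to `B`. `MappedRow` and `sumOf` are hypothetical names, and a match is assumed to be available as a list of rows tagged with their pattern variable.

```java
import java.util.List;

/** Hypothetical plain-Java model of aggregating per pattern variable within one match. */
final class PerVariableAggregation {

    /** One row of a match together with the pattern variable it was mapped to. */
    static final class MappedRow {
        final String variable;
        final long price;

        MappedRow(String variable, long price) {
            this.variable = variable;
            this.price = price;
        }
    }

    /** Models SUM(<variable>.price): only rows mapped to that variable contribute. */
    static Long sumOf(List<MappedRow> match, String variable) {
        Long sum = null; // stays null when no row of the match was mapped to the variable
        for (MappedRow row : match) {
            if (row.variable.equals(variable)) {
                sum = (sum == null ? 0L : sum) + row.price;
            }
        }
        return sum;
    }
}
```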
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237493263 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. 
+This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime price tax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will +exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said query will be: + +{% highlight text %} + symbol start_tstamp end_tstamp avgPrice += == == +ACME 01-APR-11 10:00:00 01-APR-11 10:00:03 14.5 +ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5 +{% endhighlight %} + +An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will Review comment: But is this statement true? Doesn't the empty case depend on how the aggregate function is implemented? If you are using a UDF, the result does not have to be null, right?
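The question can be illustrated with a plain-Java model in which the result for zero input rows is decided by each aggregate's own result logic: a SUM-like aggregate yields `null`, a COUNT-like one yields 0, and a user-defined aggregate is free to choose (here 0). `Agg`, `Sum`, `Count`, `SumOrZero` and `aggregate` are hypothetical names; this is not Flink's `AggregateFunction` API.

```java
/** Hypothetical plain-Java model (not Flink's API) of aggregates evaluated over zero rows. */
final class EmptyAggregates {

    /** Minimal accumulator contract: fold values in, then read the result. */
    interface Agg<R> {
        void accumulate(long value);
        R result();
    }

    /** SUM-like: with no input rows there is no sum, so the result is null. */
    static final class Sum implements Agg<Long> {
        private long sum;
        private boolean seenRow;

        public void accumulate(long value) { sum += value; seenRow = true; }
        public Long result() { return seenRow ? sum : null; }
    }

    /** COUNT-like: zero rows simply count as 0. */
    static final class Count implements Agg<Long> {
        private long count;

        public void accumulate(long value) { count++; }
        public Long result() { return count; }
    }

    /** Hypothetical user-defined aggregate whose author chose 0 for the empty case. */
    static final class SumOrZero implements Agg<Long> {
        private long sum;

        public void accumulate(long value) { sum += value; }
        public Long result() { return sum; }
    }

    /** Runs an aggregate over the values of the rows mapped to a pattern variable (possibly none). */
    static <R> R aggregate(Agg<R> agg, long... mappedValues) {
        for (long v : mappedValues) {
            agg.accumulate(v);
        }
        return agg.result();
    }
}
```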
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237488275 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. Review comment: Introduce a subsection `Example` as done in other sections? Rephrase: `The task of the following example is to find... threshold. It shows how...`
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237485540 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. Review comment: Rephrase: ``` Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported. ```
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237486274 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions + +One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions. + +Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations. 
+This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime price tax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will +exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said query will be: + +{% highlight text %} + symbol start_tstamp end_tstamp avgPrice += == == +ACME 01-APR-11 10:00:00 01-APR-11 10:00:03 14.5 +ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5 +{% endhighlight %} + +An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will +produce 0. + +Attention `DISTINCT` aggregations are not supported. Moreover the `DISTINCT` modifier will be silently dropped if specified for aggregation! Review comment: Mention that you cannot mix pattern variables within a single aggregation. Mention that you can use expressions as parameters?
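Since, according to the draft, the modifier is dropped without an error, a query asking for `COUNT(DISTINCT D.price)` would receive the value of `COUNT(D.price)` instead. A tiny plain-Java sketch (hypothetical names, not Flink code) of how far the two can diverge on the same rows:

```java
import java.util.Arrays;

/** Hypothetical illustration of what silently dropping DISTINCT changes. */
final class DistinctDrop {

    /** COUNT(D.price): every mapped row counts. */
    static long count(long[] prices) {
        return prices.length;
    }

    /** COUNT(DISTINCT D.price): repeated prices count only once. */
    static long countDistinct(long[] prices) {
        return Arrays.stream(prices).distinct().count();
    }
}
```

For prices `{1, 2, 1, 1}` the first returns 4 and the second 2, so the difference is not just cosmetic.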
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237484391 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Scalar & Aggregate functions Review comment: Just call it `Aggregations`?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237436990 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); + |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + 
reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { + val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)] + + val aggs = aggregates.map(rexAggCall => { +val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => { + inputRows.get(innerCall.toString) match { +case Some(x) => + x + +case None => + val callWithIndex = (innerCall, inputRows.size) + inputRows(innerCall.toString) = callWithIndex + callWithIndex + } +}).toList + +val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction] +(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray) + }).zipWithIndex.map { +case (agg, index) => + val result = AggregateUtil +.transformToAggregateFunction(agg._1, + isDistinct = false, + agg._2.map(_.getType), + needRetraction = false, + 
config, + isStateBackedDataViews = false, + index) + + MatchAggCall(result._1, agg._3, result._3) + } + + (aggs, inputRows.values.map(_._1).toSeq) +} + +
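The reuse logic in `getOrAddAggregation` can be read as follows: the first occurrence of an aggregate call gets the next field of the per-variable result row (`aggregates.size` is read in `generateAggAccess` before the call is appended), and later identical calls reuse that field. A plain-Java sketch of just that bookkeeping, keyed by the call's string form as the diff does with `call.toString`; `AggregateSlots` and `getOrAdd` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the caching idea: identical aggregate calls (compared by their
 * string form) share one field of the per-variable aggregation result row.
 */
final class AggregateSlots {

    private final Map<String, Integer> slotByCall = new HashMap<>();
    private final List<String> registeredCalls = new ArrayList<>();

    /** Returns the result-row field index for the call, registering the call on first use. */
    int getOrAdd(String callKey) {
        Integer existing = slotByCall.get(callKey);
        if (existing != null) {
            return existing;
        }
        // The index is the list size before appending, so fields are numbered 0, 1, 2, ...
        int slot = registeredCalls.size();
        registeredCalls.add(callKey);
        slotByCall.put(callKey, slot);
        return slot;
    }

    /** Distinct calls in registration order, i.e. the layout of the result row. */
    List<String> calls() {
        return registeredCalls;
    }
}
```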
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237471737 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -304,10 +349,14 @@ class MatchCodeGenerator( } override def visitPatternFieldRef(fieldRef: RexPatternFieldRef): GeneratedExpression = { -if (fieldRef.getAlpha.equals("*") && currentPattern.isDefined && offset == 0 && !first) { - generateInputAccess(input, input1Term, fieldRef.getIndex) +if (innerAggExpr) { Review comment: Call this variable `isWithinAggExprState` or similar?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237465668 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); + |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + 
reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { + val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)] + + val aggs = aggregates.map(rexAggCall => { +val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => { + inputRows.get(innerCall.toString) match { +case Some(x) => + x + +case None => + val callWithIndex = (innerCall, inputRows.size) + inputRows(innerCall.toString) = callWithIndex + callWithIndex + } +}).toList + +val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction] +(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray) + }).zipWithIndex.map { +case (agg, index) => + val result = AggregateUtil +.transformToAggregateFunction(agg._1, + isDistinct = false, + agg._2.map(_.getType), + needRetraction = false, + 
config, + isStateBackedDataViews = false, + index) + + MatchAggCall(result._1, agg._3, result._3) + } + + (aggs, inputRows.values.map(_._1).toSeq) +} + +
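The null handling that `generateAggAccess` emits can be mirrored in plain Java: the boxed field is read from the aggregation result row, and a `null` is turned into a raised null flag plus the type's default value, presumably so that later generated code can rely on the flag rather than on a `null` reference. In this sketch an `Object[]` stands in for `org.apache.flink.types.Row`, the result type is assumed to be `Long`, and `AggResultAccess`/`readLong` are hypothetical names:

```java
/**
 * Hypothetical plain-Java rendering of the generated null handling for one aggregate result.
 * An Object[] stands in for org.apache.flink.types.Row.
 */
final class AggResultAccess {

    /** Value plus explicit null marker, like result/nullTerm in the generated code. */
    static final class LongResult {
        final boolean isNull;
        final Long value;

        LongResult(boolean isNull, Long value) {
            this.isNull = isNull;
            this.value = value;
        }
    }

    static LongResult readLong(Object[] aggRow, int index) {
        boolean isNull;
        Long result = (Long) aggRow[index];
        if (result != null) {
            isNull = false;
        } else {
            // Aggregate produced null: raise the flag and substitute the default value.
            isNull = true;
            result = 0L;
        }
        return new LongResult(isNull, result);
    }
}
```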
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237464842 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); + |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + 
reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { Review comment: Can you add comments to this method or break its logic into more named variables? It is very difficult to understand the Scala magic here. You call the method `extractAggregatesAndExpressions`, but what do the expressions that come out of this method describe? Also, you use overly generic variable names such as `aggs` and `agg` for values with different data types and meanings.
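On the naming question, one way to state what the method computes, as a plain-Java sketch with explicit names: every distinct operand expression of the collected aggregate calls becomes one field of the row fed to the aggregate functions, and each call records the field indices of its own operands. Operands are represented by their string form, mirroring the `innerCall.toString` key in the diff; `AggregateInputs`, `distinctOperands` and `operandIndicesPerCall` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical restatement of the operand deduplication with explicit names. */
final class AggregateInputs {

    /** Distinct operand expressions in first-seen order; list position = input field index. */
    final List<String> distinctOperands = new ArrayList<>();

    /** For each aggregate call (same order as the input), the field indices of its operands. */
    final List<int[]> operandIndicesPerCall = new ArrayList<>();

    static AggregateInputs extract(List<List<String>> operandsPerCall) {
        AggregateInputs result = new AggregateInputs();
        Map<String, Integer> fieldIndexByOperand = new HashMap<>();
        for (List<String> callOperands : operandsPerCall) {
            int[] indices = new int[callOperands.size()];
            for (int i = 0; i < callOperands.size(); i++) {
                String operand = callOperands.get(i);
                Integer fieldIndex = fieldIndexByOperand.get(operand);
                if (fieldIndex == null) {
                    // First occurrence: the operand becomes the next field of the input row.
                    fieldIndex = result.distinctOperands.size();
                    fieldIndexByOperand.put(operand, fieldIndex);
                    result.distinctOperands.add(operand);
                }
                indices[i] = fieldIndex;
            }
            result.operandIndicesPerCall.add(indices);
        }
        return result;
    }
}
```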
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237437779 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala ## @@ -21,8 +21,8 @@ package org.apache.flink.table.`match` import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ Review comment: Can you rename this class to `MatchRecognizeValidationTest`? Otherwise it doesn't match our current naming scheme.
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237441497 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long, Double, Int)] +data.+=((1, "a", 1, 0.8, 1)) +data.+=((2, "z", 2, 0.8, 3)) +data.+=((3, "b", 1, 0.8, 2)) +data.+=((4, "c", 1, 0.8, 5)) +data.+=((5, "d", 4, 0.1, 5)) +data.+=((6, "a", 2, 1.5, 2)) +data.+=((7, "b", 2, 0.8, 3)) +data.+=((8, "c", 1, 0.8, 2)) +data.+=((9, "h", 2, 0.8, 3)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("weightedAvg", new WeightedAvg) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as startId, + |SUM(A.price) AS sumA, + |COUNT(DISTINCT D.price) AS countD, Review comment: remove this when we don't support it
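Points 1 and 2 of the test's doc comment follow standard SQL semantics: `COUNT(expr)` counts only non-NULL values, so it is 0 over an empty set, while `SUM` over an empty set is NULL. A plain-Scala model of that behaviour, using `Option` for NULL (the `EmptyMatchAggregates` object is illustrative only, not Flink's implementation):

```scala
object EmptyMatchAggregates {
  // COUNT(D.price): number of non-NULL values among the rows mapped to D.
  // With no rows mapped to D this is 0, never NULL.
  def count(values: Seq[Option[Long]]): Long = values.count(_.isDefined).toLong

  // SUM(D.price): NULL (None) when no non-NULL value is present,
  // otherwise the sum of the present values.
  def sum(values: Seq[Option[Long]]): Option[Long] = {
    val present = values.flatten
    if (present.isEmpty) None else Some(present.sum)
  }
}
```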
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237466297 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { Review comment: Can you add a high-level description of how aggregates work in this code generator? Maybe here, or even for the top-level class. Otherwise we won't remember it when we have to touch this class again. Maybe a little example code that explains which methods, classes, and fields are generated and why? Not super detailed, but enough to understand the concept. Actually, this is also missing in the AggregationCodeGenerator.
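A plain-Scala sketch of the concept being asked about, as we read it from the diffs in this thread; it is not Flink's generated code, and every name in it is hypothetical. For one pattern variable, each row mapped to it is first transformed into an aggregate input row (one field per distinct operand expression, cf. `transformRowForAgg_...`). Each aggregate then folds the field it reads, and the results form one row whose field i holds aggregate i (cf. `calculateAgg_...` and `getField(...)`).

```scala
object PatternVariableAggregation {
  // A matched input row, simplified to named numeric fields.
  type InputRow = Map[String, Double]

  // One aggregate call: which field of the aggregate input row it reads and
  // how it folds the values of all matched rows into a single result.
  final case class Agg(name: String, inputIndex: Int, fold: Seq[Double] => Double)

  /**
    * @param matchedRows  rows currently mapped to one pattern variable (e.g. B)
    * @param operandExprs distinct operand expressions, evaluated per row
    * @param aggs         aggregates reading fields of the transformed rows
    * @return one field per aggregate, in declaration order
    */
  def calculate(
      matchedRows: Seq[InputRow],
      operandExprs: Seq[InputRow => Double],
      aggs: Seq[Agg]): Seq[Double] = {
    // step 1: transform every matched row into an aggregate input row
    val aggInputRows: Seq[Seq[Double]] =
      matchedRows.map(row => operandExprs.map(expr => expr(row)))
    // step 2: each aggregate folds the column it reads
    aggs.map(agg => agg.fold(aggInputRows.map(aggInputRow => aggInputRow(agg.inputIndex))))
  }
}
```

Because all aggregates of a variable share one transformed row, an operand such as `B.price` is evaluated once per row even if several aggregates use it.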
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237479817 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); + |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + 
reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, Review comment: Unnecessary variable.
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237476877 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) Review comment: You are creating a primitive value for a boxed type term?
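One way to avoid pairing a primitive default with a boxed term, sketched in plain Scala rather than as generated Java: use the boxed field only for the NULL check, then unbox into a primitive variable whose type matches the primitive default. The `AggResultAccess` object and its `Int`-only signature are assumptions made for illustration.

```scala
object AggResultAccess {
  /**
    * Reads field `index` of an aggregate result row and returns (isNull, value),
    * mirroring the null-flag pattern of the generated code. The boxed value is
    * only used for the null check; the returned value is a primitive, so the
    * primitive default has a primitive target.
    */
  def readIntField(resultRow: Array[AnyRef], index: Int, default: Int): (Boolean, Int) = {
    // the cast applies to the extracted field, not to the row itself
    val boxed = resultRow(index).asInstanceOf[java.lang.Integer]
    if (boxed != null) (false, boxed.intValue) else (true, default)
  }
}
```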
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237436464 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm + | .getField(${aggregates.size}); + |if ($singleResultTerm != null) { + | $singleResultNullTerm = false; + |} else { + | $singleResultNullTerm = true; + | $singleResultTerm = $defaultValue; + |} + |""".stripMargin + + 
reusablePerRecordStatements += codeForSingleAgg + + val exp = GeneratedExpression(singleResultTerm, +singleResultNullTerm, +NO_CODE, +singleResultType) + exp +} + +def generateAggFunction() : Unit = { + val (aggs, exprs) = extractAggregatesAndExpressions + + val aggGenerator = new AggregationCodeGenerator(config, false, input, None) + + val aggFunc = aggGenerator.generateAggregations( +s"AggFunction_$variableUID", +exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)), +aggs.map(_.aggFunction).toArray, +aggs.map(_.inputIndices).toArray, +aggs.indices.toArray, +Array.fill(aggs.size)(false), +isStateBackedDataViews = false, +partialResults = false, +Array.emptyIntArray, +None, +aggs.size, +needRetract = false, +needMerge = false, +needReset = false, +None + ) + + reusableMemberStatements.add(aggFunc.code) + + val transformFuncName = s"transformRowForAgg_$variableUID" + val inputTransform: String = generateAggInputExprEvaluation( +exprs, +transformFuncName) + + generateAggCalculation(aggFunc, transformFuncName, inputTransform) +} + +private case class MatchAggCall( + aggFunction: TableAggregateFunction[_, _], + inputIndices: Array[Int], + dataViews: Seq[DataViewSpec[_]] +) + +private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = { + val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)] + + val aggs = aggregates.map(rexAggCall => { +val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => { + inputRows.get(innerCall.toString) match { +case Some(x) => + x + +case None => + val callWithIndex = (innerCall, inputRows.size) + inputRows(innerCall.toString) = callWithIndex + callWithIndex + } +}).toList + +val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction] +(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray) + }).zipWithIndex.map { +case (agg, index) => + val result = AggregateUtil +.transformToAggregateFunction(agg._1, + isDistinct = false, + agg._2.map(_.getType), + needRetraction = false, + 
config, + isStateBackedDataViews = false, + index) + + MatchAggCall(result._1, agg._3, result._3) + } + + (aggs, inputRows.values.map(_._1).toSeq) +} + +
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237450352 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long, Double, Int)] +data.+=((1, "a", 1, 0.8, 1)) +data.+=((2, "z", 2, 0.8, 3)) +data.+=((3, "b", 1, 0.8, 2)) +data.+=((4, "c", 1, 0.8, 5)) +data.+=((5, "d", 4, 0.1, 5)) +data.+=((6, "a", 2, 1.5, 2)) +data.+=((7, "b", 2, 0.8, 3)) +data.+=((8, "c", 1, 0.8, 2)) +data.+=((9, "h", 2, 0.8, 3)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("weightedAvg", new WeightedAvg) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as startId, + |SUM(A.price) AS sumA, + |COUNT(DISTINCT D.price) AS countD, + |SUM(D.price) as sumD, + |weightedAvg(price, weight) as wAvg, + |AVG(B.price) AS avgB, + |SUM(B.price * B.rate) as sumExprB, + |LAST(id) as endId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ B+ C D? 
E ) + | DEFINE + |A AS SUM(A.price) < 6, + |B AS SUM(B.price * B.rate) < SUM(A.price) AND + | SUM(B.price * B.rate) > 0.2 AND + | SUM(B.price) >= 1 AND + | AVG(B.price) >= 1 AND + | weightedAvg(price, weight) > 1 + |) AS T + |""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testCepAggregatesWithNullInputs(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[Row] +data.+=(Row.of(1:java.lang.Integer, "a", 10:java.lang.Integer)) +data.+=(Row.of(2:java.lang.Integer, "z", 10:java.lang.Integer)) +data.+=(Row.of(3:java.lang.Integer, "b", null)) +data.+=(Row.of(4:java.lang.Integer, "c", null)) +data.+=(Row.of(5:java.lang.Integer, "d", 3:java.lang.Integer)) +data.+=(Row.of(6:java.lang.Integer, "c", 3:java.lang.Integer)) +data.+=(Row.of(7:java.lang.Integer, "c", 3:java.lang.Integer)) +data.+=(Row.of(8:java.lang.Integer, "c", 3:java.lang.Integer)) +data.+=(Row.of(9:java.lang.Integer, "c", 2:java.lang.Integer)) + +val t = env.fromCollection(data)(new RowTypeInfo( Review comment: nit: use the `Types` class
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237450569 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long, Double, Int)] +data.+=((1, "a", 1, 0.8, 1)) +data.+=((2, "z", 2, 0.8, 3)) +data.+=((3, "b", 1, 0.8, 2)) +data.+=((4, "c", 1, 0.8, 5)) +data.+=((5, "d", 4, 0.1, 5)) +data.+=((6, "a", 2, 1.5, 2)) +data.+=((7, "b", 2, 0.8, 3)) +data.+=((8, "c", 1, 0.8, 2)) +data.+=((9, "h", 2, 0.8, 3)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("weightedAvg", new WeightedAvg) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as startId, + |SUM(A.price) AS sumA, + |COUNT(DISTINCT D.price) AS countD, + |SUM(D.price) as sumD, + |weightedAvg(price, weight) as wAvg, + |AVG(B.price) AS avgB, + |SUM(B.price * B.rate) as sumExprB, + |LAST(id) as endId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ B+ C D? 
E ) + | DEFINE + |A AS SUM(A.price) < 6, + |B AS SUM(B.price * B.rate) < SUM(A.price) AND + | SUM(B.price * B.rate) > 0.2 AND + | SUM(B.price) >= 1 AND + | AVG(B.price) >= 1 AND + | weightedAvg(price, weight) > 1 + |) AS T + |""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testCepAggregatesWithNullInputs(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[Row] +data.+=(Row.of(1:java.lang.Integer, "a", 10:java.lang.Integer)) Review comment: nit: hint you can also use `Int.box()`
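`Int.box` is part of the Scala standard library and returns a `java.lang.Integer`, so it is an equivalent, shorter alternative to the `1:java.lang.Integer` ascriptions in the test. A small self-contained comparison (the `BoxingHint` object exists only for illustration):

```scala
object BoxingHint {
  // Type ascription, as used in the test: relies on Predef's implicit int2Integer.
  val ascribed: java.lang.Integer = 1: java.lang.Integer

  // Equivalent, shorter form suggested in the review.
  val boxed: java.lang.Integer = Int.box(1)

  // Boxed values are what a Java varargs API such as Row.of(Object...) expects;
  // NULL fields can be passed as plain null.
  val fields: Seq[AnyRef] = Seq(Int.box(1), "a", Int.box(10))
}
```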
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237470829 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { Review comment: Give the parameter of this method a better name, because this method assumes an aggregate function call.
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237439056 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala ## @@ -128,15 +128,9 @@ class MatchOperatorValidationTest extends TableTestBase { streamUtils.tableEnv.sqlQuery(sqlQuery).toRetractStream[Row] } - // *** - // * Those validations are temporary. We should remove those tests once we support those * - // * features. * - // *** - @Test - def testAllRowsPerMatch(): Unit = { -thrown.expectMessage("All rows per match mode is not supported yet.") -thrown.expect(classOf[TableException]) + def testAggregatesOnMultiplePatternVariablesNotSupported(): Unit = { +thrown.expect(classOf[ValidationException]) Review comment: Add part of the message as well?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237474378 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -429,6 +464,24 @@ class MatchCodeGenerator( GeneratedExpression(rowNameTerm, isRowNull, funcCode, input) } + private def findEventsByPatternName( + patternFieldAlpha: String) +: GeneratedPatternList = { +reusablePatternLists.get(patternFieldAlpha) match { + // input access and unboxing has already been generated Review comment: This comment has nothing to do with input access.
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237439450 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala ## @@ -145,22 +139,21 @@ class MatchOperatorValidationTest extends TableTestBase { |MATCH_RECOGNIZE ( | ORDER BY proctime | MEASURES - |A.symbol AS aSymbol - | ALL ROWS PER MATCH + |SUM(A.price + B.tax) AS taxedPrice | PATTERN (A B) | DEFINE - |A AS symbol = 'a' + |A AS A.symbol = 'a' |) AS T |""".stripMargin streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] } @Test - def testGreedyQuantifierAtTheEndIsNotSupported(): Unit = { -thrown.expectMessage("Greedy quantifiers are not allowed as the last element of a " + - "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier.") -thrown.expect(classOf[TableException]) + def testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs(): Unit = { +thrown.expect(classOf[ValidationException]) Review comment: Add part of the message here as well?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237468069 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { Review comment: This looks hacky to me. Why do we add the string representation of a rex call to input expressions? It could lead to wrong results if two different calls have the same string representation.
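To illustrate the risk described above, here is a generic plain-Scala sketch rather than a claim about how Calcite renders `RexCall`s. `AggCallKey` is hypothetical, and its `toString` deliberately omits the result type. Keyed by string, two different calls collapse into one cache entry; keyed by value (structural equality of the case class), they stay distinct.

```scala
import scala.collection.mutable

object AggCallCache {
  // Hypothetical aggregate-call description. Its toString leaves out the
  // result type, so two different calls can render to the same string.
  final case class AggCallKey(function: String, operand: String, resultType: String) {
    override def toString: String = s"$function($operand)"
  }

  /** Number of cache entries the calls produce when keyed (string, value). */
  def distinctEntries(calls: Seq[AggCallKey]): (Int, Int) = {
    val byString = mutable.LinkedHashMap.empty[String, AggCallKey]
    val byValue = mutable.LinkedHashMap.empty[AggCallKey, Int]
    calls.foreach { call =>
      byString.getOrElseUpdate(call.toString, call) // may merge different calls
      byValue.getOrElseUpdate(call, byValue.size)   // uses case-class equality
    }
    (byString.size, byValue.size)
  }
}
```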
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237479598 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -445,4 +498,223 @@ class MatchCodeGenerator( generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex) } + + class AggBuilder(variable: String) { + +private val aggregates = new mutable.ListBuffer[RexCall]() + +private val variableUID = newName("variable") + +private val resultRowTerm = newName(s"aggRow_$variableUID") + +private val rowTypeTerm = "org.apache.flink.types.Row" + +def getOrAddAggregation(call: RexCall): GeneratedExpression = { + reusableInputUnboxingExprs.get((call.toString, 0)) match { +case Some(expr) => + expr + +case None => + val exp: GeneratedExpression = generateAggAccess(call) + aggregates += call + reusableInputUnboxingExprs((call.toString, 0)) = exp + exp.copy(code = NO_CODE) + } +} + +private def generateAggAccess(call: RexCall) = { + val singleResultTerm = newName("result") + val singleResultNullTerm = newName("nullTerm") + val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`) + val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType) + + val patternName = findEventsByPatternName(variable) + + val codeForAgg = +j""" + |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm}); + |""".stripMargin + + reusablePerRecordStatements += codeForAgg + + val defaultValue = primitiveDefaultValue(singleResultType) + val codeForSingleAgg = +j""" + |boolean $singleResultNullTerm; + |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm Review comment: Something is wrong here. You are casting a `Row` into the result type of the aggregation?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237442180 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* This query checks: +* +* 1. count(D.price) produces 0, because no rows matched to D +* 2. sum(D.price) produces null, because no rows matched to D +* 3. aggregates that take multiple parameters work +* 4. aggregates with expressions work +*/ + @Test + def testCepAggregates(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long, Double, Int)] +data.+=((1, "a", 1, 0.8, 1)) +data.+=((2, "z", 2, 0.8, 3)) +data.+=((3, "b", 1, 0.8, 2)) +data.+=((4, "c", 1, 0.8, 5)) +data.+=((5, "d", 4, 0.1, 5)) +data.+=((6, "a", 2, 1.5, 2)) +data.+=((7, "b", 2, 0.8, 3)) +data.+=((8, "c", 1, 0.8, 2)) +data.+=((9, "h", 2, 0.8, 3)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("weightedAvg", new WeightedAvg) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as startId, + |SUM(A.price) AS sumA, + |COUNT(DISTINCT D.price) AS countD, + |SUM(D.price) as sumD, + |weightedAvg(price, weight) as wAvg, + |AVG(B.price) AS avgB, + |SUM(B.price * B.rate) as sumExprB, + |LAST(id) as endId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ B+ C D? 
E ) + | DEFINE + |A AS SUM(A.price) < 6, + |B AS SUM(B.price * B.rate) < SUM(A.price) AND + | SUM(B.price * B.rate) > 0.2 AND + | SUM(B.price) >= 1 AND + | AVG(B.price) >= 1 AND + | weightedAvg(price, weight) > 1 + |) AS T + |""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8") Review comment: Produce a second row to be on the safe side?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237460915 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -72,12 +78,28 @@ class MatchCodeGenerator( private var offset: Int = 0 private var first : Boolean = false + /** +* Flags that tells if we generate expressions inside an aggregate. It tells how to access input +* row. +*/ + private var innerAggExpr: Boolean = false + + /** +* Name of term in function used to transform input row into aggregate input row. +*/ + private val inputAggRowTerm = newName("inAgg") Review comment: None of the final variable terms in this class need to be created with `newName`; they can be constants.
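To illustrate the distinction drawn above: a `newName`-style helper matters when a term may be generated several times within one class, whereas a member declared exactly once per generated class can use a fixed name. The helper below is a hypothetical model (a prefix plus a global counter), not Flink's own `newName`.

```scala
import java.util.concurrent.atomic.AtomicInteger

object TermNames {
  private val counter = new AtomicInteger(0)

  // Hypothetical newName-style helper: appends a unique suffix so that terms
  // generated repeatedly (e.g. once per aggregate call) never clash.
  def newName(prefix: String): String = s"$prefix$$${counter.getAndIncrement()}"

  // A member term that exists exactly once per generated class can simply be
  // a constant; there is nothing for it to clash with.
  val InputAggRowTerm: String = "inAgg"
}
```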
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237434654

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+  /**
+    * This query checks:
+    *
+    * 1. count(D.price) produces 0, because no rows matched to D
+    * 2. sum(D.price) produces null, because no rows matched to D
+    * 3. aggregates that take multiple parameters work
+    * 4. aggregates with expressions work
+    */
+  @Test
+  def testCepAggregates(): Unit = {

Review comment:
nit: We are already in the `MatchRecognizeITCase`, so we don't need to prefix every method with CEP.
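Checks 1 and 2 follow standard SQL semantics for aggregates over an empty set of rows: `COUNT` returns 0, whereas `SUM` returns `NULL`. A plain-Scala model of that rule, using `Option` for SQL `NULL` and skipping `NULL` inputs as SQL aggregates do (an illustration only, not Flink's implementation):

```scala
// SQL NULL is modelled as None; each element is one row's (nullable) price.
object EmptySetAggregates {
  // COUNT(expr) counts non-NULL values, so an empty subset yields 0.
  def count(prices: Seq[Option[Int]]): Long = prices.count(_.isDefined).toLong

  // SUM(expr) over zero non-NULL values yields NULL (None), not 0.
  def sum(prices: Seq[Option[Int]]): Option[Long] = {
    val present = prices.flatten
    if (present.isEmpty) None else Some(present.map(_.toLong).sum)
  }
}
```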
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422016

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }
+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,

Review comment:
Remove the default here and at other places. Implementers should explicitly state what they want.
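Without defaults, the compiler forces every caller to pass these flags, and Scala's named arguments keep such calls readable. A sketch with a hypothetical, heavily simplified signature (the real method takes more parameters):

```scala
// Hypothetical, simplified signature with no default values: callers must choose.
object AggregateUtilSketch {
  def describeAggregate(
      name: String,
      isDistinct: Boolean,
      isStateBackedDataViews: Boolean,
      index: Int): String =
    s"$name(distinct=$isDistinct, stateBacked=$isStateBackedDataViews, index=$index)"

  // Named arguments make the explicit flags readable at the call site.
  val example: String = describeAggregate(
    name = "SUM",
    isDistinct = false,
    isStateBackedDataViews = true,
    index = 2)
}
```

Leaving out `isStateBackedDataViews` or `index` is then a compile error instead of a silent `false` or `0`.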
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237419538

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }
+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,
+      index: Int = 0)
+    : (TableAggregateFunction[_, _],
+      TypeInformation[_],
+      Seq[DataViewSpec[_]]) = {
+    // store the aggregate fields of each aggregate function, by the same order of aggregates.
+    // create aggregate function instances by function type and aggregate field data type.
+
+    var accumulatorType: TypeInformation[_] = null
+    val inputType = () => if (aggregateInputType.isEmpty) {
+      throw new TableException("Aggregate fields should not be empty.")
+    } else {
+      aggregateInputType.get(0)
+    }
+
+    val aggregate: TableAggregateFunction[_, _] = aggregateCall match {
+
+      case _: SqlCountAggFunction =>
+        new CountAggFunction
+
+      case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
+        val agg = new CollectAggFunction(FlinkTypeFactory.toTypeInfo(inputType()))
+        accumulatorType = agg.getAccumulatorType
+        agg
+
+      case udagg: AggSqlFunction =>
+        accumulatorType = udagg.accType
+        udagg.getFunction
+
+      case aggFunction: SqlAggFunction =>
+        createFlinkAggFunction(aggFunction,
+          needRetraction,
+          inputType(),
+          tableConfig)
+    }
+
+    val accSpecs =
+      if (accumulatorType != null) {
+        val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+          aggregate,
+          accumulatorType,
+          isStateBackedDataViews)
+        if (specs.isDefined) {
+          accumulatorType = accType
+          specs.get
+        } else {
+          Seq()
+        }
+      } else {
+        accumulatorType = getAccumulatorTypeOfAggregateFunction(aggregate)
+        Seq()
+      }
+
+    // create distinct accumulator filter argument
+    if (isDistinct) {
+      // Using Pojo fields for the real underlying accumulator
+      val pojoFields = new util.ArrayList[PojoField]()
+      pojoFields.add(new PojoField(
+        classOf[DistinctAccumulator[_]].getDeclaredField("realAcc"),
+        accumulatorType)
+      )
+      // If StateBackend is not enabled, the distinct mapping also needs
+      // to be added to the Pojo fields.
+      if (!isStateBackedDataViews) {
+
+        val argTypes: Array[TypeInformation[_]] = aggregateInputType
+          .map(FlinkTypeFactory.toTypeInfo).toArray
+
+        val mapViewTypeInfo = new MapViewTypeInfo(
+          new RowTypeInfo(argTypes: _*),
+          BasicTypeInfo.LONG_TYPE_INFO)
+        pojoFields.add(new PojoField(
+          classOf[DistinctAccumulator[_]].getDeclaredField("distinctValueMap"),
+          mapViewTypeInfo)
+        )
+      }
+      accumulatorType = new PojoTypeInfo(classOf[DistinctAccumulator[_]], pojoFields)
+    }
+
+    (aggregate, accumulatorType, accSpecs)
+  }
+
   private def transformToAggregateFunctions(

Review comment:
We definitely need more documentation for this function. Can you create case classes for its return type so that the fields get proper names? A bare `Array[Int]` or `Array[Boolean]` is not very helpful.
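One way to address this is a single case class whose fields keep the names that the returned tuple `(aggFieldIndexes, aggregates, isDistinctAggs, accTypes, accSpecs)` loses. Sketch only: the class name is hypothetical, and type parameters stand in for `TableAggregateFunction`, `TypeInformation` and `DataViewSpec` so the example compiles on its own.

```scala
// Hypothetical named result type; F, T and S stand in for the aggregate
// function, accumulator type information and data view spec types.
final case class AggregateFunctionsResult[F, T, S](
    // Input field indexes per aggregate; Array(-1) marks a COUNT without arguments.
    aggFieldIndexes: Array[Array[Int]],
    aggregates: Array[F],
    isDistinctAggs: Array[Boolean],
    accTypes: Array[T],
    accSpecs: Array[Seq[S]]) {

  // All arrays are indexed by aggregate position, so they must have equal length.
  require(
    Set(aggFieldIndexes.length, aggregates.length, isDistinctAggs.length,
      accTypes.length, accSpecs.length).size == 1,
    "all arrays must have one entry per aggregate")

  // Number of aggregate calls described by this result.
  def size: Int = aggregates.length
}
```

The `require` also documents the invariant that the parallel arrays share one index per aggregate call.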
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422016

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }
+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,

Review comment:
Remove the default. Implementers should explicitly state what they want.
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422674

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }
+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,
+      index: Int = 0)

Review comment:
Add more documentation to this function? What is `index` good for?
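In the diff, `index` is only forwarded to `removeStateViewFieldsFromAccTypeInfo`. Assuming (the author should confirm) that it exists to give each aggregate's data-view state a distinct name, a scaladoc along these lines would answer the question. The helper and the naming scheme below are hypothetical:

```scala
object DataViewNaming {
  /**
    * Builds the state name for one data view field of one aggregate.
    *
    * ASSUMPTION: this illustrates a plausible role of `index`; it is not Flink's code.
    *
    * @param aggIndex  position of the aggregate call in the query; without it, two
    *                  aggregates whose accumulators have a data view field with the
    *                  same name would be backed by the same state
    * @param fieldName name of the accumulator field that holds the data view
    * @return a state name that is unique per (aggregate, field) pair
    */
  def stateName(aggIndex: Int, fieldName: String): String =
    s"agg${aggIndex}$$$fieldName"
}
```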
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237420010

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1185,331 +1273,281 @@ object AggregateUtil {
     val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size)
     val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: Any]](aggregateCalls.size)
     val accTypes = new Array[TypeInformation[_]](aggregateCalls.size)
+    val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+    val isDistinctAggs = new Array[Boolean](aggregateCalls.size)
-    // create aggregate function instances by function type and aggregate field data type.

Review comment:
Restore this and the following line?
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237424181

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }
+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,
+      index: Int = 0)
+    : (TableAggregateFunction[_, _],

Review comment:
Introduce a case class here as well.
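For the single-aggregate method, a case class also removes the `result._1`, `result._2` and `result._3` accesses in `transformToAggregateFunctions`. A sketch with hypothetical names, again using type parameters in place of `TableAggregateFunction[_, _]`, `TypeInformation[_]` and `DataViewSpec[_]`:

```scala
// Hypothetical replacement for the (function, accumulator type, data view specs) tuple.
final case class AggregateFunctionInfo[F, T, S](
    function: F,
    accumulatorType: T,
    dataViewSpecs: Seq[S])

object CallSiteSketch {
  // Stand-in for transformToAggregateFunction; only the return shape matters here.
  def transform(name: String): AggregateFunctionInfo[String, String, String] =
    AggregateFunctionInfo(
      function = s"${name}Function",
      accumulatorType = s"${name}Acc",
      dataViewSpecs = Seq.empty[String])

  // Call site: named fields replace result._1, result._2 and result._3.
  def accumulatorTypes(names: Seq[String]): Seq[String] =
    names.map(name => transform(name).accumulatorType)
}
```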
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237417018

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1185,331 +1273,281 @@ object AggregateUtil {
     val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size)
     val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: Any]](aggregateCalls.size)
     val accTypes = new Array[TypeInformation[_]](aggregateCalls.size)
+    val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+    val isDistinctAggs = new Array[Boolean](aggregateCalls.size)
-    // create aggregate function instances by function type and aggregate field data type.
-    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
-      val argList: util.List[Integer] = aggregateCall.getArgList
+    aggregateCalls
+      .zipWithIndex.foreach {
+      case (aggregateCall, index) =>
+        val argList: util.List[Integer] = aggregateCall.getArgList
-      if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-        aggregates(index) = new CountAggFunction
         if (argList.isEmpty) {
-          aggFieldIndexes(index) = Array[Int](-1)
+          if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+            aggFieldIndexes(index) = Array[Int](-1)
+          } else {
+            throw new TableException("Aggregate fields should not be empty.")
+          }
         } else {
           aggFieldIndexes(index) = argList.asScala.map(i => i.intValue).toArray
         }
-      } else {
-        if (argList.isEmpty) {
-          throw new TableException("Aggregate fields should not be empty.")
+
+        val inputTypes = argList.map(aggregateInputType.getFieldList.get(_).getType)
+        val result = transformToAggregateFunction(aggregateCall.getAggregation,
+          aggregateCall.isDistinct,
+          inputTypes,
+          needRetraction,
+          tableConfig,
+          isStateBackedDataViews,
+          index)
+
+        aggregates(index) = result._1
+        accTypes(index) = result._2
+        accSpecs(index) = result._3
+        isDistinctAggs(index) = aggregateCall.isDistinct
+    }
+
+    (aggFieldIndexes, aggregates, isDistinctAggs, accTypes, accSpecs)
+  }
+
+  private def createFlinkAggFunction(

Review comment:
Can you add a comment to every method in this class? This class is quite big, so every comment helps in understanding what is going on. For example, `Converts Calcite's [[SqlAggFunction]] to a Flink UDF [[TableAggregationFunction]]`.
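The requested comments can be as short as the example sentence. A sketch of that style on a hypothetical, much simplified converter; the aggregate kinds and the returned function names are placeholders, not Calcite's or Flink's actual classes:

```scala
// Hypothetical stand-ins for Calcite aggregate kinds.
sealed trait AggKind
case object CountKind extends AggKind
case object SumKind extends AggKind
case object CollectKind extends AggKind

object AggregateUtilDocSketch {

  /**
    * Converts a (simplified) SQL aggregate kind to the name of the runtime
    * aggregate function that implements it.
    *
    * @param kind           the SQL aggregate kind taken from the logical plan
    * @param needRetraction whether the function must support retraction
    * @return the (placeholder) name of the runtime aggregate function
    */
  def createFlinkAggFunctionName(kind: AggKind, needRetraction: Boolean): String =
    kind match {
      case CountKind   => "CountAggFunction"
      case SumKind     => if (needRetraction) "SumWithRetractAggFunction" else "SumAggFunction"
      case CollectKind => "CollectAggFunction"
    }
}
```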