[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708927#comment-16708927
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238713688
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -533,45 +618,49 @@ class MatchCodeGenerator(
   reusablePerRecordStatements += codeForAgg
 
   val defaultValue = primitiveDefaultValue(singleResultType)
-  val codeForSingleAgg =
+  val codeForSingleAgg = if (nullCheck) {
 j"""
|boolean $singleResultNullTerm;
-   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) 
$resultRowTerm
-   |  .getField(${aggregates.size});
-   |if ($singleResultTerm != null) {
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
|  $singleResultNullTerm = false;
|} else {
|  $singleResultNullTerm = true;
|  $singleResultTerm = $defaultValue;
|}
|""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
 
 Review comment:
   This line can be dropped. The null term is not evaluated if nullCheck is 
disabled.
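
A minimal sketch of the suggested shape, assuming a simplified string template rather than the generator's real helpers. The object `SingleAggCodeSketch`, the method `codeForSingleAgg`, and all of its parameters are hypothetical names used only for illustration. How the expression's null term is supplied when null checks are off is not modeled here.

```scala
object SingleAggCodeSketch {

  // Hypothetical stand-in for the template in the hunk above; the parameter
  // names are illustrative and not part of the code generator's API.
  def codeForSingleAgg(
      nullCheck: Boolean,
      resultTerm: String,
      nullTerm: String,
      primitiveType: String,
      boxedType: String,
      rowTerm: String,
      index: Int,
      defaultValue: String): String = {
    if (nullCheck) {
      // Null checks enabled: declare the null term and guard the field access.
      s"""
         |boolean $nullTerm;
         |$primitiveType $resultTerm;
         |if ($rowTerm.getField($index) != null) {
         |  $resultTerm = ($boxedType) $rowTerm.getField($index);
         |  $nullTerm = false;
         |} else {
         |  $nullTerm = true;
         |  $resultTerm = $defaultValue;
         |}
         |""".stripMargin
    } else {
      // Null checks disabled: no null-term declaration is emitted,
      // because nothing reads it in this mode.
      s"""
         |$primitiveType $resultTerm = ($boxedType) $rowTerm.getField($index);
         |""".stripMargin
    }
  }
}
```

With `nullCheck = false` the emitted snippet is a single assignment and never mentions the null term.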


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708929#comment-16708929
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238706596
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -726,13 +727,14 @@ object AggregateUtil {
 tableConfig: TableConfig): MapPartitionFunction[Row, Row] = {
 
 val needRetract = false
-val (aggFieldIndexes, aggregates, isDistinctAggs, accTypes, _) = 
transformToAggregateFunctions(
+val aggregateMetadata = extractAggregateMetadata(
   namedAggregates.map(_.getKey),
   physicalInputRowType,
   needRetract,
   tableConfig)
 
-val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
+val aggMapping = (0 until aggregateMetadata.getAggregateCallsCount).map(_ 
+ groupings.length)
 
 Review comment:
   This occurs quite often (I counted 10 times). Maybe create a helper function 
in `AggregateMetadata` that takes the grouping length, such as 
`getAdjustedMapping()`?
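
A minimal sketch of such a helper, assuming a simplified stand-in for `AggregateMetadata` that models only the number of aggregate calls. `AggregateMetadataSketch` and its `offset` parameter are hypothetical; only the method name `getAdjustedMapping` comes from the suggestion above.

```scala
object AggregateMappingSketch {

  // Simplified stand-in for the PR's AggregateMetadata (assumption):
  // only the count of aggregate calls is modeled.
  final case class AggregateMetadataSketch(aggregateCallsCount: Int) {

    // Maps aggregate i to output position i + offset, where offset is
    // typically the number of grouping keys that precede the aggregates.
    def getAdjustedMapping(offset: Int): Array[Int] =
      (0 until aggregateCallsCount).map(_ + offset).toArray
  }
}
```

A call site would then read `aggregateMetadata.getAdjustedMapping(groupings.length)` instead of repeating the range expression.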




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708926#comment-16708926
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238711587
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -504,24 +588,25 @@ class MatchCodeGenerator(
 
 private val rowTypeTerm = "org.apache.flink.types.Row"
 
-def getOrAddAggregation(call: RexCall): GeneratedExpression = {
-  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
 case Some(expr) =>
   expr
 
 case None =>
-  val exp: GeneratedExpression = generateAggAccess(call)
-  aggregates += call
-  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
 
 Review comment:
   The naming is still not perfect here. What is the difference between 
`generateAggAccess` and `doGenerateAggAccess`? How about 
`generateDeduplicatedAggAccess` and `generateAggAccess`?  
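
To make the proposed split concrete, here is a minimal sketch in which the public method deduplicates and the private one generates. It assumes aggregate calls are identified by their string form and reduces generated expressions to plain result-term strings; `AggBuilderSketch`, `seen`, and `statements` are hypothetical names.

```scala
import scala.collection.mutable

object AggAccessNamingSketch {

  final class AggBuilderSketch {
    // Result term per distinct aggregate call, in first-seen order.
    private val seen = mutable.LinkedHashMap.empty[String, String]

    // Generated access statements, one per distinct aggregate call.
    val statements = mutable.ListBuffer.empty[String]

    // Always emits new access code for the next free field of the result row.
    private def generateAggAccess(aggCall: String): String = {
      val index = seen.size
      val term = s"result_$index"
      statements += s"Object $term = aggRow.getField($index);"
      term
    }

    // Returns the cached term if this call was seen before,
    // otherwise generates the access once and remembers it.
    def generateDeduplicatedAggAccess(aggCall: String): String =
      seen.getOrElseUpdate(aggCall, generateAggAccess(aggCall))
  }
}
```

Requesting `SUM(A.price)` twice yields the same term and a single generated statement, which is the behavior the longer name is meant to signal.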




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708924#comment-16708924
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238718548
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -691,12 +795,14 @@ class MatchCodeGenerator(
 
 class PatternVariableFinder extends RexDefaultVisitor[Option[String]] {
 
+  val ALL_PATTERN_VARIABLE = "*"
 
 Review comment:
   We could even move this constant definition to `MatchUtil`?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708928#comment-16708928
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238718086
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) 
$resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, 
None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], 
Seq[RexNode]) = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val aggs = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val callWithIndex = (innerCall, inputRows.size)
+  inputRows(innerCall.toString) = callWithIndex
+  callWithIndex
+  }
+}).toList
+
+val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+  }).zipWithIndex.map {
+case (agg, index) =>
+  val result = AggregateUtil
+.transformToAggregateFunction(agg._1,
+  isDistinct = false,
+  agg._2.map(_.getType),
+  

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708925#comment-16708925
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238716344
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -582,22 +671,33 @@ class MatchCodeGenerator(
 
   val transformFuncName = s"transformRowForAgg_$variableUID"
   val inputTransform: String = generateAggInputExprEvaluation(
-exprs,
+matchAgg.inputExprs,
 transformFuncName)
 
   generateAggCalculation(aggFunc, transformFuncName, inputTransform)
 }
 
+private case class LogicalMatchAggCall(
 
 Review comment:
   nit: move all classes to the bottom to logically separate them from methods.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708923#comment-16708923
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238709945
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -49,6 +50,90 @@ import scala.collection.mutable
 /**
   * A code generator for generating CEP related functions.
   *
+  * Aggregates are generated as follows:
+  * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable
+  * 2. even if the same aggregation is used multiple times in an expression
+  *(e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To 
do so [[AggBuilder]]
+  *keeps set of already seen different aggregation calls, and reuses the 
code to access
+  *appropriate field of aggregation result
+  * 3. after translating every expression (either in [[generateCondition]] or 
in
+  *[[generateOneRowPerMatchExpression]]) there will be generated code for
+  *   * [[GeneratedFunction]], which will be an inner class
+  *   * said [[GeneratedFunction]] will be instantiated in the ctor and 
opened/closed
+  * in corresponding methods of top level generated classes
+  *   * function that transforms input rows (row by row) into aggregate 
input rows
+  *   * function that calculates aggregates for variable, that uses the 
previous method
+  *The generated code will look similar to this:
+  *
+  *
+  * {{{
+  *
+  * public class MatchRecognizePatternSelectFunction$175 extends 
RichPatternSelectFunction {
+  *
+  * // Class used to calculate aggregates for a single pattern variable
+  * public final class AggFunction_variable$115$151 extends 
GeneratedAggregations {
+  *   ...
+  * }
+  *
+  * private final AggFunction_variable$115$151 aggregator_variable$115;
+  *
+  * public MatchRecognizePatternSelectFunction$175() {
+  *   aggregator_variable$115 = new AggFunction_variable$115$151();
+  * }
+  *
+  * public void open() {
+  *   aggregator_variable$115.open();
+  *   ...
+  * }
+  *
+  * // Function to transform incoming row into aggregate specific row. It 
can e.g calculate
+  * // inner expression of said aggregate
+  * private Row transformRowForAgg_variable$115(Row inAgg) {
+  * ...
+  * }
+  *
+  * // Function to calculate all aggregates for a single pattern variable
+  * private Row calculateAgg_variable$115(List input) {
+  *   Acc accumulator = aggregator_variable$115.createAccumulator();
+  *   for (Row row : input) {
+  * aggregator_variable$115.accumulate(accumulator, 
transformRowForAgg_variable$115(row));
+  *   }
+  *
+  *   return aggregator_variable$115.getResult(accumulator);
+  * }
+  *
+  * @Override
+  * public Object select(Map<String, List<Row>> in1) throws Exception {
+  *
+  *   // Extract list of rows assigned to a single pattern variable
+  *   java.util.List patternEvents$130 = (java.util.List) in1.get("A");
+  *   ...
+  *
+  *   // Calculate aggregates
+  *   Row aggRow_variable$110$111 = 
calculateAgg_variable$110(patternEvents$114);
+  *
+  *   // Every aggregation (e.g SUM(A.price) and AVG(A.price)) will be 
extracted to a variable
+  *   double result$135 = aggRow_variable$126$127.getField(0);
+  *   long result$137 = aggRow_variable$126$127.getField(1);
+  *
+  *   // Result of aggregation will be used in expression evaluation
+  *   out.setField(0, result$135);
+  *
+  *   long result$140 = result$137 * 2;
+  *   out.setField(1, result$140);
+  *
+  *   double result$144 = result$135 + result$137;
+  *   out.setField(2, result$144);
+  * }
+  *
+  * public void close() {
+  *   aggregator_variable$115.close();
+  *   ...
+  * }
+  *
+  * }
+  * }}}
+  *
 
 Review comment:
   Awesome documentation. Very helpful.
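
The per-variable flow described in the Scaladoc above (transform each row, accumulate, read the result) can be sketched as follows. This is an illustration under simplifying assumptions, not the generated code: rows are plain `Vector[Any]`, the accumulator is immutable and threaded through a fold rather than mutated in place, and `Aggregator`, `SumAndCount`, and the `(name, price)` event layout are hypothetical.

```scala
object CalculateAggSketch {

  type Row = Vector[Any]

  // Hypothetical minimal interface mirroring the three calls in the Scaladoc.
  trait Aggregator[Acc] {
    def createAccumulator(): Acc
    def accumulate(acc: Acc, input: Row): Acc
    def getResult(acc: Acc): Row
  }

  // SUM and COUNT over field 0 of the aggregate input row.
  object SumAndCount extends Aggregator[(Double, Long)] {
    def createAccumulator(): (Double, Long) = (0.0, 0L)
    def accumulate(acc: (Double, Long), input: Row): (Double, Long) =
      (acc._1 + input(0).asInstanceOf[Double], acc._2 + 1)
    def getResult(acc: (Double, Long)): Row = Vector(acc._1, acc._2)
  }

  // Counterpart of transformRowForAgg_*: events are assumed to be
  // (name, price) and the aggregate input row carries only the price.
  def transformRowForAgg(event: Row): Row = Vector(event(1))

  // Counterpart of calculateAgg_*: one result row per pattern variable.
  def calculateAgg[Acc](agg: Aggregator[Acc], events: Seq[Row]): Row = {
    val acc = events.foldLeft(agg.createAccumulator()) { (a, e) =>
      agg.accumulate(a, transformRowForAgg(e))
    }
    agg.getResult(acc)
  }
}
```

Each field of the returned row then corresponds to one aggregate, which the generated `select` method reads by index.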



[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707016#comment-16707016
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on issue #7177: [FLINK-7599] [table] Support for aggregates 
in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#issuecomment-443675670
 
 
   Hi @dianfu, @twalthr. Thank you for your review. I've updated the PR 
according to your comments. 
   
   There are still two main discussion points from @twalthr's review:
   1. Moving `PatternVariableFinder`, which also validates that there is only a 
single pattern variable in an aggregation, to e.g. `DataStreamMatchRule`. The 
main role of this class is to extract that single variable, though. If I 
performed the validation at an earlier stage, I would still have to traverse 
the expression with the same code in `MatchCodeGenerator`, but I am OK with 
it if you think it is worth it.
   2. Using `aggCall.toString` as the key for reusing the code for an 
aggregation of a single variable that is used multiple times in an expression. 
I could not think of a better way to identify that it is the same expression. 
Do you have an idea what I could use instead?
   
   I would appreciate it if you had another look.
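
For reference, a minimal sketch of string-keyed identity as raised in point 2, applied here to the operand expressions of aggregate calls. `AggCall` and `assignInputIndices` are hypothetical, and operands are represented directly by their string form. One known limit of this approach: equivalent expressions that print differently (for example `A.price + 1` and `1 + A.price`) are treated as distinct.

```scala
import scala.collection.mutable

object InputExprIndexSketch {

  // Hypothetical stand-in for an aggregate call: the function name plus
  // the string form of each operand expression.
  final case class AggCall(function: String, operands: Seq[String])

  // Assigns each distinct operand string one input index, in first-seen
  // order, and returns the calls with their operand indices together with
  // the ordered list of distinct operand expressions.
  def assignInputIndices(calls: Seq[AggCall]): (Seq[(String, Seq[Int])], Seq[String]) = {
    val inputExprs = mutable.LinkedHashMap.empty[String, Int]
    val indexed = calls.map { call =>
      val indices = call.operands.map(op => inputExprs.getOrElseUpdate(op, inputExprs.size))
      (call.function, indices)
    }
    (indexed, inputExprs.keys.toSeq)
  }
}
```

Two aggregates over the same operand share one input column, so the inner expression is evaluated once per row.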




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707001#comment-16707001
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238226741
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
 
 Review comment:
   It actually generates only the access. It does not generate the code that 
computes the aggregate. The name might not be perfect, but I would like to 
differentiate the two somehow.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706998#comment-16706998
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238226363
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
 
 Review comment:
   I think it makes sense here, as it is the term for a single aggregation out 
of the set of all aggregations calculated for a pattern variable. It makes even 
more sense now that I have moved the variable holding the term name for the 
result that contains all aggregates into this function.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704253#comment-16704253
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237724165
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -42,6 +50,90 @@ import scala.collection.mutable
 /**
   * A code generator for generating CEP related functions.
   *
+  * Aggregates are generated as follows:
+  * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable
+  * 2. even if the same aggregation is used multiple times in an expression
+  *(e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To 
do so [[AggBuilder]]
+  *keeps set of already seen different aggregation calls, and reuses the 
code to access
+  *appropriate field of aggregation result
+  * 3. after translating every expression (either in [[generateCondition]] or 
in
+  *[[generateOneRowPerMatchExpression]]) there will be generated code for
+  *   * [[GeneratedFunction]], which will be an inner class
 
 Review comment:
   code format




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704251#comment-16704251
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237731718
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) 
$resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, 
singleResultType)
+}
+
+def generateAggFunction() : Unit = {
+  val matchAgg = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, 
None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+matchAgg.inputExprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+matchAgg.aggregations.map(_.aggFunction).toArray,
+matchAgg.aggregations.map(_.inputIndices).toArray,
+matchAgg.aggregations.indices.toArray,
+Array.fill(matchAgg.aggregations.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+matchAgg.aggregations.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+matchAgg.inputExprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class LogicalMatchAggCall(
+  function: SqlAggFunction,
+  inputTypes: Seq[RelDataType],
+  exprIndices: Seq[Int]
+)
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private case class MatchAgg(
+  aggregations: Seq[MatchAggCall],
+  inputExprs: Seq[RexNode]
+)
+
+private def extractAggregatesAndExpressions: MatchAgg = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val logicalAggregates = 

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704248#comment-16704248
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237739790
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) 
$resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, 
singleResultType)
+}
+
+def generateAggFunction() : Unit = {
+  val matchAgg = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, 
None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+matchAgg.inputExprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+matchAgg.aggregations.map(_.aggFunction).toArray,
+matchAgg.aggregations.map(_.inputIndices).toArray,
+matchAgg.aggregations.indices.toArray,
+Array.fill(matchAgg.aggregations.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+matchAgg.aggregations.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+matchAgg.inputExprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class LogicalMatchAggCall(
+  function: SqlAggFunction,
+  inputTypes: Seq[RelDataType],
+  exprIndices: Seq[Int]
+)
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private case class MatchAgg(
+  aggregations: Seq[MatchAggCall],
+  inputExprs: Seq[RexNode]
+)
+
+private def extractAggregatesAndExpressions: MatchAgg = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val logicalAggregates = 

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704249#comment-16704249
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237737335
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
 
 Review comment:
   What about removing the prefix `single`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support aggregation functions in the define and measures clause of MatchRecognize
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7599
>                 URL: https://issues.apache.org/jira/browse/FLINK-7599
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704244#comment-16704244
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237737176
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
 
 Review comment:
   Define `calculateAgg_$variableUID` as a variable, such as `val 
calculateAggFuncName = s"calculateAgg_$variableUID"`.
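
A minimal sketch of that suggestion, assuming simplified stand-ins for the generator's fields: `rowTypeTerm`, `resultRowTerm` and `variableUID` are passed in as plain strings, and `eventsTerm` takes the place of `patternName.resultTerm`. The object and method names are hypothetical, and the standard `s` interpolator is used here rather than the `j` interpolator from the diff.

```scala
// Hypothetical, self-contained stand-in for the code-generation step in the diff.
object CalculateAggNameSketch {

  /** Builds the Java statement that calls the generated aggregate helper. */
  def codeForAgg(
      rowTypeTerm: String,
      resultRowTerm: String,
      variableUID: String,
      eventsTerm: String): String = {
    // The helper's name is computed once and can be reused wherever it is needed.
    val calculateAggFuncName = s"calculateAgg_$variableUID"

    s"""
       |$rowTypeTerm $resultRowTerm = $calculateAggFuncName($eventsTerm);
       |""".stripMargin
  }
}
```

Naming the helper once would also let the code that declares the generated method reuse the same value, so the call site and the declaration cannot drift apart.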




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704254#comment-16704254
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237726170
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
 
 Review comment:
   Rename `call` to `aggCall`.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704252#comment-16704252
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237472659
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
 
 Review comment:
   What about renaming to generateAggregation?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704247#comment-16704247
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237472868
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
 
 Review comment:
   getOrComputeAggregation?
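
For context, the get-or-compute behaviour behind that name can be sketched as follows. This is an illustration only, assuming a plain string key in place of `call.toString` and an integer slot in place of the `GeneratedExpression`; the class name `AggregationCacheSketch` is hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in: caches one result slot per distinct aggregate call.
final class AggregationCacheSketch {

  // Keyed by the call's string form, as the diff keys on `call.toString`.
  private val slots = mutable.LinkedHashMap.empty[String, Int]

  /** Returns the cached slot for `callDigest`, or registers the next free slot. */
  def getOrComputeAggregation(callDigest: String): Int =
    slots.get(callDigest) match {
      case Some(slot) => slot
      case None =>
        val slot = slots.size
        slots(callDigest) = slot
        slot
    }

  /** Number of distinct aggregate calls registered so far. */
  def size: Int = slots.size
}
```

Asking twice for the same call returns the same slot, so an aggregate that appears in both `DEFINE` and `MEASURES` would only be registered once in this sketch.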




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704246#comment-16704246
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237727039
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
 
 Review comment:
   `patternName` doesn't reflect what this variable holds. What about renaming 
`patternName` to `eventsExpr` or something else?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704245#comment-16704245
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237466962
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -71,12 +78,28 @@ class MatchCodeGenerator(
   private var offset: Int = 0
   private var first : Boolean = false
 
+  /**
+* Flags that tells if we generate expressions inside an aggregate. It 
tells how to access input
 
 Review comment:
   Flags -> Flag




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704250#comment-16704250
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237738914
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) 
$resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, 
singleResultType)
+}
+
+def generateAggFunction() : Unit = {
 
 Review comment:
   Remove the whitespace before the colon




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703484#comment-16703484
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237569734
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as 
provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To 
understand how those subsets are evaluated have a look at the [event stream 
navigation](#pattern-navigation) section.
+
+With a task to find the longest period of time for which the average price of 
a ticker did not go below certain threshold, one can see how expressible 
`MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+MATCH_RECOGNIZE (
+PARTITION BY symbol
+ORDER BY rowtime
+MEASURES
+FIRST(A.rowtime) AS start_tstamp,
+LAST(A.rowtime) AS end_tstamp,
+AVG(A.price) AS avgPrice
+ONE ROW PER MATCH
+AFTER MATCH SKIP TO FIRST B
+PATTERN (A+ B)
+DEFINE
+A AS AVG(A.price) < 15
+) MR;
+{% endhighlight %}
+
+Given this query and following input values:
+
+{% highlight text %}
+symbol  rowtime               price  tax
+======  ====================  =====  ===
+'ACME'  '01-Apr-11 10:00:00'  12     1
+'ACME'  '01-Apr-11 10:00:01'  17     2
+'ACME'  '01-Apr-11 10:00:02'  13     1
+'ACME'  '01-Apr-11 10:00:03'  16     3
+'ACME'  '01-Apr-11 10:00:04'  25     2
+'ACME'  '01-Apr-11 10:00:05'  2      1
+'ACME'  '01-Apr-11 10:00:06'  4      1
+'ACME'  '01-Apr-11 10:00:07'  10     2
+'ACME'  '01-Apr-11 10:00:08'  15     2
+'ACME'  '01-Apr-11 10:00:09'  25     2
+'ACME'  '01-Apr-11 10:00:10'  30     1
+{% endhighlight %}
+
+The query will accumulate events as part of `A` pattern variable as long as 
the average price of them does not exceed 15. Which will happen at `01-Apr-11 
10:00:04`. The next such period that starts then will
+exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said 
query will be:
+
+{% highlight text %}
+symbol  start_tstamp        end_tstamp          avgPrice
+======  ==================  ==================  ========
+ACME    01-APR-11 10:00:00  01-APR-11 10:00:03  14.5
+ACME    01-APR-11 10:00:04  01-APR-11 10:00:09  13.5
+{% endhighlight %}
+
+An important thing to have in mind is how aggregates behave in situation when 
no rows where mapped to certain pattern variable. Every aggregate, beside 
`COUNT` will produce `null` in those cases. `COUNT` on the other hand will
 
 Review comment:
   You are right, I didn't think of UDAGs. Do you think it is worth adding such a 
disclaimer at all? I added it in the first place because there was 
something similar in the SQL standard, but there are no UDAGs there.
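
To make the concern concrete, here is a small sketch of how aggregates might treat an empty set of rows. It uses plain Scala helpers, not Flink API: `count` and `avg` mimic the behaviour described in the draft (0 and `null`, modelled here as `None`), while `customMax` is a hypothetical user-defined aggregate whose result for empty input is an arbitrary choice.

```scala
// Hypothetical helpers, not Flink API: aggregates over the rows mapped to one pattern variable.
object EmptyMatchAggSketch {

  /** COUNT-like: an empty input yields 0. */
  def count(prices: Seq[Double]): Long = prices.size.toLong

  /** AVG-like: an empty input yields None, standing in for SQL NULL. */
  def avg(prices: Seq[Double]): Option[Double] =
    if (prices.isEmpty) None else Some(prices.sum / prices.size)

  /** A user-defined aggregate may choose any result for empty input; -1.0 is arbitrary. */
  def customMax(prices: Seq[Double]): Double =
    if (prices.isEmpty) -1.0 else prices.max
}
```

With the first four prices from the example input (12, 17, 13, 16), `avg` gives 14.5, which matches the first row of the expected results in the draft.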




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703385#comment-16703385
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237544327
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) 
$resultRowTerm
 
 Review comment:
   Sorry about this comment, I totally missed the following line.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703386#comment-16703386
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237544327
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
 
 Review comment:
   Sorry about this comment, I totally missed the following line.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703344#comment-16703344
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237529902
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val aggs = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val callWithIndex = (innerCall, inputRows.size)
+  inputRows(innerCall.toString) = callWithIndex
+  callWithIndex
+  }
+}).toList
+
+val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+  }).zipWithIndex.map {
+case (agg, index) =>
+  val result = AggregateUtil
+.transformToAggregateFunction(agg._1,
+  isDistinct = false,
+  agg._2.map(_.getType),
+  
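
A minimal sketch, not taken from the pull request, of the indexing idea in the quoted extractAggregatesAndExpressions: every distinct aggregate operand gets one slot in a shared input row, keyed by its string form, so two aggregates over the same expression read the same column. The class and method names are hypothetical, and plain strings stand in for the RexNode operands.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: strings stand in for RexNode operands.
final class OperandIndexSketch {

    // Gives each distinct operand a column index in first-seen order and
    // returns, per aggregate call, the indices of its operands.
    static List<int[]> assignInputIndices(
            List<List<String>> aggCalls, Map<String, Integer> inputRows) {
        List<int[]> indicesPerAgg = new ArrayList<>();
        for (List<String> operands : aggCalls) {
            int[] indices = new int[operands.size()];
            for (int i = 0; i < operands.size(); i++) {
                String operand = operands.get(i);
                Integer index = inputRows.get(operand);
                if (index == null) {
                    index = inputRows.size(); // next free column
                    inputRows.put(operand, index);
                }
                indices[i] = index;
            }
            indicesPerAgg.add(indices);
        }
        return indicesPerAgg;
    }

    public static void main(String[] args) {
        Map<String, Integer> inputRows = new LinkedHashMap<>();
        List<int[]> indices = assignInputIndices(
            Arrays.asList(
                Arrays.asList("A.price"),
                Arrays.asList("A.price", "A.tax"),
                Arrays.asList("A.tax")),
            inputRows);
        for (int[] perAgg : indices) {
            System.out.println(Arrays.toString(perAgg));
        }
        System.out.println(inputRows);
    }
}
```

An insertion-ordered map is used here for the same reason the quoted code uses a LinkedHashMap: the iteration order of the map then matches the column order of the input row.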

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703355#comment-16703355
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237533224
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val aggs = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val callWithIndex = (innerCall, inputRows.size)
+  inputRows(innerCall.toString) = callWithIndex
+  callWithIndex
+  }
+}).toList
+
+val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+  }).zipWithIndex.map {
+case (agg, index) =>
+  val result = AggregateUtil
+.transformToAggregateFunction(agg._1,
+  isDistinct = false,
+  agg._2.map(_.getType),
+  
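
A small sketch, under stated assumptions, of the bookkeeping in the quoted getOrAddAggregation: a call seen before is answered from a cache, and a new call takes the next position in the result row, which is the size of the aggregate list read before the call is appended. The class and method names are hypothetical, and the call's string form stands in for the RexCall.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: the string form of a call stands in for a RexCall.
final class AggRegistrySketch {
    private final List<String> aggregates = new ArrayList<>();
    private final Map<String, Integer> positionByCall = new HashMap<>();

    // Returns the result-row position of a call, registering it on first use.
    int getOrAdd(String call) {
        Integer known = positionByCall.get(call);
        if (known != null) {
            return known; // already registered, reuse its position
        }
        int position = aggregates.size(); // read before appending
        aggregates.add(call);
        positionByCall.put(call, position);
        return position;
    }

    int size() {
        return aggregates.size();
    }

    public static void main(String[] args) {
        AggRegistrySketch registry = new AggRegistrySketch();
        System.out.println(registry.getOrAdd("SUM(A.price)"));
        System.out.println(registry.getOrAdd("AVG(A.price)"));
        System.out.println(registry.getOrAdd("SUM(A.price)"));
        System.out.println(registry.size());
    }
}
```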

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703342#comment-16703342
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237529378
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val aggs = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val callWithIndex = (innerCall, inputRows.size)
+  inputRows(innerCall.toString) = callWithIndex
+  callWithIndex
+  }
+}).toList
+
+val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+  }).zipWithIndex.map {
+case (agg, index) =>
+  val result = AggregateUtil
+.transformToAggregateFunction(agg._1,
+  isDistinct = false,
+  agg._2.map(_.getType),
+  

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703343#comment-16703343
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237529378
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val aggs = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val callWithIndex = (innerCall, inputRows.size)
+  inputRows(innerCall.toString) = callWithIndex
+  callWithIndex
+  }
+}).toList
+
+val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+  }).zipWithIndex.map {
+case (agg, index) =>
+  val result = AggregateUtil
+.transformToAggregateFunction(agg._1,
+  isDistinct = false,
+  agg._2.map(_.getType),
+  

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703330#comment-16703330
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237526604
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
 
 Review comment:
   Hmmm, no I am not. Have you maybe missed the `.getField()` call?
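
   For readers following the thread, a rough sketch of what the quoted template appears to expand to: the field is read from the row, cast to the boxed type, and a null flag plus a default value are derived from it. This is an illustration only. An Object[] stands in for the Row and its getField(int) accessor, and the class, method, and default value are hypothetical.

```java
// Hypothetical illustration: an Object[] stands in for Row.getField(int).
final class NullableFieldSketch {

    // A value together with a flag saying whether the source field was null.
    static final class LongResult {
        final long value;
        final boolean isNull;

        LongResult(long value, boolean isNull) {
            this.value = value;
            this.isNull = isNull;
        }
    }

    // Reads one field, casting to the boxed type so that null can be detected
    // before a default is substituted.
    static LongResult readLong(Object[] row, int index, long defaultValue) {
        Long boxed = (Long) row[index];
        if (boxed != null) {
            return new LongResult(boxed, false);
        }
        return new LongResult(defaultValue, true);
    }

    public static void main(String[] args) {
        Object[] row = {42L, null};
        LongResult present = readLong(row, 0, -1L);
        LongResult missing = readLong(row, 1, -1L);
        System.out.println(present.value + " " + present.isNull);
        System.out.println(missing.value + " " + missing.isNull);
    }
}
```

   The cast targets the boxed type (Long), not the primitive, because unboxing a null field would throw a NullPointerException before the null check could run.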




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703327#comment-16703327
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237526604
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
 
 Review comment:
   Hmmm, no I am not. Have you maybe missed the `.getField()`?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703202#comment-16703202
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237485540
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b
 
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
 
 Review comment:
   Rephrase:
   ```
   Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported.
   ```




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703200#comment-16703200
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237484391
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a default condition will b
 
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
 
 Review comment:
   Just call it `Aggregations`?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703209#comment-16703209
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237494549
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
 ##
 @@ -0,0 +1,6337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.validate;
+
+import org.apache.calcite.config.NullCollation;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.function.Function2;
+import org.apache.calcite.linq4j.function.Functions;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.DynamicRecordType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.runtime.CalciteException;
+import org.apache.calcite.runtime.Feature;
+import org.apache.calcite.runtime.Resources;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.ModifiableViewTable;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAccessEnum;
+import org.apache.calcite.sql.SqlAccessType;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlMerge;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlUnresolvedFunction;
+import org.apache.calcite.sql.SqlUpdate;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.AssignableOperandTypeChecker;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql2rel.InitializerContext;
+import 

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703205#comment-16703205
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237485908
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as 
provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To 
understand how those subsets are evaluated have a look at the [event stream 
navigation](#pattern-navigation) section.
 
 Review comment:
   `to each subset`
   `In order to understand how those subsets are evaluated, `


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API & SQL
> Reporter: Dian Fu
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703199#comment-16703199
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237491268
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as 
provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To 
understand how those subsets are evaluated have a look at the [event stream 
navigation](#pattern-navigation) section.
+
+With a task to find the longest period of time for which the average price of 
a ticker did not go below certain threshold, one can see how expressible 
`MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+MATCH_RECOGNIZE (
+PARTITION BY symbol
+ORDER BY rowtime
+MEASURES
+FIRST(A.rowtime) AS start_tstamp,
+LAST(A.rowtime) AS end_tstamp,
+AVG(A.price) AS avgPrice
+ONE ROW PER MATCH
+AFTER MATCH SKIP TO FIRST B
+PATTERN (A+ B)
+DEFINE
+A AS AVG(A.price) < 15
+) MR;
+{% endhighlight %}
+
+Given this query and following input values:
+
+{% highlight text %}
+symbol         rowtime         price    tax
+======  ====================  ======= =======
+'ACME'  '01-Apr-11 10:00:00'   12      1
+'ACME'  '01-Apr-11 10:00:01'   17      2
+'ACME'  '01-Apr-11 10:00:02'   13      1
+'ACME'  '01-Apr-11 10:00:03'   16      3
+'ACME'  '01-Apr-11 10:00:04'   25      2
+'ACME'  '01-Apr-11 10:00:05'   2       1
+'ACME'  '01-Apr-11 10:00:06'   4       1
+'ACME'  '01-Apr-11 10:00:07'   10      2
+'ACME'  '01-Apr-11 10:00:08'   15      2
+'ACME'  '01-Apr-11 10:00:09'   25      2
+'ACME'  '01-Apr-11 10:00:10'   30      1
+{% endhighlight %}
+
+The query will accumulate events as part of `A` pattern variable as long as 
the average price of them does not exceed 15. Which will happen at `01-Apr-11 
10:00:04`. The next such period that starts then will
 
 Review comment:
   ```
   The query will accumulate events as part of pattern variable `A` as long as 
the average price of those events does not exceed `15`. For example, such a 
limit exceeding happens at `01-Apr-11 10:00:04`. The following period exceeds 
the average price of `15` again at `01-Apr-11 10:00:10`.
   ```
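
To make the arithmetic behind this wording concrete, here is a minimal Java sketch. It is not Flink code: `RunningAverage` and `firstIndexNotBelow` are hypothetical names, and the sketch only reproduces the averaging over the `price` column of the example input. It does not model how `MATCH_RECOGNIZE` actually evaluates patterns.

```java
// Hypothetical helper, not part of Flink: it only reproduces the arithmetic
// of a running average and does not model MATCH_RECOGNIZE pattern matching.
final class RunningAverage {

    /**
     * Returns the index of the first element at which the running average of
     * values[0..i] is no longer below the limit, or -1 if that never happens.
     */
    static int firstIndexNotBelow(int[] values, double limit) {
        long sum = 0;
        for (int i = 0; i < values.length; i++) {
            sum += values[i];
            double average = (double) sum / (i + 1);
            if (average >= limit) {
                return i;
            }
        }
        return -1;
    }
}
```

For the eleven example prices, the running averages are 12, 14.5, 14, 14.5 and 16.6, so the method returns index 4, which is the row with `01-Apr-11 10:00:04`.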




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703206#comment-16703206
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237491775
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as 
provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To 
understand how those subsets are evaluated have a look at the [event stream 
navigation](#pattern-navigation) section.
+
+With a task to find the longest period of time for which the average price of 
a ticker did not go below certain threshold, one can see how expressible 
`MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+MATCH_RECOGNIZE (
+PARTITION BY symbol
+ORDER BY rowtime
+MEASURES
+FIRST(A.rowtime) AS start_tstamp,
+LAST(A.rowtime) AS end_tstamp,
+AVG(A.price) AS avgPrice
+ONE ROW PER MATCH
+AFTER MATCH SKIP TO FIRST B
+PATTERN (A+ B)
+DEFINE
+A AS AVG(A.price) < 15
+) MR;
+{% endhighlight %}
+
+Given this query and following input values:
+
+{% highlight text %}
+symbol         rowtime         price    tax
+======  ====================  ======= =======
+'ACME'  '01-Apr-11 10:00:00'   12      1
+'ACME'  '01-Apr-11 10:00:01'   17      2
+'ACME'  '01-Apr-11 10:00:02'   13      1
+'ACME'  '01-Apr-11 10:00:03'   16      3
+'ACME'  '01-Apr-11 10:00:04'   25      2
+'ACME'  '01-Apr-11 10:00:05'   2       1
+'ACME'  '01-Apr-11 10:00:06'   4       1
+'ACME'  '01-Apr-11 10:00:07'   10      2
+'ACME'  '01-Apr-11 10:00:08'   15      2
+'ACME'  '01-Apr-11 10:00:09'   25      2
+'ACME'  '01-Apr-11 10:00:10'   30      1
+{% endhighlight %}
+
+The query will accumulate events as part of `A` pattern variable as long as 
the average price of them does not exceed 15. Which will happen at `01-Apr-11 
10:00:04`. The next such period that starts then will
+exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said 
query will be:
+
+{% highlight text %}
+ symbol       start_tstamp       end_tstamp          avgPrice
+=========  ==================  ==================  ============
+ACME       01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
+ACME       01-APR-11 10:00:04  01-APR-11 10:00:09     13.5
+{% endhighlight %}
+
+An important thing to have in mind is how aggregates behave in situation when 
no rows where mapped to certain pattern variable. Every aggregate, beside 
`COUNT` will produce `null` in those cases. `COUNT` on the other hand will
 
 Review comment:
   ```
   An important thing to have in mind is how aggregates behave in situation 
when no rows were mapped to a certain pattern variable. Every aggregate (except 
for `COUNT`) will produce `null` in those cases. `COUNT` on the other hand will 
produce 0.
   ```
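
As a small illustration of the behavior this paragraph describes, the following Java sketch mimics an `AVG`-like and a `COUNT`-like aggregate over a possibly empty list of rows. The class and method names are hypothetical and the code is not taken from Flink. It only shows the stated contract: `null` for an empty input in one case, `0` in the other.

```java
import java.util.List;

// Hypothetical stand-ins for built-in aggregates; not Flink code.
final class EmptyAggregates {

    /** AVG-like aggregate: yields null when no rows were mapped. */
    static Double avg(List<Integer> rows) {
        if (rows.isEmpty()) {
            return null;
        }
        long sum = 0;
        for (int value : rows) {
            sum += value;
        }
        return (double) sum / rows.size();
    }

    /** COUNT-like aggregate: yields 0 when no rows were mapped. */
    static long count(List<Integer> rows) {
        return rows.size();
    }
}
```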




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703201#comment-16703201
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237493263
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as 
provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To 
understand how those subsets are evaluated have a look at the [event stream 
navigation](#pattern-navigation) section.
+
+With a task to find the longest period of time for which the average price of 
a ticker did not go below certain threshold, one can see how expressible 
`MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+MATCH_RECOGNIZE (
+PARTITION BY symbol
+ORDER BY rowtime
+MEASURES
+FIRST(A.rowtime) AS start_tstamp,
+LAST(A.rowtime) AS end_tstamp,
+AVG(A.price) AS avgPrice
+ONE ROW PER MATCH
+AFTER MATCH SKIP TO FIRST B
+PATTERN (A+ B)
+DEFINE
+A AS AVG(A.price) < 15
+) MR;
+{% endhighlight %}
+
+Given this query and following input values:
+
+{% highlight text %}
+symbol         rowtime         price    tax
+======  ====================  ======= =======
+'ACME'  '01-Apr-11 10:00:00'   12      1
+'ACME'  '01-Apr-11 10:00:01'   17      2
+'ACME'  '01-Apr-11 10:00:02'   13      1
+'ACME'  '01-Apr-11 10:00:03'   16      3
+'ACME'  '01-Apr-11 10:00:04'   25      2
+'ACME'  '01-Apr-11 10:00:05'   2       1
+'ACME'  '01-Apr-11 10:00:06'   4       1
+'ACME'  '01-Apr-11 10:00:07'   10      2
+'ACME'  '01-Apr-11 10:00:08'   15      2
+'ACME'  '01-Apr-11 10:00:09'   25      2
+'ACME'  '01-Apr-11 10:00:10'   30      1
+{% endhighlight %}
+
+The query will accumulate events as part of `A` pattern variable as long as 
the average price of them does not exceed 15. Which will happen at `01-Apr-11 
10:00:04`. The next such period that starts then will
+exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said 
query will be:
+
+{% highlight text %}
+ symbol       start_tstamp       end_tstamp          avgPrice
+=========  ==================  ==================  ============
+ACME       01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
+ACME       01-APR-11 10:00:04  01-APR-11 10:00:09     13.5
+{% endhighlight %}
+
+An important thing to have in mind is how aggregates behave in situation when 
no rows where mapped to certain pattern variable. Every aggregate, beside 
`COUNT` will produce `null` in those cases. `COUNT` on the other hand will
 
 Review comment:
   But is this statement true? Doesn't the empty case depend on the aggregate 
function that you implemented? If you are using a UDF, the result does not have to be null, right?
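
The concern can be sketched with a user-defined aggregate whose result for an empty input is not `null`. The class below is a standalone, hypothetical example. Its method names (`createAccumulator`, `accumulate`, `getValue`) are only loosely modeled on the shape of a Flink aggregate function, and it does not extend any Flink class.

```java
// Hypothetical user-defined aggregate, written without any Flink dependency.
// It returns 0 (not null) when no rows were accumulated.
final class SumOrZero {

    /** Mutable accumulator holding the running sum. */
    static final class Acc {
        long sum;
    }

    Acc createAccumulator() {
        return new Acc();
    }

    void accumulate(Acc acc, long value) {
        acc.sum += value;
    }

    /** The value of a fresh accumulator is 0, so the empty case is non-null. */
    Long getValue(Acc acc) {
        return acc.sum;
    }
}
```

Calling `getValue` on a fresh accumulator yields `0`, so for such a function the statement "every aggregate except `COUNT` produces `null`" would not hold.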




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703204#comment-16703204
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237486274
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as 
provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To 
understand how those subsets are evaluated have a look at the [event stream 
navigation](#pattern-navigation) section.
+
+With a task to find the longest period of time for which the average price of 
a ticker did not go below certain threshold, one can see how expressible 
`MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+MATCH_RECOGNIZE (
+PARTITION BY symbol
+ORDER BY rowtime
+MEASURES
+FIRST(A.rowtime) AS start_tstamp,
+LAST(A.rowtime) AS end_tstamp,
+AVG(A.price) AS avgPrice
+ONE ROW PER MATCH
+AFTER MATCH SKIP TO FIRST B
+PATTERN (A+ B)
+DEFINE
+A AS AVG(A.price) < 15
+) MR;
+{% endhighlight %}
+
+Given this query and following input values:
+
+{% highlight text %}
+symbol         rowtime         price    tax
+======  ====================  ======= =======
+'ACME'  '01-Apr-11 10:00:00'   12      1
+'ACME'  '01-Apr-11 10:00:01'   17      2
+'ACME'  '01-Apr-11 10:00:02'   13      1
+'ACME'  '01-Apr-11 10:00:03'   16      3
+'ACME'  '01-Apr-11 10:00:04'   25      2
+'ACME'  '01-Apr-11 10:00:05'   2       1
+'ACME'  '01-Apr-11 10:00:06'   4       1
+'ACME'  '01-Apr-11 10:00:07'   10      2
+'ACME'  '01-Apr-11 10:00:08'   15      2
+'ACME'  '01-Apr-11 10:00:09'   25      2
+'ACME'  '01-Apr-11 10:00:10'   30      1
+{% endhighlight %}
+
+The query will accumulate events as part of `A` pattern variable as long as 
the average price of them does not exceed 15. Which will happen at `01-Apr-11 
10:00:04`. The next such period that starts then will
+exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said 
query will be:
+
+{% highlight text %}
+ symbol       start_tstamp       end_tstamp          avgPrice
+=========  ==================  ==================  ============
+ACME       01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
+ACME       01-APR-11 10:00:04  01-APR-11 10:00:09     13.5
+{% endhighlight %}
+
+An important thing to have in mind is how aggregates behave in situation when 
no rows where mapped to certain pattern variable. Every aggregate, beside 
`COUNT` will produce `null` in those cases. `COUNT` on the other hand will
+produce 0.
+
+Attention `DISTINCT` aggregations are 
not supported. Moreover the `DISTINCT` modifier will be silently dropped if 
specified for aggregation!
 
 Review comment:
   Mention that you cannot mix pattern variables. Also mention that you can use 
expressions as parameters?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703203#comment-16703203
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237488275
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,66 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Scalar & Aggregate functions
+
+One can use scalar and aggregate functions in those clauses, both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as 
provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+
+Aggregate functions are applied to subset of rows mapped to a match. To 
understand how those subsets are evaluated have a look at the [event stream 
navigation](#pattern-navigation) section.
+
+With a task to find the longest period of time for which the average price of 
a ticker did not go below certain threshold, one can see how expressible 
`MATCH_RECOGNIZE` can become with aggregations.
 
 Review comment:
   Introduce a subsection `Example` as done in other sections?
   Rephrase: `The task of the following example is to find... threshold. It 
shows how...`




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703156#comment-16703156
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237476877
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
 
 Review comment:
   You are creating a primitive value for a boxed type term?
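
One way to avoid mixing boxed and primitive terms is to keep the result in a primitive variable and track nullness in a separate flag. The Java sketch below shows that pattern in hand-written form. `AggResult` and `fromField` are hypothetical names, the `Object` parameter stands in for a field read from the aggregation result row, and the `0L` default is only a placeholder assumption.

```java
// Hand-written illustration of the "primitive value + null flag" pattern.
// Not generated by Flink; names and the default value are assumptions.
final class AggResult {
    final long value;
    final boolean isNull;

    private AggResult(long value, boolean isNull) {
        this.value = value;
        this.isNull = isNull;
    }

    /** Converts a possibly-null boxed field into a primitive plus null flag. */
    static AggResult fromField(Object field) {
        long value;
        boolean isNull;
        if (field != null) {
            value = (Long) field; // cast to the boxed type, then auto-unbox
            isNull = false;
        } else {
            value = 0L;           // placeholder; must not be read when isNull is true
            isNull = true;
        }
        return new AggResult(value, isNull);
    }
}
```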




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703163#comment-16703163
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237479598
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) 
$resultRowTerm
 
 Review comment:
   Something is wrong here. You are casting a `Row` into the result type of the 
aggregation?
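
For reference when reading the quoted template: in Java, a cast binds less tightly than member access and method invocation, so `(T) row.getField(n)` casts the value returned by `getField`, not the row itself, even when the expression is split across lines. The sketch below demonstrates this with a hypothetical `FakeRow` class standing in for `org.apache.flink.types.Row`. It says nothing about whether the generated code in the pull request is otherwise correct.

```java
// Demonstrates Java cast precedence with a stand-in row type (not Flink's Row).
final class CastPrecedence {

    /** Minimal stand-in exposing a getField(int) accessor. */
    static final class FakeRow {
        private final Object[] fields;

        FakeRow(Object... fields) {
            this.fields = fields;
        }

        Object getField(int pos) {
            return fields[pos];
        }
    }

    /** The cast applies to the result of getField(0), not to `row`. */
    static Long firstFieldAsLong(FakeRow row) {
        return (Long) row
            .getField(0);
    }
}
```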




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703170#comment-16703170
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237464842
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) 
$resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, 
None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], 
Seq[RexNode]) = {
 
 Review comment:
   Can you add comments to this method or break the logic into more variables? 
It is very difficult to understand the Scala magic here. The method is called 
`extractAggregatesAndExpressions`, but what do the expressions that come out of 
it describe? Also, the variable names `aggs` and `agg` are too generic; they are 
used for values with different data types and meanings.
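
One way to read the request above is to name each intermediate step. The following is a minimal sketch under stated assumptions, not the PR's code: `AggCallSketch`, `IndexedAgg`, and `ExtractSketch` are hypothetical names, and operand expressions are plain strings standing in for Calcite `RexNode`s. It shows only the core idea, which is to assign each distinct operand expression one position in a shared input row and record those positions per aggregate.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an aggregate call: a function name plus its
// operand expressions, kept as plain strings for illustration only.
final case class AggCallSketch(function: String, operands: List[String])

// One aggregate together with the positions of its operands in the shared input row.
final case class IndexedAgg(function: String, inputIndices: List[Int])

object ExtractSketch {
  /** Returns the aggregates with their operand indices, plus the distinct
    * operand expressions in the order in which they were first seen. */
  def extract(calls: Seq[AggCallSketch]): (Seq[IndexedAgg], Seq[String]) = {
    // operand expression -> its index in the input row (insertion-ordered)
    val indexOfOperand = mutable.LinkedHashMap.empty[String, Int]

    // Reuses the index of an operand seen before, otherwise assigns the next free one.
    def indexFor(operand: String): Int =
      indexOfOperand.getOrElseUpdate(operand, indexOfOperand.size)

    val indexedAggs = calls.map { call =>
      IndexedAgg(call.function, call.operands.map(indexFor))
    }
    val inputExpressions = indexOfOperand.keys.toSeq
    (indexedAggs, inputExpressions)
  }
}
```

With three calls such as `SUM(B.price * B.rate)`, `weightedAvg(price, weight)` and `AVG(price)`, the sketch yields three input expressions, and `AVG` reuses the index already assigned to `price`. Named this way, the second element of the returned pair reads as "the expressions that must be evaluated per row before aggregating".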



[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703168#comment-16703168
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237442180
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+* This query checks:
+*
+* 1. count(D.price) produces 0, because no rows matched to D
+* 2. sum(D.price) produces null, because no rows matched to D
+* 3. aggregates that take multiple parameters work
+* 4. aggregates with expressions work
+*/
+  @Test
+  def testCepAggregates(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
+data.+=((1, "a", 1, 0.8, 1))
+data.+=((2, "z", 2, 0.8, 3))
+data.+=((3, "b", 1, 0.8, 2))
+data.+=((4, "c", 1, 0.8, 5))
+data.+=((5, "d", 4, 0.1, 5))
+data.+=((6, "a", 2, 1.5, 2))
+data.+=((7, "b", 2, 0.8, 3))
+data.+=((8, "c", 1, 0.8, 2))
+data.+=((9, "h", 2, 0.8, 3))
+
+val t = env.fromCollection(data)
+  .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+tEnv.registerFunction("weightedAvg", new WeightedAvg)
+
+val sqlQuery =
+  s"""
+ |SELECT *
+ |FROM MyTable
+ |MATCH_RECOGNIZE (
+ |  ORDER BY proctime
+ |  MEASURES
+ |FIRST(id) as startId,
+ |SUM(A.price) AS sumA,
+ |COUNT(DISTINCT D.price) AS countD,
+ |SUM(D.price) as sumD,
+ |weightedAvg(price, weight) as wAvg,
+ |AVG(B.price) AS avgB,
+ |SUM(B.price * B.rate) as sumExprB,
+ |LAST(id) as endId
+ |  AFTER MATCH SKIP PAST LAST ROW
+ |  PATTERN (A+ B+ C D? E )
+ |  DEFINE
+ |A AS SUM(A.price) < 6,
+ |B AS SUM(B.price * B.rate) < SUM(A.price) AND
+ | SUM(B.price * B.rate) > 0.2 AND
+ | SUM(B.price) >= 1 AND
+ | AVG(B.price) >= 1 AND
+ | weightedAvg(price, weight) > 1
+ |) AS T
+ |""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8")
 
 Review comment:
   Produce a second row to be on the safe side?




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703153#comment-16703153
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237436464
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) 
$resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, 
None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], 
Seq[RexNode]) = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val aggs = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val callWithIndex = (innerCall, inputRows.size)
+  inputRows(innerCall.toString) = callWithIndex
+  callWithIndex
+  }
+}).toList
+
+val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+  }).zipWithIndex.map {
+case (agg, index) =>
+  val result = AggregateUtil
+.transformToAggregateFunction(agg._1,
+  isDistinct = false,
+  agg._2.map(_.getType),
+  

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703155#comment-16703155
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237434654
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+* This query checks:
+*
+* 1. count(D.price) produces 0, because no rows matched to D
+* 2. sum(D.price) produces null, because no rows matched to D
+* 3. aggregates that take multiple parameters work
+* 4. aggregates with expressions work
+*/
+  @Test
+  def testCepAggregates(): Unit = {
 
 Review comment:
   nit: We are already in the `MatchRecognizeITCase` so we don't need to prefix 
every method with CEP.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703160#comment-16703160
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237437779
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
 ##
 @@ -21,8 +21,8 @@ package org.apache.flink.table.`match`
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 
 Review comment:
   Can you rename this class to `MatchRecognizeValidationTest`? Otherwise it 
doesn't match our current naming scheme.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703154#comment-16703154
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237439450
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
 ##
 @@ -145,22 +139,21 @@ class MatchOperatorValidationTest extends TableTestBase {
  |MATCH_RECOGNIZE (
  |  ORDER BY proctime
  |  MEASURES
- |A.symbol AS aSymbol
- |  ALL ROWS PER MATCH
+ |SUM(A.price + B.tax) AS taxedPrice
  |  PATTERN (A B)
  |  DEFINE
- |A AS symbol = 'a'
+ |A AS A.symbol = 'a'
  |) AS T
  |""".stripMargin
 
 streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
   }
 
   @Test
-  def testGreedyQuantifierAtTheEndIsNotSupported(): Unit = {
-thrown.expectMessage("Greedy quantifiers are not allowed as the last 
element of a " +
-  "Pattern yet. Finish your pattern with either a simple variable or 
reluctant quantifier.")
-thrown.expect(classOf[TableException])
+  def testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs(): Unit = {
+thrown.expect(classOf[ValidationException])
 
 Review comment:
   Add part of the message here as well?
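
The reason for the suggestion above: a check on the exception type alone passes for any `ValidationException`, including one raised for an unrelated reason. A minimal, framework-free sketch of the idea follows. `ValidationExceptionSketch`, `ExpectSketch`, and the message texts used with it are hypothetical stand-ins and not Flink's actual class or wording; in the test itself the equivalent would be an additional `thrown.expectMessage(...)` call like the one in the removed lines of this hunk.

```scala
// Hypothetical stand-in for Flink's ValidationException.
final class ValidationExceptionSketch(message: String) extends RuntimeException(message)

object ExpectSketch {
  /** Runs `body` and returns true only if it throws `ValidationExceptionSketch`
    * with a message containing `fragment`. Other exception types propagate. */
  def failsWith(fragment: String)(body: => Unit): Boolean =
    try {
      body
      false // no exception was thrown at all
    } catch {
      case e: ValidationExceptionSketch => e.getMessage.contains(fragment)
    }
}
```

Called with a fragment such as `"multiple pattern variables"` (an assumed wording), the helper accepts an exception whose message contains that fragment and rejects a `ValidationExceptionSketch` carrying an unrelated message, which a type-only check would have let through.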




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703164#comment-16703164
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237468069
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
 
 Review comment:
   This looks hacky to me. Why do we add the string representation of a rex 
call to the input expressions? It could lead to wrong results if two different 
calls have the same string representation.
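
To illustrate the concern in general terms, here is a minimal sketch with hypothetical types. `FieldRef` is not a Calcite class, and its `toString` is made lossy on purpose; whether `RexCall.toString` can actually collide for distinct calls is not established here.

```scala
// Hypothetical expression node; only `toString` is deliberately lossy.
sealed trait ExprSketch
final case class FieldRef(variable: String, field: String, fieldType: String)
    extends ExprSketch {
  override def toString: String = s"$variable.$field" // drops the type
}

object KeySketch {
  val a: ExprSketch = FieldRef("A", "price", "INT")
  val b: ExprSketch = FieldRef("A", "price", "DOUBLE")

  // Keyed by string: both nodes print as "A.price", so the second entry
  // silently replaces the first.
  val byString: Map[String, ExprSketch] = Map(a.toString -> a, b.toString -> b)

  // Keyed by the node itself: case-class equality compares all fields,
  // so both entries are kept.
  val byNode: Map[ExprSketch, String] = Map(a -> "result_0", b -> "result_1")
}
```

In this sketch `byString` ends up with one entry and `byNode` with two. A cache keyed on a structural identity, or a dedicated map rather than the one used for input unboxing expressions, would avoid handing back the wrong generated expression in such a case.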




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703159#comment-16703159
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237439056
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
 ##
 @@ -128,15 +128,9 @@ class MatchOperatorValidationTest extends TableTestBase {
 streamUtils.tableEnv.sqlQuery(sqlQuery).toRetractStream[Row]
   }
 
-  // 
***
-  // * Those validations are temporary. We should remove those tests once we 
support those *
-  // * features.   
*
-  // 
***
-
   @Test
-  def testAllRowsPerMatch(): Unit = {
-thrown.expectMessage("All rows per match mode is not supported yet.")
-thrown.expect(classOf[TableException])
+  def testAggregatesOnMultiplePatternVariablesNotSupported(): Unit = {
+thrown.expect(classOf[ValidationException])
 
 Review comment:
   Add a part of the message as well?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703158#comment-16703158
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237450352
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+* This query checks:
+*
+* 1. count(D.price) produces 0, because no rows matched to D
+* 2. sum(D.price) produces null, because no rows matched to D
+* 3. aggregates that take multiple parameters work
+* 4. aggregates with expressions work
+*/
+  @Test
+  def testCepAggregates(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
+data.+=((1, "a", 1, 0.8, 1))
+data.+=((2, "z", 2, 0.8, 3))
+data.+=((3, "b", 1, 0.8, 2))
+data.+=((4, "c", 1, 0.8, 5))
+data.+=((5, "d", 4, 0.1, 5))
+data.+=((6, "a", 2, 1.5, 2))
+data.+=((7, "b", 2, 0.8, 3))
+data.+=((8, "c", 1, 0.8, 2))
+data.+=((9, "h", 2, 0.8, 3))
+
+val t = env.fromCollection(data)
+  .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+tEnv.registerFunction("weightedAvg", new WeightedAvg)
+
+val sqlQuery =
+  s"""
+ |SELECT *
+ |FROM MyTable
+ |MATCH_RECOGNIZE (
+ |  ORDER BY proctime
+ |  MEASURES
+ |FIRST(id) as startId,
+ |SUM(A.price) AS sumA,
+ |COUNT(DISTINCT D.price) AS countD,
+ |SUM(D.price) as sumD,
+ |weightedAvg(price, weight) as wAvg,
+ |AVG(B.price) AS avgB,
+ |SUM(B.price * B.rate) as sumExprB,
+ |LAST(id) as endId
+ |  AFTER MATCH SKIP PAST LAST ROW
+ |  PATTERN (A+ B+ C D? E )
+ |  DEFINE
+ |A AS SUM(A.price) < 6,
+ |B AS SUM(B.price * B.rate) < SUM(A.price) AND
+ | SUM(B.price * B.rate) > 0.2 AND
+ | SUM(B.price) >= 1 AND
+ | AVG(B.price) >= 1 AND
+ | weightedAvg(price, weight) > 1
+ |) AS T
+ |""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testCepAggregatesWithNullInputs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[Row]
+data.+=(Row.of(1:java.lang.Integer, "a", 10:java.lang.Integer))
+data.+=(Row.of(2:java.lang.Integer, "z", 10:java.lang.Integer))
+data.+=(Row.of(3:java.lang.Integer, "b", null))
+data.+=(Row.of(4:java.lang.Integer, "c", null))
+data.+=(Row.of(5:java.lang.Integer, "d", 3:java.lang.Integer))
+data.+=(Row.of(6:java.lang.Integer, "c", 3:java.lang.Integer))
+data.+=(Row.of(7:java.lang.Integer, "c", 3:java.lang.Integer))
+data.+=(Row.of(8:java.lang.Integer, "c", 3:java.lang.Integer))
+data.+=(Row.of(9:java.lang.Integer, "c", 2:java.lang.Integer))
+
+val t = env.fromCollection(data)(new RowTypeInfo(
 
 Review comment:
   nit: use the `Types` class







[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703171#comment-16703171
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237450569
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+* This query checks:
+*
+* 1. count(D.price) produces 0, because no rows matched to D
+* 2. sum(D.price) produces null, because no rows matched to D
+* 3. aggregates that take multiple parameters work
+* 4. aggregates with expressions work
+*/
+  @Test
+  def testCepAggregates(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
+data.+=((1, "a", 1, 0.8, 1))
+data.+=((2, "z", 2, 0.8, 3))
+data.+=((3, "b", 1, 0.8, 2))
+data.+=((4, "c", 1, 0.8, 5))
+data.+=((5, "d", 4, 0.1, 5))
+data.+=((6, "a", 2, 1.5, 2))
+data.+=((7, "b", 2, 0.8, 3))
+data.+=((8, "c", 1, 0.8, 2))
+data.+=((9, "h", 2, 0.8, 3))
+
+val t = env.fromCollection(data)
+  .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+tEnv.registerFunction("weightedAvg", new WeightedAvg)
+
+val sqlQuery =
+  s"""
+ |SELECT *
+ |FROM MyTable
+ |MATCH_RECOGNIZE (
+ |  ORDER BY proctime
+ |  MEASURES
+ |FIRST(id) as startId,
+ |SUM(A.price) AS sumA,
+ |COUNT(DISTINCT D.price) AS countD,
+ |SUM(D.price) as sumD,
+ |weightedAvg(price, weight) as wAvg,
+ |AVG(B.price) AS avgB,
+ |SUM(B.price * B.rate) as sumExprB,
+ |LAST(id) as endId
+ |  AFTER MATCH SKIP PAST LAST ROW
+ |  PATTERN (A+ B+ C D? E )
+ |  DEFINE
+ |A AS SUM(A.price) < 6,
+ |B AS SUM(B.price * B.rate) < SUM(A.price) AND
+ | SUM(B.price * B.rate) > 0.2 AND
+ | SUM(B.price) >= 1 AND
+ | AVG(B.price) >= 1 AND
+ | weightedAvg(price, weight) > 1
+ |) AS T
+ |""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testCepAggregatesWithNullInputs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[Row]
+data.+=(Row.of(1:java.lang.Integer, "a", 10:java.lang.Integer))
 
 Review comment:
   nit: you can also use `Int.box()` here.
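
For comparison, a small sketch of the two spellings. `BoxSketch` and the `JInt` alias are illustrative names only; `Int.box` is part of the Scala standard library.

```scala
import java.lang.{Integer => JInt}

object BoxSketch {
  // Type ascription, as in the test data above: relies on the implicit
  // Int => java.lang.Integer conversion from Predef.
  val ascribed: JInt = 10: JInt

  // Int.box performs the same boxing explicitly.
  val boxed: JInt = Int.box(10)

  // A nullable column needs the boxed type; a primitive Int cannot hold null.
  val row: Array[AnyRef] = Array[AnyRef](Int.box(3), "b", null)
}
```

Both spellings produce equal `java.lang.Integer` values; `Int.box(10)` just states the intent without a type ascription on every literal.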




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703152#comment-16703152
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237441497
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+* This query checks:
+*
+* 1. count(D.price) produces 0, because no rows matched to D
+* 2. sum(D.price) produces null, because no rows matched to D
+* 3. aggregates that take multiple parameters work
+* 4. aggregates with expressions work
+*/
+  @Test
+  def testCepAggregates(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
+data.+=((1, "a", 1, 0.8, 1))
+data.+=((2, "z", 2, 0.8, 3))
+data.+=((3, "b", 1, 0.8, 2))
+data.+=((4, "c", 1, 0.8, 5))
+data.+=((5, "d", 4, 0.1, 5))
+data.+=((6, "a", 2, 1.5, 2))
+data.+=((7, "b", 2, 0.8, 3))
+data.+=((8, "c", 1, 0.8, 2))
+data.+=((9, "h", 2, 0.8, 3))
+
+val t = env.fromCollection(data)
+  .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+tEnv.registerFunction("weightedAvg", new WeightedAvg)
+
+val sqlQuery =
+  s"""
+ |SELECT *
+ |FROM MyTable
+ |MATCH_RECOGNIZE (
+ |  ORDER BY proctime
+ |  MEASURES
+ |FIRST(id) as startId,
+ |SUM(A.price) AS sumA,
+ |COUNT(DISTINCT D.price) AS countD,
 
 Review comment:
   Remove this if we don't support it.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703161#comment-16703161
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237479817
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
 
 Review comment:
   Unnecessary variable.
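
   A minimal sketch of what the comment points at, assuming a simplified,
   hypothetical stand-in for `GeneratedExpression` (the real class lives in
   the Flink code generator and takes a type information object rather than
   a string). The last expression of a Scala block is its value, so the
   temporary `exp` can be dropped without changing behavior:

```scala
// Hypothetical, simplified stand-in for Flink's GeneratedExpression.
final case class GeneratedExpression(
    resultTerm: String,
    nullTerm: String,
    code: String,
    resultType: String)

object UnnecessaryVariable {
  // Stand-in for the NO_CODE marker used in the diff.
  val NO_CODE = ""

  // Shape used in the diff: bind the value to a name, then return the name.
  def withTemporary(
      resultTerm: String,
      nullTerm: String,
      resultType: String): GeneratedExpression = {
    val exp = GeneratedExpression(resultTerm, nullTerm, NO_CODE, resultType)
    exp
  }

  // Equivalent shape without the temporary variable.
  def direct(
      resultTerm: String,
      nullTerm: String,
      resultType: String): GeneratedExpression =
    GeneratedExpression(resultTerm, nullTerm, NO_CODE, resultType)
}
```

   Both methods return equal values; the second only removes the extra name.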




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703169#comment-16703169
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237460915
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -72,12 +78,28 @@ class MatchCodeGenerator(
   private var offset: Int = 0
   private var first : Boolean = false
 
+  /**
+* Flags that tells if we generate expressions inside an aggregate. It tells how to access input
+* row.
+*/
+  private var innerAggExpr: Boolean = false
+
+  /**
+* Name of term in function used to transform input row into aggregate input row.
+*/
+  private val inputAggRowTerm = newName("inAgg")
 
 Review comment:
   Final variable terms in this class do not need to be created with `newName`; 
they can be constants.
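
   A rough sketch of the distinction, under the assumption that `newName`
   appends a running counter to a prefix so that repeated calls never
   collide. The helper below is a hypothetical stand-in, not the generator's
   actual implementation, and `InputAggRowTerm` is an illustrative name. A
   term that is declared once per generated function has nothing to collide
   with, so a plain constant is enough:

```scala
import java.util.concurrent.atomic.AtomicInteger

object TermNames {
  // Hypothetical stand-in for the code generator's newName helper:
  // every call yields a fresh name by appending a counter.
  private val counter = new AtomicInteger(0)

  def newName(prefix: String): String =
    prefix + "$" + counter.getAndIncrement()

  // A term that appears exactly once per generated function can simply
  // be a constant; no uniqueness counter is required.
  val InputAggRowTerm: String = "inAgg"
}
```

   Names that may be emitted several times into the same scope, such as
   per-aggregate result terms, would still go through `newName`.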




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703167#comment-16703167
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237465668
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val aggs = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val callWithIndex = (innerCall, inputRows.size)
+  inputRows(innerCall.toString) = callWithIndex
+  callWithIndex
+  }
+}).toList
+
+val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+  }).zipWithIndex.map {
+case (agg, index) =>
+  val result = AggregateUtil
+.transformToAggregateFunction(agg._1,
+  isDistinct = false,
+  agg._2.map(_.getType),
+  

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703172#comment-16703172
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237470829
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
 
 Review comment:
   Give the parameter of this method a better name, because the method assumes 
an aggregate function call.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703157#comment-16703157
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237436990
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val singleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg =
+j"""
+   |boolean $singleResultNullTerm;
+   |$singleResultTypeTerm $singleResultTerm = ($singleResultTypeTerm) $resultRowTerm
+   |  .getField(${aggregates.size});
+   |if ($singleResultTerm != null) {
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  val exp = GeneratedExpression(singleResultTerm,
+singleResultNullTerm,
+NO_CODE,
+singleResultType)
+  exp
+}
+
+def generateAggFunction() : Unit = {
+  val (aggs, exprs) = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+exprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+aggs.map(_.aggFunction).toArray,
+aggs.map(_.inputIndices).toArray,
+aggs.indices.toArray,
+Array.fill(aggs.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+aggs.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+exprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private def extractAggregatesAndExpressions: (Seq[MatchAggCall], Seq[RexNode]) = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val aggs = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val callWithIndex = (innerCall, inputRows.size)
+  inputRows(innerCall.toString) = callWithIndex
+  callWithIndex
+  }
+}).toList
+
+val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
+(agg, callsWithIndices.map(_._1), callsWithIndices.map(_._2).toArray)
+  }).zipWithIndex.map {
+case (agg, index) =>
+  val result = AggregateUtil
+.transformToAggregateFunction(agg._1,
+  isDistinct = false,
+  agg._2.map(_.getType),
+  

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703166#comment-16703166
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237471737
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -304,10 +349,14 @@ class MatchCodeGenerator(
   }
 
   override def visitPatternFieldRef(fieldRef: RexPatternFieldRef): GeneratedExpression = {
-if (fieldRef.getAlpha.equals("*") && currentPattern.isDefined && offset == 0 && !first) {
-  generateInputAccess(input, input1Term, fieldRef.getIndex)
+if (innerAggExpr) {
 
 Review comment:
   Call this variable `isWithinAggExprState` or similar?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703162#comment-16703162
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237474378
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -429,6 +464,24 @@ class MatchCodeGenerator(
 GeneratedExpression(rowNameTerm, isRowNull, funcCode, input)
   }
 
+  private def findEventsByPatternName(
+  patternFieldAlpha: String)
+: GeneratedPatternList = {
+reusablePatternLists.get(patternFieldAlpha) match {
+  // input access and unboxing has already been generated
 
 Review comment:
   This comment has nothing to do with input access.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703165#comment-16703165
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237466297
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -445,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
 
 Review comment:
   Can you add a high-level description of how aggregates work in this code 
generator? Maybe here or even for the top-level class. Otherwise we won't 
remember it when we have to touch this class again. Maybe a little example code 
that explains which methods, classes, and fields are generated and why? Not super 
detailed, but at least enough to understand the concept. Actually, this is also 
missing in the AggregationCodeGenerator.
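
   As a starting point for such a description, one step that the diff for
   this pull request does show (in `extractAggregatesAndExpressions`) is
   that operands shared by several aggregates are registered only once in
   a `LinkedHashMap`, and each aggregate then refers to its operands by
   position in that shared input row. The sketch below models only this
   step, with plain strings standing in for `RexNode` operands; the object
   and method names are hypothetical and not part of the pull request:

```scala
import scala.collection.mutable

object AggInputDedup {
  // Each aggregate is modelled as (function name, operand expressions).
  // Returns, per aggregate, the positions of its operands in the shared
  // input row, plus the distinct operands in first-seen order.
  def assignInputIndices(
      aggregates: Seq[(String, Seq[String])])
    : (Seq[(String, Seq[Int])], Seq[String]) = {
    // Insertion-ordered map from operand expression to its input position.
    val inputRows = mutable.LinkedHashMap.empty[String, Int]

    val aggs = aggregates.map { case (name, operands) =>
      // A new operand gets the next free position; a known one is reused.
      val indices = operands.map { op =>
        inputRows.getOrElseUpdate(op, inputRows.size)
      }
      (name, indices)
    }

    (aggs, inputRows.keys.toSeq)
  }
}
```

   For `SUM(A.price)`, `AVG(A.price)` and `MAX(A.tax)`, the shared input row
   would hold two expressions, with both `SUM` and `AVG` reading position 0.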




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702975#comment-16702975
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422016
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
 (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+  aggregateCall: SqlAggFunction,
+  isDistinct: Boolean,
+  aggregateInputType: Seq[RelDataType],
+  needRetraction: Boolean,
+  tableConfig: TableConfig,
+  isStateBackedDataViews: Boolean = false,
 
 Review comment:
   Remove the default here and at other places. Implementers should explicitly 
state what they want.
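
   A small sketch of the concern, using hypothetical method names that are
   not part of `AggregateUtil`. With a default value a call site can leave
   the flag out and silently get `false`; without the default the compiler
   forces every caller to state the choice:

```scala
object ExplicitFlags {
  // With a default, a caller can omit the flag without noticing.
  def describeWithDefault(
      name: String,
      isStateBackedDataViews: Boolean = false): String =
    s"$name(stateBacked=$isStateBackedDataViews)"

  // Without a default, each call site has to spell out its decision.
  def describe(name: String, isStateBackedDataViews: Boolean): String =
    s"$name(stateBacked=$isStateBackedDataViews)"
}
```

   Calling `ExplicitFlags.describe("count")` does not compile, which is the
   point of the suggestion; named arguments keep the explicit call readable.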




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702972#comment-16702972
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422016
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
 (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+  aggregateCall: SqlAggFunction,
+  isDistinct: Boolean,
+  aggregateInputType: Seq[RelDataType],
+  needRetraction: Boolean,
+  tableConfig: TableConfig,
+  isStateBackedDataViews: Boolean = false,
 
 Review comment:
   Remove the default. Implementers should explicitly state what they want.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702971#comment-16702971
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422674
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
 (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+  aggregateCall: SqlAggFunction,
+  isDistinct: Boolean,
+  aggregateInputType: Seq[RelDataType],
+  needRetraction: Boolean,
+  tableConfig: TableConfig,
+  isStateBackedDataViews: Boolean = false,
+  index: Int = 0)
 
 Review comment:
   Add more documentation to this function? What is `index` good for?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702967#comment-16702967
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237420010
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1185,331 +1273,281 @@ object AggregateUtil {
 val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size)
 val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: Any]](aggregateCalls.size)
 val accTypes = new Array[TypeInformation[_]](aggregateCalls.size)
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+val isDistinctAggs = new Array[Boolean](aggregateCalls.size)
 
-// create aggregate function instances by function type and aggregate field data type.
 
 Review comment:
   Restore this and the following line?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702969#comment-16702969
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237424181
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
 (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+  aggregateCall: SqlAggFunction,
+  isDistinct: Boolean,
+  aggregateInputType: Seq[RelDataType],
+  needRetraction: Boolean,
+  tableConfig: TableConfig,
+  isStateBackedDataViews: Boolean = false,
+  index: Int = 0)
+: (TableAggregateFunction[_, _],
 
 Review comment:
   Introduce a case class here as well.
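
   A minimal sketch of the suggestion, with strings standing in for the
   real Flink types and `AggregateMetadata` as a hypothetical name for the
   case class. The tuple forces callers to use positional accessors such as
   `result._1`, as the calling code in this pull request does, while the
   case class gives each component a name:

```scala
object AggResultShape {
  // Hypothetical, simplified stand-ins: strings replace the Flink types
  // TableAggregateFunction, TypeInformation and DataViewSpec.
  type AggFunction = String
  type TypeInfo = String
  type DataViewSpec = String

  // Named alternative to the three-element tuple.
  final case class AggregateMetadata(
      aggregateFunction: AggFunction,
      accumulatorType: TypeInfo,
      accumulatorSpecs: Seq[DataViewSpec])

  // Tuple-returning shape, as in the reviewed signature.
  def asTuple(name: String): (AggFunction, TypeInfo, Seq[DataViewSpec]) =
    (s"${name}AggFunction", s"${name}Accumulator", Seq.empty)

  // Case-class-returning shape, as the comment suggests.
  def asCaseClass(name: String): AggregateMetadata =
    AggregateMetadata(s"${name}AggFunction", s"${name}Accumulator", Seq.empty)
}
```

   A caller would then write `result.accumulatorType` instead of `result._2`.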




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702970#comment-16702970
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237417018
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1185,331 +1273,281 @@ object AggregateUtil {
 val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size)
 val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: Any]](aggregateCalls.size)
 val accTypes = new Array[TypeInformation[_]](aggregateCalls.size)
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+val isDistinctAggs = new Array[Boolean](aggregateCalls.size)
 
-// create aggregate function instances by function type and aggregate field data type.
-aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
-  val argList: util.List[Integer] = aggregateCall.getArgList
+aggregateCalls
+  .zipWithIndex.foreach {
+  case (aggregateCall, index) =>
+val argList: util.List[Integer] = aggregateCall.getArgList
 
-  if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-aggregates(index) = new CountAggFunction
 if (argList.isEmpty) {
-  aggFieldIndexes(index) = Array[Int](-1)
+  if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+aggFieldIndexes(index) = Array[Int](-1)
+  } else {
+throw new TableException("Aggregate fields should not be empty.")
+  }
 } else {
   aggFieldIndexes(index) = argList.asScala.map(i => i.intValue).toArray
 }
-  } else {
-if (argList.isEmpty) {
-  throw new TableException("Aggregate fields should not be empty.")
+
+val inputTypes = argList.map(aggregateInputType.getFieldList.get(_).getType)
+val result = transformToAggregateFunction(aggregateCall.getAggregation,
+  aggregateCall.isDistinct,
+  inputTypes,
+  needRetraction,
+  tableConfig,
+  isStateBackedDataViews,
+  index)
+
+aggregates(index) = result._1
+accTypes(index) = result._2
+accSpecs(index) = result._3
+isDistinctAggs(index) = aggregateCall.isDistinct
+}
+
+(aggFieldIndexes, aggregates, isDistinctAggs, accTypes, accSpecs)
+  }
+
+  private def createFlinkAggFunction(
 
 Review comment:
   Can you add a comment to every method in this class? This class is quite big, 
so every comment helps in understanding what is going on. For example, `Converts 
Calcite's [[SqlAggFunction]] to a Flink UDF [[TableAggregateFunction]]`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support aggregation functions in the define and measures clause of MatchRecognize
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7599
>                 URL: https://issues.apache.org/jira/browse/FLINK-7599
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702968#comment-16702968
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237419538
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
 (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+  aggregateCall: SqlAggFunction,
+  isDistinct: Boolean,
+  aggregateInputType: Seq[RelDataType],
+  needRetraction: Boolean,
+  tableConfig: TableConfig,
+  isStateBackedDataViews: Boolean = false,
+  index: Int = 0)
+: (TableAggregateFunction[_, _],
+  TypeInformation[_],
+  Seq[DataViewSpec[_]]) = {
+// store the aggregate fields of each aggregate function, by the same order of aggregates.
+// create aggregate function instances by function type and aggregate field data type.
+
+var accumulatorType: TypeInformation[_] = null
+val inputType = () => if (aggregateInputType.isEmpty) {
+  throw new TableException("Aggregate fields should not be empty.")
+} else {
+  aggregateInputType.get(0)
+}
+
+val aggregate: TableAggregateFunction[_, _] = aggregateCall match {
+
+  case _: SqlCountAggFunction =>
+new CountAggFunction
+
+  case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
+val agg = new CollectAggFunction(FlinkTypeFactory.toTypeInfo(inputType()))
+accumulatorType = agg.getAccumulatorType
+agg
+
+  case udagg: AggSqlFunction =>
+accumulatorType = udagg.accType
+udagg.getFunction
+
+  case aggFunction: SqlAggFunction =>
+createFlinkAggFunction(aggFunction,
+  needRetraction,
+  inputType(),
+  tableConfig)
+}
+
+val accSpecs =
+  if (accumulatorType != null) {
+val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+  aggregate,
+  accumulatorType,
+  isStateBackedDataViews)
+if (specs.isDefined) {
+  accumulatorType = accType
+  specs.get
+} else {
+  Seq()
+}
+  } else {
+accumulatorType = getAccumulatorTypeOfAggregateFunction(aggregate)
+Seq()
+  }
+
+// create distinct accumulator filter argument
+if (isDistinct) {
+  // Using Pojo fields for the real underlying accumulator
+  val pojoFields = new util.ArrayList[PojoField]()
+  pojoFields.add(new PojoField(
+classOf[DistinctAccumulator[_]].getDeclaredField("realAcc"),
+accumulatorType)
+  )
+  // If StateBackend is not enabled, the distinct mapping also needs
+  // to be added to the Pojo fields.
+  if (!isStateBackedDataViews) {
+
+val argTypes: Array[TypeInformation[_]] = aggregateInputType
+  .map(FlinkTypeFactory.toTypeInfo).toArray
+
+val mapViewTypeInfo = new MapViewTypeInfo(
+  new RowTypeInfo(argTypes: _*),
+  BasicTypeInfo.LONG_TYPE_INFO)
+pojoFields.add(new PojoField(
+  classOf[DistinctAccumulator[_]].getDeclaredField("distinctValueMap"),
+  mapViewTypeInfo)
+)
+  }
+  accumulatorType = new PojoTypeInfo(classOf[DistinctAccumulator[_]], pojoFields)
+}
+
+(aggregate, accumulatorType, accSpecs)
+  }
+
   private def transformToAggregateFunctions(
 
 Review comment:
   We definitely need more documentation for this function. Can you create case classes for the return type of this function so that its parts are properly named? A bare `Array[Int]` or `Array[Boolean]` is not very helpful.
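
A minimal sketch of what such a named return type could look like, assuming one record per aggregate call instead of five parallel arrays. `AggregateMetadata`, `AggregatesMetadata` and all field names are hypothetical and not taken from the PR; the type parameters `F`, `T` and `S` stand in for Flink's `TableAggregateFunction[_, _]`, `TypeInformation[_]` and `DataViewSpec[_]` so that the snippet compiles without Flink on the classpath.

```scala
/** Everything the planner derives for one aggregate call (hypothetical name and fields). */
final case class AggregateMetadata[F, T, S](
    fieldIndexes: Array[Int],   // input fields read by the aggregate, Array(-1) for COUNT(*)
    function: F,                // stand-in for TableAggregateFunction[_, _]
    isDistinct: Boolean,        // whether the call was declared DISTINCT
    accumulatorType: T,         // stand-in for TypeInformation[_]
    accumulatorSpecs: Seq[S])   // stand-in for Seq[DataViewSpec[_]]

/** Named replacement for a tuple of parallel arrays (hypothetical name). */
final case class AggregatesMetadata[F, T, S](aggregates: Seq[AggregateMetadata[F, T, S]]) {
  def fieldIndexes: Seq[Array[Int]] = aggregates.map(_.fieldIndexes)
  def functions: Seq[F] = aggregates.map(_.function)
  def distinctFlags: Seq[Boolean] = aggregates.map(_.isDistinct)
}

object AggregateMetadataExample {
  // Strings are used as placeholders for the Flink types in this sketch.
  val count: AggregateMetadata[String, String, String] = AggregateMetadata(
    fieldIndexes = Array(-1),
    function = "CountAggFunction",
    isDistinct = false,
    accumulatorType = "CountAccumulator",
    accumulatorSpecs = Seq.empty)

  val all: AggregatesMetadata[String, String, String] = AggregatesMetadata(Seq(count))
}
```

With this shape a caller reads `result.distinctFlags` or `result.functions` instead of `result._3` or `result._2`, and the per-aggregate values cannot drift out of sync across separate arrays.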




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699166#comment-16699166
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dawidwys opened a new pull request #7177: [FLINK-7599] Support for aggregates 
in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177
 
 
   ## What is the purpose of the change
   
   This change allows using aggregates in the MEASURES and DEFINE clauses of MATCH_RECOGNIZE in SQL.
   
   ## Brief change log
- Split `AggregateUtil#transformToAggregateFunctions`
- Generate code for aggregates in `org.apache.flink.table.codegen.MatchCodeGenerator.AggBuilder`
- Extend the verification of aggregates' operands in `org.apache.flink.table.codegen.PatternVariableFinder` so that it also checks operands in UDAGs
   
Still missing:
- support for `open/close` methods (depends on #7110)
- support for distinct aggregates (depends on https://issues.apache.org/jira/browse/CALCITE-2707)
- updated documentation
   
   ## Verifying this change
   
   Added tests:
   * `org.apache.flink.table.match.MatchOperatorValidationTest#testAggregatesOnMultiplePatternVariablesNotSupported`
   * `org.apache.flink.table.runtime.stream.sql.MatchRecognizeITCase#testCepAggregates`
   * `org.apache.flink.table.match.MatchOperatorValidationTest#testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
   




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-19 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692723#comment-16692723
 ] 

Dian Fu commented on FLINK-7599:


Sure, go ahead.

> Support aggregation functions in the define and measures clause of MatchRecognize
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7599
>                 URL: https://issues.apache.org/jira/browse/FLINK-7599
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>             Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-19 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16691696#comment-16691696
 ] 

Dawid Wysakowicz commented on FLINK-7599:
-

Hi [~dian.fu] would you be ok, if I take over this issue?
