[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899849#comment-15899849 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3470

> Add retract method into the aggregateFunction
> ---------------------------------------------
>
> Key: FLINK-5956
> URL: https://issues.apache.org/jira/browse/FLINK-5956
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> A retraction method is helpful for processing updated messages. It will also be very
> helpful for window aggregation. This PR will first add retraction methods
> into the aggregate functions, so that the ongoing over-window aggregation work can
> benefit from it.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
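To illustrate why a retraction method helps over-window aggregation, here is a minimal sketch, assuming a ROWS-based sliding window of fixed size and a hypothetical `SumAcc` accumulator (not Flink's `AggregateFunction` API). Each new row is accumulated and the row that leaves the window is retracted, so the aggregate is updated in constant time per row instead of being recomputed over the whole window.

```scala
import scala.collection.mutable

// Hypothetical accumulator: running sum plus the number of accumulated values.
final class SumAcc {
  var sum: Long = 0L
  var count: Long = 0L

  def accumulate(v: Long): Unit = { sum += v; count += 1 }
  def retract(v: Long): Unit = { sum -= v; count -= 1 }

  // SQL semantics: SUM over an empty input is NULL, modeled here as None.
  def value: Option[Long] = if (count > 0) Some(sum) else None
}

object SlidingSum {
  // Emits, for every input row, the sum over the last `size` rows (inclusive).
  def overWindow(rows: Seq[Long], size: Int): Seq[Long] = {
    require(size > 0, "window size must be positive")
    val acc = new SumAcc
    val window = mutable.Queue.empty[Long]
    rows.map { v =>
      acc.accumulate(v)
      window.enqueue(v)
      // The oldest row leaves the window: retract it instead of recomputing.
      if (window.size > size) acc.retract(window.dequeue())
      acc.value.get // never empty: the current row was just accumulated
    }
  }
}
```

For example, `SlidingSum.overWindow(Seq(1L, 2L, 3L, 4L), 2)` yields the running two-row sums 1, 3, 5, 7.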
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899336#comment-15899336 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3470

Thanks for the update @shaoxuan-wang. PR is good to merge :-)
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897278#comment-15897278 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104413472

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala ---
@@ -72,37 +72,34 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
       a.f1 -= 1L
--- End diff --

Good point.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897276#comment-15897276 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104411239

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala ---
@@ -321,12 +317,11 @@ class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
+      accum.f0 = accum.f0.subtract(v)
+      accum.f1 -= 1L
       if (accum.f1 == 0) {
--- End diff --

This was intended to accommodate `AggFunctionTestBase` for BigDecimal comparison (BigDecimal `equals` considers not only the value but also the scale). I overlooked that we can use `BigDecimal.compareTo` to compare the values. I will remove this check and change the test cases.
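The scale issue can be shown in a few lines. This is a small sketch using only `java.math.BigDecimal`; the `DecimalCompare` helper object is hypothetical. It shows why an `equals`-based test fails after retracting every value: subtracting `1.5` from `1.5` gives `0.0` (scale 1), which is not `equals` to `BigDecimal.ZERO` (scale 0), while `compareTo` treats them as the same value.

```scala
import java.math.BigDecimal

// Hypothetical helpers contrasting the two ways of comparing BigDecimals.
object DecimalCompare {
  // equals compares value AND scale, so 1.0 and 1.00 are considered different.
  def sameByEquals(a: BigDecimal, b: BigDecimal): Boolean = a.equals(b)

  // compareTo ignores the scale and compares only the numeric value.
  def sameByValue(a: BigDecimal, b: BigDecimal): Boolean = a.compareTo(b) == 0
}
```

A test base that compares decimal results with `compareTo` therefore does not need a special reset of the sum when the count drops to zero.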
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897277#comment-15897277 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104413666

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala ---
@@ -115,12 +112,28 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
   }
 
   override def merge(accumulators: JList[Accumulator]): Accumulator = {
-    val ret = accumulators.get(0)
+    val ret = accumulators.get(0).asInstanceOf[MaxWithRetractAccumulator[T]]
     var i: Int = 1
     while (i < accumulators.size()) {
       val a = accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
       if (a.f1 != 0) {
-        accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
+        val iterator = a.f2.keySet().iterator()
+        while (iterator.hasNext()) {
+          val key = iterator.next()
+          //updating the resulting max value if needed
+          if (ord.compare(ret.f0, key) < 0) {
--- End diff --

Yes, it is not necessary to compare the max against all hash keys.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897146#comment-15897146 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104389792

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala ---
@@ -321,12 +317,11 @@ class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
+      accum.f0 = accum.f0.subtract(v)
+      accum.f1 -= 1L
       if (accum.f1 == 0) {
--- End diff --

Do we need this check? It only makes sense if we assume that accumulated and retracted values differ. But if that's the case, all values for `cnt != 0` would be considered wrong.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897150#comment-15897150 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104376293

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala ---
@@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
     }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
--- End diff --

I see, then let's keep it as it is.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897147#comment-15897147 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104392251

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala ---
@@ -115,12 +112,28 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
   }
 
   override def merge(accumulators: JList[Accumulator]): Accumulator = {
-    val ret = accumulators.get(0)
+    val ret = accumulators.get(0).asInstanceOf[MinWithRetractAccumulator[T]]
     var i: Int = 1
     while (i < accumulators.size()) {
       val a = accumulators.get(i).asInstanceOf[MinWithRetractAccumulator[T]]
       if (a.f1 != 0) {
-        accumulate(ret.asInstanceOf[MinWithRetractAccumulator[T]], a.f0)
+        val iterator = a.f2.keySet().iterator()
+        while (iterator.hasNext()) {
+          val key = iterator.next()
+          //updating the resulting max value if needed
+          if (ord.compare(ret.f0, key) > 0) {
--- End diff --

Same as for MAX.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897148#comment-15897148 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104392189

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala ---
@@ -72,37 +72,34 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
       a.f1 -= 1L
-      if (!a.f2.containsKey(v)) {
-        throw TableException("unexpected retract message")
-      } else {
-        var count = a.f2.get(v)
-        count -= 1L
-        if (count == 0) {
-          //remove the key v from the map if the number of appearance of the value v is 0
-          a.f2.remove(v)
-          //if the total count is 0, we could just simply set the f0(min) to the initial value
-          if (a.f1 == 0) {
-            a.f0 = getInitValue
-            return
-          }
-          //if v is the current min value, we have to iterate the map to find the 2nd smallest
-          // value to replace v as the min value
-          if (v == a.f0) {
-            val iterator = a.f2.keySet().iterator()
-            var key = iterator.next()
-            a.f0 = key
-            while (iterator.hasNext()) {
-              key = iterator.next()
-              if (ord.compare(a.f0, key) > 0) {
-                a.f0 = key
-              }
+      var count = a.f2.get(v)
+      count -= 1L
+      if (count == 0) {
+        //remove the key v from the map if the number of appearance of the value v is 0
+        a.f2.remove(v)
+        //if the total count is 0, we could just simply set the f0(min) to the initial value
+        if (a.f1 == 0) {
--- End diff --

remove `a.f1` from accumulator?
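The retract logic in this diff (decrement the per-value count, drop the key at zero, reset when nothing is left, rescan only when the current min was removed) can be sketched on its own. This is a minimal sketch, assuming a hypothetical `MinWithRetractAcc` that keeps counts in a Scala mutable map and the min as an `Option` instead of the PR's tuple fields `f0`/`f1`/`f2`. Following the review suggestion to drop `a.f1`, emptiness of the count map stands in for the total count.

```scala
import scala.collection.mutable

// Hypothetical MIN-with-retract accumulator (a sketch, not Flink's class).
// `counts` records how often each value is currently accumulated; whether it
// is empty replaces a separate total-count field.
final class MinWithRetractAcc[T](implicit ord: Ordering[T]) {
  private val counts = mutable.HashMap.empty[T, Long]
  private var currentMin: Option[T] = None

  def accumulate(v: T): Unit = {
    counts(v) = counts.getOrElse(v, 0L) + 1L
    if (currentMin.forall(m => ord.lt(v, m))) currentMin = Some(v)
  }

  // Assumes `v` was accumulated before (no out-of-date retractions);
  // retracting an unknown value throws NoSuchElementException.
  def retract(v: T): Unit = {
    val count = counts(v) - 1L
    if (count > 0) {
      counts(v) = count
    } else {
      counts.remove(v)
      if (counts.isEmpty) currentMin = None                       // back to the initial state
      else if (currentMin.contains(v)) currentMin = Some(counts.keys.min) // find next smallest
    }
  }

  def value: Option[T] = currentMin
}
```

The full rescan only happens when the last copy of the current minimum is retracted; other retractions touch a single map entry.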
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897145#comment-15897145 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104394062

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Sum with retract aggregate function */
+class SumWithRetractAccumulator[T] extends JTuple2[T, Long] with Accumulator
+
+/**
+ * Base class for built-in Sum with retract aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class SumWithRetractAggFunction[T: Numeric] extends AggregateFunction[T] {
+
+  private val numeric = implicitly[Numeric[T]]
+
+  override def createAccumulator(): Accumulator = {
+    val acc = new SumWithRetractAccumulator[T]()
+    acc.f0 = numeric.zero //sum
+    acc.f1 = 0L //total count
+    acc
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+      a.f0 = numeric.plus(a.f0, v)
+      a.f1 += 1
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+      a.f0 = numeric.minus(a.f0, v)
+      a.f1 -= 1
+    }
+  }
+
+  override def getValue(accumulator: Accumulator): T = {
+    val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+    if (a.f1 > 0) {
+      a.f0
+    } else {
+      null.asInstanceOf[T]
+    }
+  }
+
+  override def merge(accumulators: JList[Accumulator]): Accumulator = {
+    val ret = createAccumulator().asInstanceOf[SumWithRetractAccumulator[T]]
+    var i: Int = 0
+    while (i < accumulators.size()) {
+      val a = accumulators.get(i).asInstanceOf[SumWithRetractAccumulator[T]]
+      ret.f0 = numeric.plus(ret.f0, a.f0)
+      ret.f1 += a.f1
+      i += 1
+    }
+    ret
+  }
+
+  override def getAccumulatorType(): TypeInformation[_] = {
+    new TupleTypeInfo(
+      (new SumWithRetractAccumulator).getClass,
+      getValueTypeInfo,
+      BasicTypeInfo.LONG_TYPE_INFO)
+  }
+
+  def getValueTypeInfo: TypeInformation[_]
+}
+
+/**
+ * Built-in Byte Sum with retract aggregate function
+ */
+class ByteSumWithRetractAggFunction extends SumWithRetractAggFunction[Byte] {
+  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
+}
+
+/**
+ * Built-in Short Sum with retract aggregate function
+ */
+class ShortSumWithRetractAggFunction extends SumWithRetractAggFunction[Short] {
+  override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
+}
+
+/**
+ * Built-in Int Sum with retract aggregate function
+ */
+class IntSumWithRetractAggFunction extends SumWithRetractAggFunction[Int] {
+  override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
+}
+
+/**
+ * Built-in Long Sum with retract aggregate function
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897149#comment-15897149 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104392040

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala ---
@@ -115,12 +112,28 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
   }
 
   override def merge(accumulators: JList[Accumulator]): Accumulator = {
-    val ret = accumulators.get(0)
+    val ret = accumulators.get(0).asInstanceOf[MaxWithRetractAccumulator[T]]
     var i: Int = 1
     while (i < accumulators.size()) {
       val a = accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
       if (a.f1 != 0) {
-        accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
+        val iterator = a.f2.keySet().iterator()
+        while (iterator.hasNext()) {
+          val key = iterator.next()
+          //updating the resulting max value if needed
+          if (ord.compare(ret.f0, key) < 0) {
--- End diff --

I think we can simply compare the max values of both accumulators (no need to compare all keys in the hash map) and merge both hash maps:

```
// set max element
if (ord.compare(ret.f0, a.f0) < 0) {
  ret.f0 = a.f0
}
// merge hash maps
val iterator = a.f2.keySet().iterator()
while (iterator.hasNext()) {
  val key = iterator.next()
  if (ret.f2.containsKey(key)) {
    ret.f2.put(key, ret.f2.get(key) + a.f2.get(key))
  } else {
    ret.f2.put(key, a.f2.get(key))
  }
}
```
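A self-contained version of the suggested merge, as a sketch under assumptions: `MaxWithRetractAcc` is a hypothetical stand-in that holds the max as an `Option` and the per-value counts in a Scala mutable map, rather than the PR's `MaxWithRetractAccumulator` tuple fields. Only the two maxima are compared, and the count maps are merged key by key, so the cost is linear in the size of the other accumulator's map with a single max comparison.

```scala
import scala.collection.mutable

// Hypothetical MAX-with-retract accumulator used only to illustrate merging.
final class MaxWithRetractAcc[T](implicit ord: Ordering[T]) {
  val counts = mutable.HashMap.empty[T, Long]
  var currentMax: Option[T] = None

  def accumulate(v: T): Unit = {
    counts(v) = counts.getOrElse(v, 0L) + 1L
    if (currentMax.forall(m => ord.gt(v, m))) currentMax = Some(v)
  }

  // Merges `other` into this accumulator: compare only the two maxima,
  // then sum the per-value counts key by key.
  def merge(other: MaxWithRetractAcc[T]): Unit = {
    other.currentMax.foreach { m =>
      if (currentMax.forall(cur => ord.lt(cur, m))) currentMax = Some(m)
    }
    other.counts.foreach { case (k, c) =>
      counts(k) = counts.getOrElse(k, 0L) + c
    }
  }
}
```

Summing the counts (rather than overwriting them) matters for later retractions: a value seen in both partitions must be retracted as often as it was accumulated before it disappears from the map.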
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897144#comment-15897144 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104390901

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala ---
@@ -72,37 +72,34 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
       a.f1 -= 1L
--- End diff --

I think we don't need the `f1: Long` field (total count). We can check `f2.size() == 0` instead.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896063#comment-15896063 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3470

@fhueske Thanks for the review. I have addressed all your comments and updated the PR.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896059#comment-15896059 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104304129

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala ---
@@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
     }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
--- End diff --

If I directly create a LongAvgAggFunction, it will be difficult to return null for the avg result, as `null.asInstanceOf[Long]` returns 0.
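The Scala behavior behind this comment can be checked directly. In this sketch, `NullUnboxing` and its `avg` helper are hypothetical and only illustrate the language rule; they are not how Flink's `BigIntegralAvgAggFunction` is written. Casting `null` to the value type `Long` unboxes it to `0L`, while a reference type such as `java.lang.Long` keeps the `null`, so "no result" stays distinguishable from a real average of 0.

```scala
object NullUnboxing {
  // Casting null to a Scala value type unboxes it to the type's default value (0L).
  def primitiveNull: Long = null.asInstanceOf[Long]

  // A reference (boxed) type keeps the null.
  def boxedNull: java.lang.Long = null.asInstanceOf[java.lang.Long]

  // Hypothetical AVG over Longs with a boxed result: null for empty input,
  // otherwise the integer average.
  def avg(values: Seq[Long]): java.lang.Long =
    if (values.isEmpty) null
    else java.lang.Long.valueOf(values.sum / values.size)
}
```

With a primitive `Long` return type, an empty window and a window whose values average to 0 would both produce `0`, which is the ambiguity the comment describes.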
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896058#comment-15896058 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104304057

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+    * Retract the input values from the accumulator instance.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param input       the input value (usually obtained from a new arrived data)
--- End diff --

Ah, OK. But I think the comment above should be clear enough: "Retract the input values from the accumulator instance". I think we can leave it as is, because it keeps the accumulate and retract interfaces consistent and clean. We will have multiple (user-defined) inputs for both accumulate and retract in the near future, at which point we will not control the naming anyway.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896057#comment-15896057 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104303999

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+    * Retract the input values from the accumulator instance.
--- End diff --

I was thinking further ahead, about DataStream retraction, where the source table could send out-of-date retraction messages. But let us add this assumption for now.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896056#comment-15896056 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104303912

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
       val v = value.asInstanceOf[T]
       val a = accumulator.asInstanceOf[SumAccumulator[T]]
       a.f0 = numeric.plus(v, a.f0)
-      a.f1 = true
+      a.f1 += 1
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[SumAccumulator[T]]
+      a.f0 = numeric.plus(v, a.f0)
--- End diff --

Adding a test that compares the two accumulators, instead of just comparing the two values, will catch this.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896029#comment-15896029 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104302135

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala ---
@@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {
 
   def aggregator: AggregateFunction[T]
 
+  def ifSupportRetraction: Boolean = true
+
   @Test
-  // test aggregate functions without partial merge
-  def testAggregateWithoutMerge(): Unit = {
+  // test aggregate and retract functions without partial merge
+  def testAccumulateAndRetractWithoutMerge(): Unit = {
     // iterate over input sets
     for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-      val accumulator = aggregateVals(vals)
-      val result = aggregator.getValue(accumulator)
+      val accumulator = accumulateVals(vals)
+      var result = aggregator.getValue(accumulator)
       validateResult(expected, result)
+
+      if (ifSupportRetraction) {
+        retractVals(accumulator, vals)
--- End diff --

Regarding "We also need to check the retraction of a single value, not all values (the SumAggregator retraction of all values was correct because the count was 0)": instead of adding a new test that retracts just one value, I think we could add a check that compares two accumulators (comparing the results of getValue is no longer sufficient): `assertEquals(accumulator, resultAccum)`. The accumulator should return to its initial state after retracting all the previous inputs. This should be good enough to validate accumulate and retract.
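Why comparing accumulators is stronger than comparing results can be shown with a small sketch. Everything here is hypothetical (`SumState`, `RetractCheck`, and both retract variants); `buggyRetract` mirrors the plus-instead-of-minus retract in the SumAggFunction diff. After accumulating and then retracting every value, both variants report a NULL result because the count is 0, but only the correct one returns the accumulator to its initial state.

```scala
// Hypothetical SUM accumulator state: running sum and number of values.
final case class SumState(sum: Long, count: Long)

object RetractCheck {
  val initial: SumState = SumState(0L, 0L)

  def accumulate(s: SumState, v: Long): SumState = SumState(s.sum + v, s.count + 1)

  // Correct retract: exactly undoes accumulate.
  def retract(s: SumState, v: Long): SumState = SumState(s.sum - v, s.count - 1)

  // Buggy retract: adds the value instead of subtracting it.
  def buggyRetract(s: SumState, v: Long): SumState = SumState(s.sum + v, s.count - 1)

  // SUM result: None (NULL) when no values are currently accumulated.
  def getValue(s: SumState): Option[Long] = if (s.count > 0) Some(s.sum) else None

  // Accumulates all values, then retracts all of them again.
  def roundTrip(vals: Seq[Long], retractFn: (SumState, Long) => SumState): SumState = {
    val acc = vals.foldLeft(initial)(accumulate)
    vals.foldLeft(acc)(retractFn)
  }
}
```

Using a case class makes `==` compare the whole state, which plays the role of `assertEquals(accumulator, resultAccum)` in the test base.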
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896000#comment-15896000 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104301347

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala ---
@@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {
 
   def aggregator: AggregateFunction[T]
 
+  def ifSupportRetraction: Boolean = true
+
   @Test
-  // test aggregate functions without partial merge
-  def testAggregateWithoutMerge(): Unit = {
+  // test aggregate and retract functions without partial merge
+  def testAccumulateAndRetractWithoutMerge(): Unit = {
     // iterate over input sets
     for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-      val accumulator = aggregateVals(vals)
-      val result = aggregator.getValue(accumulator)
+      val accumulator = accumulateVals(vals)
+      var result = aggregator.getValue(accumulator)
       validateResult(expected, result)
+
+      if (ifSupportRetraction) {
+        retractVals(accumulator, vals)
--- End diff --

Sounds good to me. I prefer to apply merge together with retraction; I will add the tests.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895999#comment-15895999 ] ASF GitHub Bot commented on FLINK-5956: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104301312 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { val v = value.asInstanceOf[T] val a = accumulator.asInstanceOf[SumAccumulator[T]] a.f0 = numeric.plus(v, a.f0) - a.f1 = true + a.f1 += 1 +} + } + + override def retract(accumulator: Accumulator, value: Any): Unit = { +if (value != null) { + val v = value.asInstanceOf[T] + val a = accumulator.asInstanceOf[SumAccumulator[T]] + a.f0 = numeric.plus(v, a.f0) + a.f1 -= 1 + if (a.f1 < 0) { --- End diff -- This exception usually won't happen if we use retract for bounded over windows. With the ongoing DataStream retraction design, the source table can be a table with a primary key, which can generate retraction messages. The downstream streaming job may then receive garbage retraction messages, because the log of the source table could contain out-of-date retractions. The initial intent of throwing the exception here was to mark this case for the future retraction design. Let me add a comment here (or maybe just note it down myself) and remove the checks for now.
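The stray-retraction scenario described above can be sketched as follows, assuming a sum accumulator that keeps a count of live values. `CountedSum` and `sawStrayRetraction` are hypothetical names. The sketch does not throw, mirroring the plan to drop the check, but it shows that a retraction without a matching accumulation is exactly what drives the count below zero.

```scala
// Hypothetical sum accumulator with a count of live (non-null) values.
final class CountedSum(var sum: Long = 0L, var count: Long = 0L) {
  def accumulate(value: Any): Unit =
    if (value != null) { sum += value.asInstanceOf[Long]; count += 1 }

  // No exception: a stray retraction is recorded rather than rejected,
  // mirroring the decision to remove the `count < 0` check for now.
  def retract(value: Any): Unit =
    if (value != null) { sum -= value.asInstanceOf[Long]; count -= 1 }

  // A negative count means more retractions than accumulations were seen,
  // e.g. an out-of-date retraction replayed from a source table's log.
  def sawStrayRetraction: Boolean = count < 0
}
```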
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895997#comment-15895997 ] ASF GitHub Bot commented on FLINK-5956: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104301172 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -23,10 +23,11 @@ import java.util.{List => JList} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.api.TableException import org.apache.flink.table.functions.{Accumulator, AggregateFunction} /** The initial accumulator for Sum aggregate function */ -class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator +class SumAccumulator[T] extends JTuple2[T, Long] with Accumulator --- End diff -- I am OK with having two Sum aggregates.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895996#comment-15895996 ] ASF GitHub Bot commented on FLINK-5956: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104301147 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal +import java.util.{HashMap => JHashMap, List => JList} + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +/** The initial accumulator for Max with retraction aggregate function */ +class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator + +/** + * Base class for built-in Max with retraction aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] { + + override def createAccumulator(): Accumulator = { +val acc = new MaxWithRetractAccumulator[T] +acc.f0 = getInitValue //max +acc.f1 = 0L //total count +acc.f2 = new JHashMap[T, Long]() //store the count for each value +acc + } + + override def accumulate(accumulator: Accumulator, value: Any): Unit = { +if (value != null) { + val v = value.asInstanceOf[T] + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]] + + if (a.f1 == 0 || (ord.compare(a.f0, v) < 0)) { +a.f0 = v + } + + a.f1 += 1L + + if (!a.f2.containsKey(v)) { +a.f2.put(v, 1L) + } else { +var count = a.f2.get(v) +count += 1L +a.f2.put(v, count) + } +} + } + + override def retract(accumulator: Accumulator, value: Any): Unit = { +if (value != null) { + val v = value.asInstanceOf[T] + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]] + + a.f1 -= 1L + + if (!a.f2.containsKey(v)) { +throw TableException("unexpected retract message") + } else { +var count = a.f2.get(v) +count -= 1L +if (count == 0) { + //remove the key v from the map if the number of appearance of the value v is 0 + a.f2.remove(v) + //if the 
total count is 0, we could just simply set the f0(max) to the initial value + if (a.f1 == 0) { +a.f0 = getInitValue +return + } + //if v is the current max value, we have to iterate the map to find the 2nd biggest + // value to replace v as the max value + if (v == a.f0) { +val iterator = a.f2.keySet().iterator() +var key = iterator.next() +a.f0 = key +while (iterator.hasNext()) { + key = iterator.next() + if (ord.compare(a.f0, key) < 0) { +a.f0 = key + } +} + } +} else { + a.f2.put(v, count) +} + } +} + } + + override def getValue(accumulator: Accumulator): T = { +val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]] +if (a.f1 != 0) { + a.f0 +} else { + null.asInstanceOf[T] +} + } + +
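Stripped of the Flink types, the count-map logic quoted above can be sketched in plain Scala. This is an illustrative rewrite under assumptions, not the PR's code: `MaxWithRetract` is a hypothetical name, `Option` replaces the `getInitValue`/`null` handling, and `IllegalStateException` stands in for `TableException`. Accumulating and most retractions are O(1); only retracting the last copy of the current max triggers an O(n) scan of the remaining keys.

```scala
import scala.collection.mutable

// Current max, total count, and a per-value count map, as in the quoted diff.
final class MaxWithRetract[T](implicit ord: Ordering[T]) {
  private val counts = mutable.HashMap.empty[T, Long]
  private var total = 0L
  private var max: Option[T] = None

  def accumulate(value: Any): Unit = if (value != null) {
    val v = value.asInstanceOf[T]
    if (max.forall(m => ord.lt(m, v))) max = Some(v)
    total += 1
    counts(v) = counts.getOrElse(v, 0L) + 1
  }

  def retract(value: Any): Unit = if (value != null) {
    val v = value.asInstanceOf[T]
    val c = counts.getOrElse(v, throw new IllegalStateException(s"unexpected retract of $v"))
    total -= 1
    if (c > 1) counts(v) = c - 1
    else {
      counts.remove(v)
      // Only when the last copy of the current max disappears do we rescan (O(n)).
      if (max.contains(v)) max = if (counts.isEmpty) None else Some(counts.keys.max)
    }
  }

  def getValue: Option[T] = if (total > 0) max else None
}
```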
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895995#comment-15895995 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104301102 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala --- @@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction { def createAccumulator(): Accumulator /** +* Retract the input values from the accumulator instance. +* +* @param accumulator the accumulator which contains the current +*aggregated results +* @param input the input value (usually obtained from a new arrived data) --- End diff -- I was referring to `input` not `accumulator` ;-)
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895993#comment-15895993 ] ASF GitHub Bot commented on FLINK-5956: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104301092 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal +import java.util.{HashMap => JHashMap, List => JList} + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +/** The initial accumulator for Max with retraction aggregate function */ +class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator + +/** + * Base class for built-in Max with retraction aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] { + + override def createAccumulator(): Accumulator = { +val acc = new MaxWithRetractAccumulator[T] +acc.f0 = getInitValue //max +acc.f1 = 0L //total count +acc.f2 = new JHashMap[T, Long]() //store the count for each value --- End diff -- Yes, the idea is borrowed from bucket sort. I think this is the right data structure for max, min, and even median.
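As a rough check of the claim that the same per-value count map also fits a median, here is a hypothetical sketch (`MedianWithRetract` is not part of the PR). Because hash-map keys are unordered, computing the median sorts the distinct keys and walks the cumulative counts; for a generic `T` it returns the lower median, since two middle values cannot be averaged in general.

```scala
import scala.collection.mutable

// Hypothetical sketch: per-value counts ("buckets") support retraction for a median.
final class MedianWithRetract[T](implicit ord: Ordering[T]) {
  private val counts = mutable.HashMap.empty[T, Long]
  private var total = 0L

  def accumulate(v: T): Unit = { counts(v) = counts.getOrElse(v, 0L) + 1; total += 1 }

  def retract(v: T): Unit = counts.get(v) match {
    case Some(c) if c > 1 => counts(v) = c - 1; total -= 1
    case Some(_)          => counts.remove(v); total -= 1
    case None             => throw new IllegalStateException(s"unexpected retract of $v")
  }

  // Sort the distinct keys (O(k log k)) and walk the cumulative counts
  // until they pass the 0-based index of the lower median.
  def lowerMedian: Option[T] =
    if (total == 0) None
    else {
      val target = (total - 1) / 2
      var seen = 0L
      counts.keys.toSeq.sorted.find { k => seen += counts(k); seen > target }
    }
}
```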
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895987#comment-15895987 ] ASF GitHub Bot commented on FLINK-5956: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104301022 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala --- @@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] { } } + override def retract(accumulator: Accumulator, value: Any): Unit = { --- End diff -- I assume you are suggesting implementing LongAvgAggFunction directly with JTuple2[BigInteger, Long] as the accumulator. If so, yes, we could do that.
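A minimal sketch of what that could look like, assuming a plain Scala class in place of `JTuple2[BigInteger, Long]` so it runs without Flink on the classpath. `LongAvgAcc` and `LongAvgWithRetract` are hypothetical names, and the truncating integer division is an assumption; the thread does not show how the real function rounds.

```scala
import java.math.BigInteger

// Hypothetical stand-in for a JTuple2[BigInteger, Long] accumulator:
// a BigInteger sum cannot overflow when many Long values are added.
final class LongAvgAcc(var sum: BigInteger = BigInteger.ZERO, var count: Long = 0L)

object LongAvgWithRetract {
  def accumulate(acc: LongAvgAcc, value: Any): Unit =
    if (value != null) {
      acc.sum = acc.sum.add(BigInteger.valueOf(value.asInstanceOf[Long]))
      acc.count += 1
    }

  def retract(acc: LongAvgAcc, value: Any): Unit =
    if (value != null) {
      acc.sum = acc.sum.subtract(BigInteger.valueOf(value.asInstanceOf[Long]))
      acc.count -= 1
    }

  // Integer average (BigInteger.divide truncates toward zero); None if empty.
  def getValue(acc: LongAvgAcc): Option[Long] =
    if (acc.count == 0) None
    else Some(acc.sum.divide(BigInteger.valueOf(acc.count)).longValue())
}
```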
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895986#comment-15895986 ] ASF GitHub Bot commented on FLINK-5956: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104300989 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala --- @@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction { def createAccumulator(): Accumulator /** +* Retract the input values from the accumulator instance. +* +* @param accumulator the accumulator which contains the current +*aggregated results +* @param input the input value (usually obtained from a new arrived data) --- End diff -- I think it is OK to use "accumulator" here: retract is a method that retracts the value from the accumulator.
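To illustrate the point about the `input` parameter docs: for `accumulate` the input is a newly arrived value, while for `retract` it is a value that was accumulated earlier and is now being withdrawn. The trait below is a hypothetical sketch, not Flink's `AggregateFunction`, with a trivial COUNT implementation so it compiles and runs.

```scala
/**
 * Hypothetical interface sketch (not Flink's AggregateFunction) showing how
 * the parameter docs of accumulate and retract differ.
 */
trait RetractableAggregate[T, ACC] {
  def createAccumulator(): ACC

  /**
   * Adds a value to the accumulator.
   *
   * @param accumulator the accumulator holding the current aggregated result
   * @param input       a newly arrived input value
   */
  def accumulate(accumulator: ACC, input: Any): Unit

  /**
   * Withdraws a value from the accumulator.
   *
   * @param accumulator the accumulator holding the current aggregated result
   * @param input       a previously accumulated value that must now be removed
   */
  def retract(accumulator: ACC, input: Any): Unit

  def getValue(accumulator: ACC): T
}

// Minimal COUNT implementation; the accumulator is a one-element array.
object CountAgg extends RetractableAggregate[Long, Array[Long]] {
  def createAccumulator(): Array[Long] = Array(0L)
  def accumulate(acc: Array[Long], input: Any): Unit = if (input != null) acc(0) += 1
  def retract(acc: Array[Long], input: Any): Unit = if (input != null) acc(0) -= 1
  def getValue(acc: Array[Long]): Long = acc(0)
}
```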
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895906#comment-15895906 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104298517 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala --- @@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] { def aggregator: AggregateFunction[T] + def ifSupportRetraction: Boolean = true + @Test - // test aggregate functions without partial merge - def testAggregateWithoutMerge(): Unit = { + // test aggregate and retract functions without partial merge + def testAccumulateAndRetractWithoutMerge(): Unit = { // iterate over input sets for ((vals, expected) <- inputValueSets.zip(expectedResults)) { - val accumulator = aggregateVals(vals) - val result = aggregator.getValue(accumulator) + val accumulator = accumulateVals(vals) + var result = aggregator.getValue(accumulator) validateResult(expected, result) + + if (ifSupportRetraction) { +retractVals(accumulator, vals) --- End diff -- We also need to check the retraction of a single value, not only of all values (retracting all values from the `SumAggregator` produced the correct result only because the count was `0`). To do that, we can accumulate all values, retract one value, and compare the result against the result of accumulating all values except the retracted one. Also check that retracting `null` has no effect.
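The test strategy described above can be sketched as two generic checks. `Agg`, `SumAgg`, and `RetractChecks` are hypothetical names, not the PR's test base. `SumAgg` takes the retract operator as a parameter so that the `plus` bug can be reproduced: retracting all values still yields no result (count `0`), but retracting a single value exposes the wrong sum.

```scala
// Hypothetical minimal aggregate interface for the test sketch (not Flink's API).
trait Agg[ACC] {
  def create(): ACC
  def accumulate(acc: ACC, v: Any): Unit
  def retract(acc: ACC, v: Any): Unit
  def result(acc: ACC): Option[Long]
}

// Long sum with a live-value count; the retract operator is a parameter so the
// buggy "plus" variant from the review can be compared with the correct "minus".
final class SumAgg(retractOp: (Long, Long) => Long) extends Agg[Array[Long]] {
  def create(): Array[Long] = Array(0L, 0L) // (sum, count)
  def accumulate(acc: Array[Long], v: Any): Unit =
    if (v != null) { acc(0) += v.asInstanceOf[Long]; acc(1) += 1 }
  def retract(acc: Array[Long], v: Any): Unit =
    if (v != null) { acc(0) = retractOp(acc(0), v.asInstanceOf[Long]); acc(1) -= 1 }
  def result(acc: Array[Long]): Option[Long] = if (acc(1) == 0) None else Some(acc(0))
}

object RetractChecks {
  // Retract the value at index i and compare with accumulating all other values.
  def retractOneMatches[ACC](agg: Agg[ACC], vals: Seq[Any], i: Int): Boolean = {
    val acc = agg.create()
    vals.foreach(agg.accumulate(acc, _))
    agg.retract(acc, vals(i))
    val expected = agg.create()
    vals.patch(i, Nil, 1).foreach(agg.accumulate(expected, _))
    agg.result(acc) == agg.result(expected)
  }

  // Retracting null must not change the result.
  def nullRetractIsNoOp[ACC](agg: Agg[ACC], vals: Seq[Any]): Boolean = {
    val acc = agg.create()
    vals.foreach(agg.accumulate(acc, _))
    val before = agg.result(acc)
    agg.retract(acc, null)
    agg.result(acc) == before
  }
}
```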
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895915#comment-15895915 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104297207 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala --- @@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] { } } + override def retract(accumulator: Accumulator, value: Any): Unit = { --- End diff -- Can we implement `LongAvgAggFunction` without the indirection via `BigIntegralAvgAggFunction`? `BigIntegralAvgAggFunction` is not used anywhere else.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895914#comment-15895914 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104298708 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal +import java.util.{HashMap => JHashMap, List => JList} + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +/** The initial accumulator for Max with retraction aggregate function */ +class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator + +/** + * Base class for built-in Max with retraction aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] { + + override def createAccumulator(): Accumulator = { +val acc = new MaxWithRetractAccumulator[T] +acc.f0 = getInitValue //max +acc.f1 = 0L //total count +acc.f2 = new JHashMap[T, Long]() //store the count for each value --- End diff -- I spent some time thinking about whether a `HashMap` is the best data structure here, because searching for the second-largest element is quite expensive. But since far more values should be added (O(1)) or retracted without changing the max element (also O(1)) than retracted as the current max, a `HashMap` should be a good choice. I noticed that the `MapSerializer` of the `MapTypeInfo` serializes an additional `Boolean` for value null checks. This is unnecessary overhead for us, but should be OK for now, IMO.
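For comparison with the trade-off discussed above, the sketch below swaps in a sorted map (`java.util.TreeMap`), which is not what the PR uses. `SortedMaxWithRetract` is a hypothetical name. Every accumulate and retract costs O(log n) instead of O(1), but the new maximum after a retraction is simply `lastKey()`, so there is no O(n) rescan.

```scala
import java.util.TreeMap

// Alternative sketch (not the PR's HashMap design): a sorted count map.
// accumulate/retract are O(log n) and the max is always lastKey().
final class SortedMaxWithRetract[T](implicit ord: Ordering[T]) {
  // scala.math.Ordering extends java.util.Comparator, so it can order the TreeMap.
  private val counts = new TreeMap[T, java.lang.Long](ord)

  private def countOf(v: T): Long = {
    val c = counts.get(v)
    if (c == null) 0L else c.longValue()
  }

  def accumulate(v: T): Unit = {
    counts.put(v, java.lang.Long.valueOf(countOf(v) + 1))
    ()
  }

  def retract(v: T): Unit = countOf(v) match {
    case 0L => throw new IllegalStateException(s"unexpected retract of $v")
    case 1L => counts.remove(v); ()
    case c  => counts.put(v, java.lang.Long.valueOf(c - 1)); ()
  }

  def max: Option[T] = if (counts.isEmpty()) None else Some(counts.lastKey())
}
```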
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895921#comment-15895921 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104298146 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -23,10 +23,11 @@ import java.util.{List => JList} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.api.TableException import org.apache.flink.table.functions.{Accumulator, AggregateFunction} /** The initial accumulator for Sum aggregate function */ -class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator +class SumAccumulator[T] extends JTuple2[T, Long] with Accumulator --- End diff -- I think we should also offer `SumAccumulator` with and without retraction. The difference between `Long` and `Boolean` is 7 bytes, which is a lot of additional overhead for most types.
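A rough plain-Scala sketch of offering both variants: without retraction a flag is enough to know whether any value was seen, while with retraction a count is needed to tell when the last value has been withdrawn. `PlainSumAcc`, `RetractSumAcc`, and `SumFunctions` are hypothetical stand-ins for `JTuple2[T, Boolean]` and `JTuple2[T, Long]`, not Flink classes.

```scala
// Hypothetical stand-in for JTuple2[T, Boolean]: a flag suffices without retraction.
final class PlainSumAcc[T](var sum: T, var nonEmpty: Boolean = false)

// Hypothetical stand-in for JTuple2[T, Long]: a count is needed with retraction.
final class RetractSumAcc[T](var sum: T, var count: Long = 0L)

final class SumFunctions[T](implicit num: Numeric[T]) {
  def createPlain(): PlainSumAcc[T] = new PlainSumAcc(num.zero)
  def createRetract(): RetractSumAcc[T] = new RetractSumAcc(num.zero)

  def accumulate(acc: PlainSumAcc[T], v: T): Unit = {
    acc.sum = num.plus(acc.sum, v); acc.nonEmpty = true
  }

  def accumulate(acc: RetractSumAcc[T], v: T): Unit = {
    acc.sum = num.plus(acc.sum, v); acc.count += 1
  }

  def retract(acc: RetractSumAcc[T], v: T): Unit = {
    acc.sum = num.minus(acc.sum, v); acc.count -= 1
  }

  def value(acc: PlainSumAcc[T]): Option[T] = if (acc.nonEmpty) Some(acc.sum) else None
  def value(acc: RetractSumAcc[T]): Option[T] = if (acc.count > 0) Some(acc.sum) else None
}
```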
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895917#comment-15895917 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104298418 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -556,7 +566,9 @@ object AggregateUtil { private def transformToAggregateFunctions( aggregateCalls: Seq[AggregateCall], inputType: RelDataType, - groupKeysCount: Int): (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = { + groupKeysCount: Int, + ifNeedRetraction: Boolean) --- End diff -- rename to `needsRetraction`?
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895916#comment-15895916 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104297801 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal +import java.util.{HashMap => JHashMap, List => JList} + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +/** The initial accumulator for Max with retraction aggregate function */ +class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator + +/** + * Base class for built-in Max with retraction aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] { + + override def createAccumulator(): Accumulator = { +val acc = new MaxWithRetractAccumulator[T] +acc.f0 = getInitValue //max +acc.f1 = 0L //total count +acc.f2 = new JHashMap[T, Long]() //store the count for each value +acc + } + + override def accumulate(accumulator: Accumulator, value: Any): Unit = { +if (value != null) { + val v = value.asInstanceOf[T] + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]] + + if (a.f1 == 0 || (ord.compare(a.f0, v) < 0)) { +a.f0 = v + } + + a.f1 += 1L + + if (!a.f2.containsKey(v)) { +a.f2.put(v, 1L) + } else { +var count = a.f2.get(v) +count += 1L +a.f2.put(v, count) + } +} + } + + override def retract(accumulator: Accumulator, value: Any): Unit = { +if (value != null) { + val v = value.asInstanceOf[T] + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]] + + a.f1 -= 1L + + if (!a.f2.containsKey(v)) { +throw TableException("unexpected retract message") + } else { +var count = a.f2.get(v) +count -= 1L +if (count == 0) { + //remove the key v from the map if the number of appearance of the value v is 0 + a.f2.remove(v) + //if the 
total count is 0, we could just simply set the f0(max) to the initial value + if (a.f1 == 0) { +a.f0 = getInitValue +return + } + //if v is the current max value, we have to iterate the map to find the 2nd biggest + // value to replace v as the max value + if (v == a.f0) { +val iterator = a.f2.keySet().iterator() +var key = iterator.next() +a.f0 = key +while (iterator.hasNext()) { + key = iterator.next() + if (ord.compare(a.f0, key) < 0) { +a.f0 = key + } +} + } +} else { + a.f2.put(v, count) +} + } +} + } + + override def getValue(accumulator: Accumulator): T = { +val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]] +if (a.f1 != 0) { + a.f0 +} else { + null.asInstanceOf[T] +} + } + + override
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895918#comment-15895918 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104298193 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { val v = value.asInstanceOf[T] val a = accumulator.asInstanceOf[SumAccumulator[T]] a.f0 = numeric.plus(v, a.f0) - a.f1 = true + a.f1 += 1 +} + } + + override def retract(accumulator: Accumulator, value: Any): Unit = { +if (value != null) { + val v = value.asInstanceOf[T] + val a = accumulator.asInstanceOf[SumAccumulator[T]] + a.f0 = numeric.plus(v, a.f0) --- End diff -- Please check the tests. This should have been caught.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895922#comment-15895922 ] ASF GitHub Bot commented on FLINK-5956: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104298377 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -148,12 +161,24 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] { val v = value.asInstanceOf[BigDecimal] val accum = accumulator.asInstanceOf[DecimalSumAccumulator] accum.f0 = accum.f0.add(v) - accum.f1 = true + accum.f1 += 1L +} + } + + override def retract(accumulator: Accumulator, value: Any): Unit = { +if (value != null) { + val v = value.asInstanceOf[BigDecimal] + val accum = accumulator.asInstanceOf[DecimalSumAccumulator] + accum.f0 = accum.f0.add(v) --- End diff -- subtract
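A minimal sketch of the suggested fix for the decimal sum, assuming a plain class in place of `DecimalSumAccumulator` (`DecimalSumAcc` and `DecimalSumWithRetract` are hypothetical names): `retract` calls `BigDecimal.subtract` instead of `add`.

```scala
import java.math.BigDecimal

// Hypothetical stand-in for DecimalSumAccumulator: (sum, count of non-null values).
final class DecimalSumAcc(var sum: BigDecimal = BigDecimal.ZERO, var count: Long = 0L)

object DecimalSumWithRetract {
  def accumulate(acc: DecimalSumAcc, value: Any): Unit =
    if (value != null) {
      acc.sum = acc.sum.add(value.asInstanceOf[BigDecimal])
      acc.count += 1L
    }

  // The fix suggested in review: subtract (not add) the retracted value.
  def retract(acc: DecimalSumAcc, value: Any): Unit =
    if (value != null) {
      acc.sum = acc.sum.subtract(value.asInstanceOf[BigDecimal])
      acc.count -= 1L
    }

  def getValue(acc: DecimalSumAcc): Option[BigDecimal] =
    if (acc.count == 0L) None else Some(acc.sum)
}
```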
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895912#comment-15895912 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298400

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
@@ -165,9 +190,9 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
     var i: Int = 1
     while (i < accumulators.size()) {
       val a = accumulators.get(i).asInstanceOf[DecimalSumAccumulator]
-      if (a.f1) {
+      if (a.f1 > 0) {
--- End diff --

remove check?
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895920#comment-15895920 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298179

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
       val v = value.asInstanceOf[T]
       val a = accumulator.asInstanceOf[SumAccumulator[T]]
       a.f0 = numeric.plus(v, a.f0)
-      a.f1 = true
+      a.f1 += 1
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[SumAccumulator[T]]
+      a.f0 = numeric.plus(v, a.f0)
--- End diff --

minus
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895919#comment-15895919 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298390

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
@@ -148,12 +161,24 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
       accum.f0 = accum.f0.add(v)
-      accum.f1 = true
+      accum.f1 += 1L
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[BigDecimal]
+      val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
+      accum.f0 = accum.f0.add(v)
+      accum.f1 -= 1L
+      if (accum.f1 < 0) {
--- End diff --

remove?
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895910#comment-15895910 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298261

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
       val v = value.asInstanceOf[T]
       val a = accumulator.asInstanceOf[SumAccumulator[T]]
       a.f0 = numeric.plus(v, a.f0)
-      a.f1 = true
+      a.f1 += 1
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[SumAccumulator[T]]
+      a.f0 = numeric.plus(v, a.f0)
+      a.f1 -= 1
+      if (a.f1 < 0) {
--- End diff --

Do we want to check for these errors? It adds overhead and is the responsibility of the runtime to call the functions correctly. I'd rather add tests with custom aggregation functions to verify the behavior instead of adding overhead to the most widely used aggregation functions. We didn't add these checks to `CountAggregate` and `AverageAggregate`.
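One way to read this suggestion is to keep guards such as `if (a.f1 < 0)` out of the production aggregates and to verify the runtime with a test-only aggregate that fails loudly when it is misused. Here is a minimal sketch under that assumption. `CheckingCountAgg` and `Replay` are hypothetical names, and `Replay` only stands in for the runtime that would issue the accumulate and retract calls.

```scala
// Hypothetical test-only aggregate: counts rows and fails if the caller
// retracts more rows than it has accumulated.
final class CheckingCountAgg {
  private var count: Long = 0L

  def accumulate(value: Any): Unit = { count += 1 }

  def retract(value: Any): Unit = {
    count -= 1
    // The guard lives only in test code, so production aggregates pay no overhead.
    if (count < 0)
      throw new IllegalStateException("retract called more often than accumulate")
  }

  def getValue: Long = count
}

// Stand-in for the runtime: replays (isAccumulate, value) messages against the aggregate.
object Replay {
  def apply(agg: CheckingCountAgg, messages: Seq[(Boolean, Any)]): Long = {
    messages.foreach { case (isAccumulate, v) =>
      if (isAccumulate) agg.accumulate(v) else agg.retract(v)
    }
    agg.getValue
  }
}
```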
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895908#comment-15895908 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104296481

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def createAccumulator(): Accumulator
   /**
+    * Retract the input values from the accumulator instance.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param input       the input value (usually obtained from a new arrived data)
--- End diff --

rename parameter to `retraction`?
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895913#comment-15895913 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298605

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def createAccumulator(): Accumulator
   /**
+    * Retract the input values from the accumulator instance.
--- End diff --

Do we want to mention that only previously `accumulated` values need to be `retracted`? Users should not check for that but can use this assumption to implement more efficient functions.
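The two documentation suggestions on this method are to name the parameter `retraction` and to state that only previously accumulated values are retracted. Together, they might look like the following Scaladoc. This is a hypothetical, simplified trait, not Flink's `AggregateFunction`. The small count implementation shows how an implementation can rely on the contract instead of checking it.

```scala
/** Hypothetical, simplified aggregate contract; not Flink's actual class. */
trait RetractableAgg[Acc, T] {
  def createAccumulator(): Acc

  /** Adds a value to the accumulator. */
  def accumulate(accumulator: Acc, input: Any): Unit

  /**
    * Retracts a value from the accumulator.
    *
    * Only values that were previously accumulated are retracted, so
    * implementations may rely on this assumption instead of checking it.
    *
    * @param accumulator the accumulator which contains the current aggregated results
    * @param retraction  a value that was previously passed to `accumulate`
    */
  def retract(accumulator: Acc, retraction: Any): Unit

  def getValue(accumulator: Acc): T
}

final class CountAcc(var count: Long = 0L)

/** Example implementation that relies on the contract (no negative-count guard). */
object SketchCount extends RetractableAgg[CountAcc, Long] {
  def createAccumulator(): CountAcc = new CountAcc()

  def accumulate(accumulator: CountAcc, input: Any): Unit =
    if (input != null) accumulator.count += 1

  def retract(accumulator: CountAcc, retraction: Any): Unit =
    if (retraction != null) accumulator.count -= 1

  def getValue(accumulator: CountAcc): Long = accumulator.count
}
```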
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895907#comment-15895907 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298360

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
@@ -67,9 +80,9 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
     var i: Int = 0
     while (i < accumulators.size()) {
       val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]]
-      if (a.f1) {
+      if (a.f1 > 0) {
--- End diff --

Do we need this check? If `a.f1` is `0`, `a.f0` should be `0` as well (if we only retract what was added before) and we can simply add sum and count.
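This reasoning can be made concrete with plain `(sum, count)` pairs. If only previously accumulated values are retracted, an accumulator whose count has dropped back to `0` also has a sum of `0`. Adding it during `merge` is therefore a no-op, and the `if (a.f1 > 0)` branch is unnecessary. `SumAcc` and `SumMerge` are hypothetical names used only for this sketch, which uses `Long` sums.

```scala
// Hypothetical stand-in for SumAccumulator: (sum, count).
final case class SumAcc(sum: Long, count: Long)

object SumMerge {
  /** Mirrors the reviewed code: skip accumulators whose count is zero. */
  def mergeWithCheck(accs: Seq[SumAcc]): SumAcc =
    accs.foldLeft(SumAcc(0L, 0L)) { (m, a) =>
      if (a.count > 0) SumAcc(m.sum + a.sum, m.count + a.count) else m
    }

  /** Suggested version: an emptied accumulator is (0, 0), so adding it changes nothing. */
  def merge(accs: Seq[SumAcc]): SumAcc =
    accs.foldLeft(SumAcc(0L, 0L)) { (m, a) => SumAcc(m.sum + a.sum, m.count + a.count) }
}
```

This equivalence holds exactly for integer and `BigDecimal` sums. For `Float` and `Double`, rounding means that accumulating and then retracting the same values can leave a tiny non-zero residue, so an emptied accumulator is not guaranteed to hold exactly `0`.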
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895909#comment-15895909 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104297285

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala ---
@@ -290,6 +317,19 @@ class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
     }
   }
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[BigDecimal]
+      val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
+      if (accum.f1 == 0) {
+        accum.f0 = v
--- End diff --

should be `accum.f0 = v.negate()`. I think we can even remove the `if (accum.f1 == 0)` condition and always subtract since `accum.f0` is initialized with `ZERO`. Same would apply for `accumulate`.
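Here is a minimal sketch of the suggested simplification. Because the sum starts at `ZERO`, `accumulate` can always add and `retract` can always subtract (equivalent to adding `v.negate()`), with no special case for an empty accumulator. The accumulator and object names are hypothetical, and `java.math.BigDecimal` is assumed. Dividing with `MathContext.DECIMAL128` is also an assumption, chosen so that non-terminating quotients such as 1/3 do not throw `ArithmeticException`.

```scala
import java.math.{BigDecimal, MathContext}

// Hypothetical stand-in for DecimalAvgAccumulator: f0 = sum (starts at ZERO), f1 = count.
final class SketchDecimalAvgAccumulator(var f0: BigDecimal = BigDecimal.ZERO, var f1: Long = 0L)

object SketchDecimalAvg {
  // f0 starts at ZERO, so no `if (f1 == 0)` special case is needed.
  def accumulate(acc: SketchDecimalAvgAccumulator, value: Any): Unit =
    if (value != null) {
      acc.f0 = acc.f0.add(value.asInstanceOf[BigDecimal])
      acc.f1 += 1L
    }

  // Always subtract; this is the same as adding v.negate().
  def retract(acc: SketchDecimalAvgAccumulator, value: Any): Unit =
    if (value != null) {
      acc.f0 = acc.f0.subtract(value.asInstanceOf[BigDecimal])
      acc.f1 -= 1L
    }

  // DECIMAL128 bounds the precision so that quotients such as 1/3 do not throw.
  def getValue(acc: SketchDecimalAvgAccumulator): BigDecimal =
    if (acc.f1 == 0L) null
    else acc.f0.divide(BigDecimal.valueOf(acc.f1), MathContext.DECIMAL128)
}
```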
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895911#comment-15895911 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298555

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala ---
@@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {
   def aggregator: AggregateFunction[T]
+  def ifSupportRetraction: Boolean = true
+
   @Test
-  // test aggregate functions without partial merge
-  def testAggregateWithoutMerge(): Unit = {
+  // test aggregate and retract functions without partial merge
+  def testAccumulateAndRetractWithoutMerge(): Unit = {
     // iterate over input sets
     for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-      val accumulator = aggregateVals(vals)
-      val result = aggregator.getValue(accumulator)
+      val accumulator = accumulateVals(vals)
+      var result = aggregator.getValue(accumulator)
       validateResult(expected, result)
+
+      if (ifSupportRetraction) {
+        retractVals(accumulator, vals)
--- End diff --

If we want `accumulate` and `merge` to be usable after `merge` was applied, we should add tests for that as well.
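A test of that kind could build two partial accumulators, merge them, keep calling `accumulate` and `retract` on the merged result, and compare it with a single accumulator that received the same net input without any merge. The sketch below uses a trivial `Long` sum so that it runs on its own. `SketchAcc`, `SumOps`, and `MergeThenRetractCheck` are hypothetical names, not members of `AggFunctionTestBase`.

```scala
// Hypothetical mutable (sum, count) accumulator used only for this sketch.
final class SketchAcc(var sum: Long = 0L, var count: Long = 0L)

object SumOps {
  def accumulate(a: SketchAcc, v: Long): Unit = { a.sum += v; a.count += 1 }
  def retract(a: SketchAcc, v: Long): Unit = { a.sum -= v; a.count -= 1 }
  def merge(accs: Seq[SketchAcc]): SketchAcc = {
    val m = new SketchAcc()
    accs.foreach { a => m.sum += a.sum; m.count += a.count }
    m
  }
}

object MergeThenRetractCheck {
  def run(left: Seq[Long], right: Seq[Long], toRetract: Seq[Long], toAdd: Seq[Long]): Boolean = {
    val l = new SketchAcc(); left.foreach(v => SumOps.accumulate(l, v))
    val r = new SketchAcc(); right.foreach(v => SumOps.accumulate(r, v))
    // Keep using the merged accumulator after merge().
    val merged = SumOps.merge(Seq(l, r))
    toRetract.foreach(v => SumOps.retract(merged, v))
    toAdd.foreach(v => SumOps.accumulate(merged, v))

    // Reference: one accumulator that sees the same values without any merge.
    val reference = new SketchAcc()
    (left ++ right ++ toAdd).foreach(v => SumOps.accumulate(reference, v))
    toRetract.foreach(v => SumOps.retract(reference, v))

    merged.sum == reference.sum && merged.count == reference.count
  }
}
```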
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895620#comment-15895620 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3470

@fhueske sounds great. I updated the PR. Please take a look. I have added non-retractable and retractable aggregates for MIN and MAX, and added a flag parameter to AggregateUtil.transformToAggregateFunctions() that indicates whether an aggregate needs to support retraction.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895569#comment-15895569 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3470

Hi @shaoxuan-wang, yes. That's a very good point. We should definitely add non-retractable `MIN` and `MAX` aggregates. Would be good to have them now to use them in batch queries.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895566#comment-15895566 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3470

Thanks for the PR @shaoxuan-wang. I haven't had a detailed look yet. I think we also need non-retract versions of `MIN` and `MAX`. The retractable version cannot be used to compute unbounded OVER windows and will be less efficient for batch queries (which do not need retraction).

Thanks, Fabian
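The difference between the two variants can be sketched in plain Scala. The retractable version below keeps a count for every distinct value it has seen, because retracting the current maximum requires knowing the next-largest value. Its state therefore grows with the number of distinct values, which illustrates why it is a poor fit for unbounded OVER windows and for batch queries that never retract. Storing the counts in a `mutable.TreeMap` is an assumption made for the sketch. It is not necessarily how Flink's `MaxWithRetractAggFunction` stores its state.

```scala
import scala.collection.mutable

/** Non-retractable MAX: constant-size state, enough for batch and append-only input. */
final class SketchMax[T](implicit ord: Ordering[T]) {
  private var current: Option[T] = None

  def accumulate(v: T): Unit = {
    current = Some(current.fold(v)(c => ord.max(c, v)))
  }

  def getValue: Option[T] = current
}

/**
  * Retractable MAX: keeps a count per distinct value, because once the current
  * maximum is retracted the next-largest value must still be known.
  */
final class SketchMaxWithRetract[T](implicit ord: Ordering[T]) {
  private val counts = mutable.TreeMap.empty[T, Long]

  def accumulate(v: T): Unit = counts.update(v, counts.getOrElse(v, 0L) + 1L)

  def retract(v: T): Unit = counts.get(v) match {
    case Some(n) if n > 1L => counts.update(v, n - 1L)
    case Some(_)           => counts.remove(v)
    case None              => () // assumes only previously accumulated values are retracted
  }

  def getValue: Option[T] = if (counts.isEmpty) None else Some(counts.lastKey)
}
```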
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895382#comment-15895382 ]

ASF GitHub Bot commented on FLINK-5956:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3470

@fhueske, we should add separate built-in aggregates for the functions that are not easy to retract (for instance Max and Min). One would be a withRetract version, used for over windows (where the number of items is limited) and also for the dataStream when retraction is absolutely needed. The other would be a withoutRetract version, used for dataSet and for dataStream where retraction is not needed. It is up to the optimizer to check the rules and decide which aggregate should be used. I think we can go ahead with this (withRetract) for now, and complete the entire design together with dataStream retraction.
[jira] [Commented] (FLINK-5956) Add retract method into the aggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894749#comment-15894749 ]

ASF GitHub Bot commented on FLINK-5956:
---

GitHub user shaoxuan-wang opened a pull request:

https://github.com/apache/flink/pull/3470

[FLINK-5956] [table] Add retract method for aggregateFunction

This PR adds a retract method to AggregateFunction. It also implements retract methods, as well as test cases, for all built-in aggregates. The retract method is helpful for processing update messages. It will also be very helpful for group window and over window aggregation, where we may want to retract data that has moved out of the window from the accumulator.

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F5956-submit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3470.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3470

commit 09a863bd70f4e8b693276e47942c1dff2fd5375e
Author: shaoxuan-wang
Date: 2017-03-03T17:37:50Z

    [FLINK-5956] [table] Add retract method for aggregateFunction
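The over-window use case from the PR description can be sketched with a ROWS-based window over the last `size` values. Each arriving value is accumulated, and the value that falls out of the window is retracted. Every output then costs O(1) work instead of a re-aggregation of the whole window. `SlidingSum` is a hypothetical class for illustration and is not part of Flink's runtime.

```scala
import scala.collection.mutable

// Hypothetical: a sliding window over the last `size` values, maintained incrementally
// by accumulating the arriving value and retracting the one that leaves the window.
final class SlidingSum(size: Int) {
  require(size > 0, "window size must be positive")

  private val window = mutable.Queue.empty[Long]
  private var sum = 0L

  /** Adds a value and returns the sum over the current window. */
  def add(v: Long): Long = {
    window.enqueue(v)
    sum += v                    // accumulate the new row
    if (window.size > size) {
      sum -= window.dequeue()   // retract the row that left the window
    }
    sum
  }
}
```

For example, with a window of three rows the inputs 1, 2, 3, 4, 5 produce 1, 3, 6, 9, 12.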