[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3470


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104413666
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
@@ -115,12 +112,28 @@ abstract class MaxWithRetractAggFunction[T](implicit 
ord: Ordering[T]) extends A
   }
 
   override def merge(accumulators: JList[Accumulator]): Accumulator = {
-val ret = accumulators.get(0)
+val ret = 
accumulators.get(0).asInstanceOf[MaxWithRetractAccumulator[T]]
 var i: Int = 1
 while (i < accumulators.size()) {
   val a = 
accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
   if (a.f1 != 0) {
-accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
+val iterator = a.f2.keySet().iterator()
+while (iterator.hasNext()) {
+  val key = iterator.next()
+  //updating the resulting max value if needed
+  if (ord.compare(ret.f0, key) < 0) {
--- End diff --

Yes, it is not necessary to compare the max for all hash keys.




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104411239
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
 ---
@@ -321,12 +317,11 @@ class DecimalAvgAggFunction extends 
AggregateFunction[BigDecimal] {
 if (value != null) {
   val v = value.asInstanceOf[BigDecimal]
   val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
+  accum.f0 = accum.f0.subtract(v)
+  accum.f1 -= 1L
   if (accum.f1 == 0) {
--- End diff --

This was intended to accommodate "AggFunctionTestBase" for BigDecimal 
comparison (BigDecimal equals considers not only the value but also the scale). 
I overlooked that we can use bigDecimal.compareTo to compare the values. I will 
remove this and change the test cases.
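
A minimal sketch of the equals/compareTo difference, using only 
java.math.BigDecimal. The object name `BigDecimalComparison` is made up for 
illustration and is not part of the pull request:

```scala
import java.math.BigDecimal

object BigDecimalComparison {
  // Same numeric value, different scale (1 vs. 2 fractional digits).
  val a = new BigDecimal("1.0")
  val b = new BigDecimal("1.00")

  // equals compares both the unscaled value and the scale.
  val sameByEquals: Boolean = a.equals(b)

  // compareTo compares the numeric value only.
  val sameByCompareTo: Boolean = a.compareTo(b) == 0

  // Subtracting a value from itself keeps the operand's scale, so the
  // result is 0.0 (scale 1), which differs from BigDecimal.ZERO (scale 0)
  // under equals but not under compareTo.
  val residue: BigDecimal = a.subtract(a)
  val residueEqualsZero: Boolean = residue.equals(BigDecimal.ZERO)
  val residueComparesToZero: Boolean = residue.compareTo(BigDecimal.ZERO) == 0
}
```

This is presumably why a retracted sum can fail an equals-based test 
assertion even when its numeric value is correct.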




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104413472
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
@@ -72,37 +72,34 @@ abstract class MaxWithRetractAggFunction[T](implicit 
ord: Ordering[T]) extends A
 
   a.f1 -= 1L
--- End diff --

Good point




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104394062
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala
 ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Sum with retract aggregate function */
+class SumWithRetractAccumulator[T] extends JTuple2[T, Long] with 
Accumulator
+
+/**
+  * Base class for built-in Sum with retract aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class SumWithRetractAggFunction[T: Numeric] extends 
AggregateFunction[T] {
+
+  private val numeric = implicitly[Numeric[T]]
+
+  override def createAccumulator(): Accumulator = {
+val acc = new SumWithRetractAccumulator[T]()
+acc.f0 = numeric.zero //sum
+acc.f1 = 0L //total count
+acc
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+  a.f0 = numeric.plus(a.f0, v)
+  a.f1 += 1
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+  a.f0 = numeric.minus(a.f0, v)
+  a.f1 -= 1
+}
+  }
+
+  override def getValue(accumulator: Accumulator): T = {
+val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+if (a.f1 > 0) {
+  a.f0
+} else {
+  null.asInstanceOf[T]
+}
+  }
+
+  override def merge(accumulators: JList[Accumulator]): Accumulator = {
+val ret = 
createAccumulator().asInstanceOf[SumWithRetractAccumulator[T]]
+var i: Int = 0
+while (i < accumulators.size()) {
+  val a = 
accumulators.get(i).asInstanceOf[SumWithRetractAccumulator[T]]
+  ret.f0 = numeric.plus(ret.f0, a.f0)
+  ret.f1 += a.f1
+  i += 1
+}
+ret
+  }
+
+  override def getAccumulatorType(): TypeInformation[_] = {
+new TupleTypeInfo(
+  (new SumWithRetractAccumulator).getClass,
+  getValueTypeInfo,
+  BasicTypeInfo.LONG_TYPE_INFO)
+  }
+
+  def getValueTypeInfo: TypeInformation[_]
+}
+
+/**
+  * Built-in Byte Sum with retract aggregate function
+  */
+class ByteSumWithRetractAggFunction extends 
SumWithRetractAggFunction[Byte] {
+  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
+}
+
+/**
+  * Built-in Short Sum with retract aggregate function
+  */
+class ShortSumWithRetractAggFunction extends 
SumWithRetractAggFunction[Short] {
+  override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
+}
+
+/**
+  * Built-in Int Sum with retract aggregate function
+  */
+class IntSumWithRetractAggFunction extends SumWithRetractAggFunction[Int] {
+  override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
+}
+
+/**
+  * Built-in Long Sum with retract aggregate function
+  */
+class LongSumWithRetractAggFunction extends 
SumWithRetractAggFunction[Long] {
+  override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+/**
+  * Built-in Float Sum with retract aggregate 

[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104389792
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
 ---
@@ -321,12 +317,11 @@ class DecimalAvgAggFunction extends 
AggregateFunction[BigDecimal] {
 if (value != null) {
   val v = value.asInstanceOf[BigDecimal]
   val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
+  accum.f0 = accum.f0.subtract(v)
+  accum.f1 -= 1L
   if (accum.f1 == 0) {
--- End diff --

Do we need this check?
It only makes sense if we assume that accumulated and retracted values 
differ. But if that's the case all values for `cnt != 0` would be considered 
wrong. 




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104392251
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
 ---
@@ -115,12 +112,28 @@ abstract class MinWithRetractAggFunction[T](implicit 
ord: Ordering[T]) extends A
   }
 
   override def merge(accumulators: JList[Accumulator]): Accumulator = {
-val ret = accumulators.get(0)
+val ret = 
accumulators.get(0).asInstanceOf[MinWithRetractAccumulator[T]]
 var i: Int = 1
 while (i < accumulators.size()) {
   val a = 
accumulators.get(i).asInstanceOf[MinWithRetractAccumulator[T]]
   if (a.f1 != 0) {
-accumulate(ret.asInstanceOf[MinWithRetractAccumulator[T]], a.f0)
+val iterator = a.f2.keySet().iterator()
+while (iterator.hasNext()) {
+  val key = iterator.next()
+  //updating the resulting max value if needed
+  if (ord.compare(ret.f0, key) > 0) {
--- End diff --

As for MAX




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104392040
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
@@ -115,12 +112,28 @@ abstract class MaxWithRetractAggFunction[T](implicit 
ord: Ordering[T]) extends A
   }
 
   override def merge(accumulators: JList[Accumulator]): Accumulator = {
-val ret = accumulators.get(0)
+val ret = 
accumulators.get(0).asInstanceOf[MaxWithRetractAccumulator[T]]
 var i: Int = 1
 while (i < accumulators.size()) {
   val a = 
accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
   if (a.f1 != 0) {
-accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
+val iterator = a.f2.keySet().iterator()
+while (iterator.hasNext()) {
+  val key = iterator.next()
+  //updating the resulting max value if needed
+  if (ord.compare(ret.f0, key) < 0) {
--- End diff --

I think we can simply compare the max values of both accumulators (no need 
to compare all keys in the hash map) and merge both hash maps:
```
// set max element
if (ord.compare(ret.f0, a.f0) < 0) {
  ret.f0 = a.f0;
}
// merge hash maps
for (T key : a.f2.keySet()) {
  if (ret.f2.containsKey(key)) {
    ret.f2.put(key, ret.f2.get(key) + a.f2.get(key));
  } else {
    ret.f2.put(key, a.f2.get(key));
  }
}
```
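
The same idea as a self-contained Scala sketch. `MaxAcc` and 
`MaxMergeSketch` are hypothetical stand-ins for the JTuple3-based 
accumulator in the diff, and the empty-accumulator handling is an 
assumption, not code from the pull request:

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for MaxWithRetractAccumulator:
// the current max plus a count per distinct value.
final class MaxAcc[T](var max: T, val counts: JHashMap[T, java.lang.Long])

object MaxMergeSketch {
  // Merges `a` into `ret` and returns `ret`.
  def merge[T](ret: MaxAcc[T], a: MaxAcc[T])(implicit ord: Ordering[T]): MaxAcc[T] = {
    if (!a.counts.isEmpty) {
      // Compare only the two current maxima, not every key.
      if (ret.counts.isEmpty || ord.compare(ret.max, a.max) < 0) {
        ret.max = a.max
      }
      // Merge the per-value counts.
      val it = a.counts.entrySet().iterator()
      while (it.hasNext) {
        val e = it.next()
        val existing = ret.counts.get(e.getKey)
        val base: Long = if (existing == null) 0L else existing.longValue()
        ret.counts.put(e.getKey, java.lang.Long.valueOf(base + e.getValue.longValue()))
      }
    }
    ret
  }
}
```

Boxed `java.lang.Long` counts are used so that a missing key shows up as 
null rather than being unboxed to 0.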





[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104392189
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
 ---
@@ -72,37 +72,34 @@ abstract class MinWithRetractAggFunction[T](implicit 
ord: Ordering[T]) extends A
 
   a.f1 -= 1L
 
-  if (!a.f2.containsKey(v)) {
-throw TableException("unexpected retract message")
-  } else {
-var count = a.f2.get(v)
-count -= 1L
-if (count == 0) {
-  //remove the key v from the map if the number of appearance of 
the value v is 0
-  a.f2.remove(v)
-  //if the total count is 0, we could just simply set the f0(min) 
to the initial value
-  if (a.f1 == 0) {
-a.f0 = getInitValue
-return
-  }
-  //if v is the current min value, we have to iterate the map to 
find the 2nd smallest
-  // value to replace v as the min value
-  if (v == a.f0) {
-val iterator = a.f2.keySet().iterator()
-var key = iterator.next()
-a.f0 = key
-while (iterator.hasNext()) {
-  key = iterator.next()
-  if (ord.compare(a.f0, key) > 0) {
-a.f0 = key
-  }
+  var count = a.f2.get(v)
+  count -= 1L
+  if (count == 0) {
+//remove the key v from the map if the number of appearance of the 
value v is 0
+a.f2.remove(v)
+//if the total count is 0, we could just simply set the f0(min) to 
the initial value
+if (a.f1 == 0) {
--- End diff --

remove `a.f1` from accumulator?




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104390901
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
@@ -72,37 +72,34 @@ abstract class MaxWithRetractAggFunction[T](implicit 
ord: Ordering[T]) extends A
 
   a.f1 -= 1L
--- End diff --

I think we don't need the `f1: Long` field (total count). We can check 
`f2.size() == 0` instead.
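
A rough sketch of what dropping the total count could look like, with the 
emptiness of the count map taking its place. `MaxWithoutTotalCount`, `Acc` 
and the `initValue` parameter are hypothetical names, and silently ignoring 
a retraction for an unknown value is an assumption of this sketch:

```scala
import java.util.{HashMap => JHashMap}

object MaxWithoutTotalCount {
  // Hypothetical accumulator: current max and a count per distinct value.
  final class Acc[T](var max: T) {
    val counts = new JHashMap[T, java.lang.Long]()
  }

  def accumulate[T](a: Acc[T], v: T)(implicit ord: Ordering[T]): Unit = {
    // An empty map replaces the "total count == 0" check.
    if (a.counts.isEmpty || ord.compare(a.max, v) < 0) a.max = v
    val old = a.counts.get(v)
    a.counts.put(v, java.lang.Long.valueOf(if (old == null) 1L else old.longValue() + 1L))
  }

  def retract[T](a: Acc[T], v: T, initValue: T)(implicit ord: Ordering[T]): Unit = {
    val old = a.counts.get(v)
    if (old != null) {
      if (old.longValue() > 1L) {
        a.counts.put(v, java.lang.Long.valueOf(old.longValue() - 1L))
      } else {
        a.counts.remove(v)
        if (a.counts.isEmpty) {
          // Nothing left: reset the max to the initial value.
          a.max = initValue
        } else if (ord.equiv(v, a.max)) {
          // The max was removed: scan the remaining keys for the new max.
          val it = a.counts.keySet().iterator()
          var best = it.next()
          while (it.hasNext) {
            val k = it.next()
            if (ord.compare(best, k) < 0) best = k
          }
          a.max = best
        }
      }
    }
  }

  def isEmpty[T](a: Acc[T]): Boolean = a.counts.isEmpty
}
```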




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104376293
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
 ---
@@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends 
AggregateFunction[T] {
 }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
--- End diff --

I see, then let's keep it as it is




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104304129
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
 ---
@@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends 
AggregateFunction[T] {
 }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
--- End diff --

If I directly create a LongAvgAggFunction, it will be difficult to return 
null for the avg result, as null.asInstanceOf[Long] returns 0.
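
A small illustration of the cast behavior mentioned here. The object name 
`NullCastSketch` is made up for this example:

```scala
object NullCastSketch {
  // Casting null to a primitive value type yields that type's default value.
  val primitive: Long = null.asInstanceOf[Long]

  // Casting null to the boxed Java type keeps the null reference.
  val boxed: java.lang.Long = null.asInstanceOf[java.lang.Long]
}
```

With a result type fixed to the primitive `Long`, a "no value" result 
cannot be told apart from a genuine 0.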




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104304057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends 
UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+* Retract the input values from the accumulator instance.
+*
+* @param accumulator the accumulator which contains the current
+*aggregated results
+* @param input   the input value (usually obtained from a new 
arrived data)
--- End diff --

Ah, OK. But I think the comment above should be clear enough: "Retract the 
input values from the accumulator instance". I think we can leave it as is 
because it keeps the accumulate and retract interfaces consistent and clean. We 
will have multiple (user-defined) inputs for both accumulate and retract in the 
near future, at which point we will not control the naming of this anyway.




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104303999
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends 
UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+* Retract the input values from the accumulator instance.
--- End diff --

I was thinking ahead to dataStream retraction, where the source table could 
send out-of-date retraction messages. But let us add this assumption for now.




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104303912
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends 
AggregateFunction[T] {
   val v = value.asInstanceOf[T]
   val a = accumulator.asInstanceOf[SumAccumulator[T]]
   a.f0 = numeric.plus(v, a.f0)
-  a.f1 = true
+  a.f1 += 1
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[SumAccumulator[T]]
+  a.f0 = numeric.plus(v, a.f0)
--- End diff --

Adding a test that compares two accumulators, instead of just comparing two 
values, will catch this.




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104302135
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
 ---
@@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {
 
   def aggregator: AggregateFunction[T]
 
+  def ifSupportRetraction: Boolean = true
+
   @Test
-  // test aggregate functions without partial merge
-  def testAggregateWithoutMerge(): Unit = {
+  // test aggregate and retract functions without partial merge
+  def testAccumulateAndRetractWithoutMerge(): Unit = {
 // iterate over input sets
 for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-  val accumulator = aggregateVals(vals)
-  val result = aggregator.getValue(accumulator)
+  val accumulator = accumulateVals(vals)
+  var result = aggregator.getValue(accumulator)
   validateResult(expected, result)
+
+  if (ifSupportRetraction) {
+retractVals(accumulator, vals)
--- End diff --

Regarding "We also need to check the retraction of a single value not 
all values (the SumAggregator retraction of all values was correct because the 
count was 0)":
Instead of adding a new test that retracts just one value, I think we could 
add a check that compares two accumulators (comparing the getValue results is 
no longer sufficient): 
assertEquals(accumulator, resultAccum)
The accumulator should go back to its initial state after retracting all the 
previous inputs. This should be good enough to validate accumulate and retract.
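
A sketch of why comparing accumulator state catches more than comparing 
getValue results. `SumAcc`, `buggyRetract` and `roundTrip` are hypothetical 
simplifications rather than the test code of this pull request, and 
`Option` stands in for the null result:

```scala
object RetractStateCheck {
  // Hypothetical simplified sum accumulator: (sum, count).
  final case class SumAcc(var sum: Long, var count: Long)

  def accumulate(a: SumAcc, v: Long): Unit = { a.sum += v; a.count += 1 }

  // Correct retraction: subtract the value and decrement the count.
  def retract(a: SumAcc, v: Long): Unit = { a.sum -= v; a.count -= 1 }

  // Deliberately wrong retraction: adds instead of subtracting.
  def buggyRetract(a: SumAcc, v: Long): Unit = { a.sum += v; a.count -= 1 }

  // No result when nothing is accumulated, otherwise the sum.
  def getValue(a: SumAcc): Option[Long] = if (a.count > 0) Some(a.sum) else None

  // Accumulate all values, then retract all of them with the given function.
  def roundTrip(vals: Seq[Long], retractFn: (SumAcc, Long) => Unit): SumAcc = {
    val acc = SumAcc(0L, 0L)
    vals.foreach(v => accumulate(acc, v))
    vals.foreach(v => retractFn(acc, v))
    acc
  }
}
```

After a full round trip the buggy version still reports no value, because 
the count is back to 0, but its state differs from a fresh `SumAcc(0, 0)`, 
so an equality check on the accumulator exposes the error.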




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104301347
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
 ---
@@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {
 
   def aggregator: AggregateFunction[T]
 
+  def ifSupportRetraction: Boolean = true
+
   @Test
-  // test aggregate functions without partial merge
-  def testAggregateWithoutMerge(): Unit = {
+  // test aggregate and retract functions without partial merge
+  def testAccumulateAndRetractWithoutMerge(): Unit = {
 // iterate over input sets
 for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-  val accumulator = aggregateVals(vals)
-  val result = aggregator.getValue(accumulator)
+  val accumulator = accumulateVals(vals)
+  var result = aggregator.getValue(accumulator)
   validateResult(expected, result)
+
+  if (ifSupportRetraction) {
+retractVals(accumulator, vals)
--- End diff --

Sounds good to me. I prefer to apply merge with retraction; I will add the 
tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104301312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends 
AggregateFunction[T] {
   val v = value.asInstanceOf[T]
   val a = accumulator.asInstanceOf[SumAccumulator[T]]
   a.f0 = numeric.plus(v, a.f0)
-  a.f1 = true
+  a.f1 += 1
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[SumAccumulator[T]]
+  a.f0 = numeric.plus(v, a.f0)
+  a.f1 -= 1
+  if (a.f1 < 0) {
--- End diff --

This exception usually won't happen if we use retract for bounded over 
windows. With the ongoing dataStream retraction design, the source table can 
be a table with a PrimaryKey, which can generate retraction messages. The 
downstream streaming job may receive garbage retraction messages, as the 
log of the sourceTable could contain out-of-date retractions. The 
initial intent of throwing the exception here was actually to leave a marker 
for the future retraction design. Let me add a comment here (or maybe just 
note it down myself) and remove the checks for now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104301172
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -23,10 +23,11 @@ import java.util.{List => JList}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
 
 /** The initial accumulator for Sum aggregate function */
-class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
+class SumAccumulator[T] extends JTuple2[T, Long] with Accumulator
--- End diff --

I am OK with having two Sum aggregates.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104301147
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Max with retraction aggregate function */
+class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, 
Long]] with Accumulator
+
+/**
+  * Base class for built-in Max with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) 
extends AggregateFunction[T] {
+
+  override def createAccumulator(): Accumulator = {
+val acc = new MaxWithRetractAccumulator[T]
+acc.f0 = getInitValue //max
+acc.f1 = 0L //total count
+acc.f2 = new JHashMap[T, Long]() //store the count for each value
+acc
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+
+  if (a.f1 == 0 || (ord.compare(a.f0, v) < 0)) {
+a.f0 = v
+  }
+
+  a.f1 += 1L
+
+  if (!a.f2.containsKey(v)) {
+a.f2.put(v, 1L)
+  } else {
+var count = a.f2.get(v)
+count += 1L
+a.f2.put(v, count)
+  }
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+
+  a.f1 -= 1L
+
+  if (!a.f2.containsKey(v)) {
+throw TableException("unexpected retract message")
+  } else {
+var count = a.f2.get(v)
+count -= 1L
+if (count == 0) {
+  //remove the key v from the map if the number of appearance of 
the value v is 0
+  a.f2.remove(v)
+  //if the total count is 0, we could just simply set the f0(max) 
to the initial value
+  if (a.f1 == 0) {
+a.f0 = getInitValue
+return
+  }
+  //if v is the current max value, we have to iterate the map to 
find the 2nd biggest
+  // value to replace v as the max value
+  if (v == a.f0) {
+val iterator = a.f2.keySet().iterator()
+var key = iterator.next()
+a.f0 = key
+while (iterator.hasNext()) {
+  key = iterator.next()
+  if (ord.compare(a.f0, key) < 0) {
+a.f0 = key
+  }
+}
+  }
+} else {
+  a.f2.put(v, count)
+}
+  }
+}
+  }
+
+  override def getValue(accumulator: Accumulator): T = {
+val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+if (a.f1 != 0) {
+  a.f0
+} else {
+  null.asInstanceOf[T]
+}
+  }
+
+  override def merge(accumulators: JList[Accumulator]): Accumulator = {
+val ret = accumulators.get(0)
+var i: Int = 1
+while (i < accumulators.size()) {
+  val a = 

[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104301102
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends 
UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+* Retract the input values from the accumulator instance.
+*
+* @param accumulator the accumulator which contains the current
+*aggregated results
+* @param input   the input value (usually obtained from a new arrived data)
--- End diff --

I was referring to `input` not `accumulator` ;-)


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104301092
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Max with retraction aggregate function */
+class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator
+
+/**
+  * Base class for built-in Max with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
+
+  override def createAccumulator(): Accumulator = {
+val acc = new MaxWithRetractAccumulator[T]
+acc.f0 = getInitValue //max
+acc.f1 = 0L //total count
+acc.f2 = new JHashMap[T, Long]() //store the count for each value
--- End diff --

Yes, the idea is borrowed from bucket sort. I think this is the right data 
structure for max, min, and even median.


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104301022
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
 ---
@@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends 
AggregateFunction[T] {
 }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
--- End diff --

I assume you are suggesting that we implement LongAvgAggFunction directly with 
JTuple2[BigInteger, Long] as the accumulator. If so, yes, we could do that.


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104300989
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends 
UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+* Retract the input values from the accumulator instance.
+*
+* @param accumulator the accumulator which contains the current
+*aggregated results
+* @param input   the input value (usually obtained from a new arrived data)
--- End diff --

I think it should be OK to use "accumulator" here. Retract is a method that 
retracts the value from the accumulator.


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298400
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -165,9 +190,9 @@ class DecimalSumAggFunction extends 
AggregateFunction[BigDecimal] {
 var i: Int = 1
 while (i < accumulators.size()) {
   val a = accumulators.get(i).asInstanceOf[DecimalSumAccumulator]
-  if (a.f1) {
+  if (a.f1 > 0) {
--- End diff --

remove check?


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -148,12 +161,24 @@ class DecimalSumAggFunction extends 
AggregateFunction[BigDecimal] {
   val v = value.asInstanceOf[BigDecimal]
   val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
   accum.f0 = accum.f0.add(v)
-  accum.f1 = true
+  accum.f1 += 1L
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[BigDecimal]
+  val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
+  accum.f0 = accum.f0.add(v)
+  accum.f1 -= 1L
+  if (accum.f1 < 0) {
--- End diff --

remove?


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298146
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -23,10 +23,11 @@ import java.util.{List => JList}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
 
 /** The initial accumulator for Sum aggregate function */
-class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
+class SumAccumulator[T] extends JTuple2[T, Long] with Accumulator
--- End diff --

I think we should also offer `SumAccumulator` with and without retraction. 
The difference between `Long` and `Boolean` is 7 bytes, which is a lot of 
additional overhead for most types.


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends 
AggregateFunction[T] {
   val v = value.asInstanceOf[T]
   val a = accumulator.asInstanceOf[SumAccumulator[T]]
   a.f0 = numeric.plus(v, a.f0)
-  a.f1 = true
+  a.f1 += 1
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[SumAccumulator[T]]
+  a.f0 = numeric.plus(v, a.f0)
--- End diff --

Please check the tests. This should have been caught.


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298179
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends 
AggregateFunction[T] {
   val v = value.asInstanceOf[T]
   val a = accumulator.asInstanceOf[SumAccumulator[T]]
   a.f0 = numeric.plus(v, a.f0)
-  a.f1 = true
+  a.f1 += 1
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[SumAccumulator[T]]
+  a.f0 = numeric.plus(v, a.f0)
--- End diff --

minus
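
A minimal sketch of the fix requested here, assuming a simplified stand-in for the sum aggregate. `SketchSumAcc` and `SketchSum` are hypothetical names for illustration, not the Flink classes, and `getValue` returns an `Option` instead of `null`. The point is only that `retract` must undo `accumulate`, so it uses `numeric.minus` where `accumulate` uses `numeric.plus`:

```scala
// Hypothetical stand-in for SumAccumulator[T]: running sum plus count of non-null inputs.
final class SketchSumAcc[T](var sum: T, var count: Long)

// Hypothetical stand-in for SumAggFunction[T]; not the Flink class.
class SketchSum[T](implicit numeric: Numeric[T]) {

  def createAccumulator(): SketchSumAcc[T] = new SketchSumAcc[T](numeric.zero, 0L)

  // Adds a non-null value to the running sum.
  def accumulate(acc: SketchSumAcc[T], value: Any): Unit =
    if (value != null) {
      acc.sum = numeric.plus(acc.sum, value.asInstanceOf[T])
      acc.count += 1L
    }

  // Mirror image of accumulate: subtracts the value and decrements the count.
  def retract(acc: SketchSumAcc[T], value: Any): Unit =
    if (value != null) {
      acc.sum = numeric.minus(acc.sum, value.asInstanceOf[T])
      acc.count -= 1L
    }

  // None while no non-null value is held by the accumulator.
  def getValue(acc: SketchSumAcc[T]): Option[T] =
    if (acc.count > 0L) Some(acc.sum) else None
}
```

With `plus` in `retract`, accumulating 5 and 7 and then retracting 5 would leave a sum of 17 instead of 7.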


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104297801
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Max with retraction aggregate function */
+class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator
+
+/**
+  * Base class for built-in Max with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
+
+  override def createAccumulator(): Accumulator = {
+val acc = new MaxWithRetractAccumulator[T]
+acc.f0 = getInitValue //max
+acc.f1 = 0L //total count
+acc.f2 = new JHashMap[T, Long]() //store the count for each value
+acc
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+
+  if (a.f1 == 0 || (ord.compare(a.f0, v) < 0)) {
+a.f0 = v
+  }
+
+  a.f1 += 1L
+
+  if (!a.f2.containsKey(v)) {
+a.f2.put(v, 1L)
+  } else {
+var count = a.f2.get(v)
+count += 1L
+a.f2.put(v, count)
+  }
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+
+  a.f1 -= 1L
+
+  if (!a.f2.containsKey(v)) {
+throw TableException("unexpected retract message")
+  } else {
+var count = a.f2.get(v)
+count -= 1L
+if (count == 0) {
+  //remove the key v from the map if the number of appearance of the value v is 0
+  a.f2.remove(v)
+  //if the total count is 0, we could just simply set the f0(max) to the initial value
+  if (a.f1 == 0) {
+a.f0 = getInitValue
+return
+  }
+  //if v is the current max value, we have to iterate the map to find the 2nd biggest
+  // value to replace v as the max value
+  if (v == a.f0) {
+val iterator = a.f2.keySet().iterator()
+var key = iterator.next()
+a.f0 = key
+while (iterator.hasNext()) {
+  key = iterator.next()
+  if (ord.compare(a.f0, key) < 0) {
+a.f0 = key
+  }
+}
+  }
+} else {
+  a.f2.put(v, count)
+}
+  }
+}
+  }
+
+  override def getValue(accumulator: Accumulator): T = {
+val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+if (a.f1 != 0) {
+  a.f0
+} else {
+  null.asInstanceOf[T]
+}
+  }
+
+  override def merge(accumulators: JList[Accumulator]): Accumulator = {
+val ret = accumulators.get(0)
+var i: Int = 1
+while (i < accumulators.size()) {
+  val a = 

[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104296481
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends 
UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+* Retract the input values from the accumulator instance.
+*
+* @param accumulator the accumulator which contains the current
+*aggregated results
+* @param input   the input value (usually obtained from a new arrived data)
--- End diff --

rename parameter to `retraction`?


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298418
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -556,7 +566,9 @@ object AggregateUtil {
   private def transformToAggregateFunctions(
   aggregateCalls: Seq[AggregateCall],
   inputType: RelDataType,
-  groupKeysCount: Int): (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = {
+  groupKeysCount: Int,
+  ifNeedRetraction: Boolean)
--- End diff --

rename to `needsRetraction`?


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298377
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -148,12 +161,24 @@ class DecimalSumAggFunction extends 
AggregateFunction[BigDecimal] {
   val v = value.asInstanceOf[BigDecimal]
   val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
   accum.f0 = accum.f0.add(v)
-  accum.f1 = true
+  accum.f1 += 1L
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[BigDecimal]
+  val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
+  accum.f0 = accum.f0.add(v)
--- End diff --

subtract


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104297207
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
 ---
@@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends 
AggregateFunction[T] {
 }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
--- End diff --

Can we implement `LongAvgAggFunction` without the indirection via 
`BigIntegralAvgAggFunction`? `BigIntegralAvgAggFunction` is not used anywhere 
else.


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Max with retraction aggregate function */
+class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator
+
+/**
+  * Base class for built-in Max with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
+
+  override def createAccumulator(): Accumulator = {
+val acc = new MaxWithRetractAccumulator[T]
+acc.f0 = getInitValue //max
+acc.f1 = 0L //total count
+acc.f2 = new JHashMap[T, Long]() //store the count for each value
--- End diff --

I spent some time thinking about whether a `HashMap` is the best data structure 
here, because searching for the second largest element is quite expensive. But 
since most values should be added (O(1)) or retracted without changing the 
max element (also O(1)), a `HashMap` should be a good choice. I noticed that 
the `MapSerializer` of the `MapTypeInfo` serializes an additional `Boolean` 
for value null checks. This is unnecessary overhead for us, but should be OK 
for now, IMO.
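
The cost trade-off described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the PR's code: `SketchMaxAcc` and `SketchMaxWithRetract` are hypothetical names, a Scala `mutable.HashMap` stands in for the `JHashMap`, and the cached max is an `Option` rather than an initial value plus a total count. It also assumes that only previously accumulated values are retracted.

```scala
import scala.collection.mutable

// Hypothetical accumulator: cached max plus a count per distinct value.
final class SketchMaxAcc[T] {
  var max: Option[T] = None
  val counts: mutable.HashMap[T, Long] = mutable.HashMap.empty[T, Long]
}

class SketchMaxWithRetract[T](implicit ord: Ordering[T]) {

  // O(1): bump the count for v and compare v with the cached max.
  def accumulate(acc: SketchMaxAcc[T], v: T): Unit = {
    acc.counts.update(v, acc.counts.getOrElse(v, 0L) + 1L)
    if (acc.max.forall(m => ord.lt(m, v))) acc.max = Some(v)
  }

  // O(1) unless the last copy of the current max is removed. Only in that
  // case are the remaining keys scanned, which costs O(distinct values).
  def retract(acc: SketchMaxAcc[T], v: T): Unit = {
    val remaining = acc.counts.getOrElse(v, 0L) - 1L
    if (remaining > 0L) {
      acc.counts.update(v, remaining)
    } else {
      acc.counts.remove(v)
      if (acc.max.exists(m => ord.equiv(m, v))) {
        acc.max = if (acc.counts.isEmpty) None else Some(acc.counts.keysIterator.max)
      }
    }
  }
}
```

A sorted structure would make the "find the next max" step cheaper but would pay a logarithmic cost on every `accumulate` and `retract`, which is the trade-off the comment weighs.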


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298360
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -67,9 +80,9 @@ abstract class SumAggFunction[T: Numeric] extends 
AggregateFunction[T] {
 var i: Int = 0
 while (i < accumulators.size()) {
   val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]]
-  if (a.f1) {
+  if (a.f1 > 0) {
--- End diff --

Do we need this check? If `a.f1` is `0`, `a.f0` should be `0` as well (if 
we only retract what was added before) and we can simply add sum and count.
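
A small sketch of the unguarded merge suggested here, assuming a hypothetical `(sum, count)` pair named `SumCount` with `Long` fields (not the Flink accumulator type). An accumulator with count `0` holds sum `0` as long as only previously added values are retracted, so adding it is a no-op and the `a.f1 > 0` check is not needed:

```scala
// Hypothetical (sum, count) accumulator used only for this illustration.
final case class SumCount(sum: Long, count: Long)

object SketchSumMerge {
  val empty: SumCount = SumCount(0L, 0L)

  // No `count > 0` guard: sums and counts are simply added pairwise.
  def merge(accs: Seq[SumCount]): SumCount =
    accs.foldLeft(empty)((l, r) => SumCount(l.sum + r.sum, l.count + r.count))
}
```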


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298555
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
 ---
@@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {
 
   def aggregator: AggregateFunction[T]
 
+  def ifSupportRetraction: Boolean = true
+
   @Test
-  // test aggregate functions without partial merge
-  def testAggregateWithoutMerge(): Unit = {
+  // test aggregate and retract functions without partial merge
+  def testAccumulateAndRetractWithoutMerge(): Unit = {
 // iterate over input sets
 for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-  val accumulator = aggregateVals(vals)
-  val result = aggregator.getValue(accumulator)
+  val accumulator = accumulateVals(vals)
+  var result = aggregator.getValue(accumulator)
   validateResult(expected, result)
+
+  if (ifSupportRetraction) {
+retractVals(accumulator, vals)
--- End diff --

If we want `accumulate` and `merge` to remain usable after `merge` was 
applied, we should add tests for that as well.


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104297285
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
 ---
@@ -290,6 +317,19 @@ class DecimalAvgAggFunction extends 
AggregateFunction[BigDecimal] {
 }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[BigDecimal]
+  val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
+  if (accum.f1 == 0) {
+accum.f0 = v
--- End diff --

should be `accum.f0 = v.negate()`.

I think we can even remove the `if (accum.f1 == 0)` condition and always 
subtract since `accum.f0` is initialized with `ZERO`. Same would apply for 
`accumulate`.
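
A sketch of the branch-free variant proposed above, assuming a simplified accumulator (`SketchDecimalAvgAcc` and `SketchDecimalAvg` are hypothetical names, not the Flink classes). Because the sum starts at `BigDecimal.ZERO`, both methods can always add or subtract without checking the count first:

```scala
import java.math.BigDecimal

// Hypothetical stand-in for DecimalAvgAccumulator: sum starts at ZERO.
final class SketchDecimalAvgAcc {
  var sum: BigDecimal = BigDecimal.ZERO
  var count: Long = 0L
}

object SketchDecimalAvg {

  // Always add; no special case for the first value.
  def accumulate(acc: SketchDecimalAvgAcc, value: Any): Unit =
    if (value != null) {
      acc.sum = acc.sum.add(value.asInstanceOf[BigDecimal])
      acc.count += 1L
    }

  // Always subtract; no special case for count == 0.
  def retract(acc: SketchDecimalAvgAcc, value: Any): Unit =
    if (value != null) {
      acc.sum = acc.sum.subtract(value.asInstanceOf[BigDecimal])
      acc.count -= 1L
    }
}
```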


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298605
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ---
@@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends 
UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+* Retract the input values from the accumulator instance.
--- End diff --

Do we want to mention that only previously `accumulated` values need to be 
`retracted`?
Users should not check for that but can use this assumption to implement 
more efficient functions.
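
To illustrate the kind of contract this refers to, here is a hedged sketch with hypothetical names (`SketchAggregate`, `SketchCountAcc`, `SketchCount`); it is not the Flink `AggregateFunction` API. The documented assumption lets an implementation skip validation entirely:

```scala
// Hypothetical interface for illustration only.
trait SketchAggregate[ACC, T] {
  def createAccumulator(): ACC
  def accumulate(acc: ACC, value: Any): Unit

  /** Retracts a value from the accumulator.
    *
    * Contract (assumed for this sketch): the caller only retracts values that
    * were accumulated before, so implementations do not need to verify this.
    */
  def retract(acc: ACC, value: Any): Unit
  def getValue(acc: ACC): T
}

final class SketchCountAcc { var count: Long = 0L }

// Relies on the contract: retract just decrements, with no underflow check.
object SketchCount extends SketchAggregate[SketchCountAcc, Long] {
  def createAccumulator(): SketchCountAcc = new SketchCountAcc
  def accumulate(acc: SketchCountAcc, value: Any): Unit =
    if (value != null) acc.count += 1L
  def retract(acc: SketchCountAcc, value: Any): Unit =
    if (value != null) acc.count -= 1L
  def getValue(acc: SketchCountAcc): Long = acc.count
}
```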


[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298517
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
 ---
@@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {
 
   def aggregator: AggregateFunction[T]
 
+  def ifSupportRetraction: Boolean = true
+
   @Test
-  // test aggregate functions without partial merge
-  def testAggregateWithoutMerge(): Unit = {
+  // test aggregate and retract functions without partial merge
+  def testAccumulateAndRetractWithoutMerge(): Unit = {
 // iterate over input sets
 for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-  val accumulator = aggregateVals(vals)
-  val result = aggregator.getValue(accumulator)
+  val accumulator = accumulateVals(vals)
+  var result = aggregator.getValue(accumulator)
   validateResult(expected, result)
+
+  if (ifSupportRetraction) {
+retractVals(accumulator, vals)
--- End diff --

We also need to check the retraction of a single value, not just of all values 
(the `SumAggregator` retraction of all values looked correct only because the 
count was `0`). For that, we can accumulate all values, retract one value, and 
compare the result against the result of accumulating all values except the 
retracted one.

Also check that retracting `null` does not have an effect.
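
The single-value check described above could look roughly like the helper below. This is a sketch under assumptions: `SketchRetractCheck` and `retractOneMatches` are hypothetical names, and the aggregate is passed in as plain functions rather than through `AggFunctionTestBase`.

```scala
object SketchRetractCheck {

  // Returns true if accumulating all of `vals` and then retracting vals(idx)
  // yields the same result as accumulating every value except vals(idx).
  def retractOneMatches[A, R](
      vals: Seq[Any],
      idx: Int,
      create: () => A,
      accumulate: (A, Any) => Unit,
      retract: (A, Any) => Unit,
      getValue: A => R): Boolean = {

    // Accumulate everything, then retract the chosen value.
    val full = create()
    vals.foreach(v => accumulate(full, v))
    retract(full, vals(idx))

    // Reference result: accumulate everything except the chosen value.
    val expected = create()
    vals.patch(idx, Nil, 1).foreach(v => accumulate(expected, v))

    getValue(full) == getValue(expected)
  }
}
```

Choosing an index that holds `null` also covers the second point: retracting `null` must leave the result unchanged. A retract-all test cannot catch a `plus`/`minus` mix-up when the result is derived from a count of `0`, whereas this check compares actual sums.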



[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3470#discussion_r104298261
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends 
AggregateFunction[T] {
   val v = value.asInstanceOf[T]
   val a = accumulator.asInstanceOf[SumAccumulator[T]]
   a.f0 = numeric.plus(v, a.f0)
-  a.f1 = true
+  a.f1 += 1
+}
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[T]
+  val a = accumulator.asInstanceOf[SumAccumulator[T]]
+  a.f0 = numeric.plus(v, a.f0)
+  a.f1 -= 1
+  if (a.f1 < 0) {
--- End diff --

Do we want to check for these errors? The check adds overhead, and calling the 
functions correctly is the responsibility of the runtime. I'd rather add 
tests with custom aggregation functions to verify the behavior than add 
overhead to the most widely used aggregation functions. We didn't add 
these checks to `CountAggregate` and `AverageAggregate` either.
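
One way this could look, as a rough sketch only: a test-only aggregate that carries the sanity check, so the built-in functions stay free of it. `CheckedCountAcc` and `CheckedCountAgg` are hypothetical names and do not exist in Flink. A runtime test would plug such a function in and fail if the runtime ever retracts more values than it accumulated.

```scala
// Hypothetical test-only accumulator holding the number of non-null values.
final class CheckedCountAcc(var count: Long = 0L)

// Hypothetical test-only aggregate: the invariant check lives here,
// not in the widely used built-in aggregation functions.
object CheckedCountAgg {
  def createAccumulator(): CheckedCountAcc = new CheckedCountAcc()

  def accumulate(acc: CheckedCountAcc, value: Any): Unit =
    if (value != null) acc.count += 1

  def retract(acc: CheckedCountAcc, value: Any): Unit =
    if (value != null) {
      acc.count -= 1
      // Fail loudly if the caller retracts more often than it accumulated.
      if (acc.count < 0) {
        throw new IllegalStateException("retract called more often than accumulate")
      }
    }

  def getValue(acc: CheckedCountAcc): Long = acc.count
}
```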




[GitHub] flink pull request #3470: [FLINK-5956] [table] Add retract method for aggreg...

2017-03-03 Thread shaoxuan-wang
GitHub user shaoxuan-wang opened a pull request:

https://github.com/apache/flink/pull/3470

[FLINK-5956] [table] Add retract method for aggregateFunction

This PR adds a retract method to AggregateFunction. It also implements 
retract methods and test cases for all built-in aggregates.

The retract method helps with processing update messages. It will also be very 
helpful for grouping-window and over-window aggregations, where we may want 
to retract out-of-window data from the accumulator.
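
To illustrate the window use case, here is a minimal, self-contained sketch. It does not use the Flink API; `SlidingSumSketch` and `slidingSums` are hypothetical names. It keeps a running sum over the last `size` rows: each new row is accumulated, and the row that falls out of the window is retracted instead of recomputing the sum from scratch.

```scala
import scala.collection.mutable

object SlidingSumSketch {
  // Emits, for every input row, the sum over the most recent `size` rows.
  def slidingSums(input: Seq[Long], size: Int): Seq[Long] = {
    require(size > 0, "window size must be positive")
    val window = mutable.Queue.empty[Long] // rows currently inside the window
    var sum = 0L                           // the accumulator
    val out = Seq.newBuilder[Long]
    input.foreach { v =>
      // accumulate the incoming row
      window.enqueue(v)
      sum += v
      // retract the row that just left the window, if any
      if (window.size > size) {
        sum -= window.dequeue()
      }
      out += sum
    }
    out.result()
  }
}
```

Each row costs one accumulate and at most one retract, regardless of the window size.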


Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shaoxuan-wang/flink F5956-submit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3470.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3470


commit 09a863bd70f4e8b693276e47942c1dff2fd5375e
Author: shaoxuan-wang 
Date:   2017-03-03T17:37:50Z

[FLINK-5956] [table] Add retract method for aggregateFunction



