[GitHub] sunjincheng121 commented on a change in pull request #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView

2018-12-06 Thread GitBox
sunjincheng121 commented on a change in pull request #7201: [FLINK-7208] 
[table] Optimize Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#discussion_r239663677
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1437,36 +1437,30 @@ object AggregateUtil {
 
   case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
 aggregates(index) = new 
CollectAggFunction(FlinkTypeFactory.toTypeInfo(relDataType))
-accTypes(index) = aggregates(index).getAccumulatorType
 
   case udagg: AggSqlFunction =>
 aggregates(index) = udagg.getFunction
-accTypes(index) = udagg.accType
 
   case unSupported: SqlAggFunction =>
 throw new TableException(s"Unsupported Function: 
'${unSupported.getName}'")
 }
   }
+  accTypes(index) = 
getAccumulatorTypeOfAggregateFunction(aggregates(index))
 }
 
 val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
 
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
 
 Review comment:
   +1 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView

2018-12-03 Thread GitBox
sunjincheng121 commented on a change in pull request #7201: [FLINK-7208] 
[table] Optimize Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#discussion_r238504541
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
 ##
 @@ -18,19 +18,20 @@
 package org.apache.flink.table.functions.aggfunctions
 
 import java.math.BigDecimal
-import java.util.{HashMap => JHashMap}
-import java.lang.{Iterable => JIterable}
+import java.lang.{Iterable => JIterable, Long => JLong}
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
-import org.apache.flink.table.api.Types
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, 
Types}
+import org.apache.flink.table.api.dataview.MapView
 import org.apache.flink.table.functions.aggfunctions.Ordering._
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Min with retraction aggregate function */
-class MinWithRetractAccumulator[T] extends JTuple2[T, JHashMap[T, Long]]
+class MinWithRetractAccumulator[T] {
 
 Review comment:
   I think we should override `getAccumulatorType` for both 
`MinWithRetractAggFunction` and `MaxWithRetractAggFunction`; otherwise the 
accumulator data will always be kept in memory.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services