GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18113#discussion_r150388551
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
---
@@ -76,26 +77,130 @@ class TypedCount[IN](val f: IN => Any) extends
Aggregator[IN, Long, Long] {
// Java api support
def this(f: MapFunction[IN, Object]) = this((x: IN) =>
f.call(x).asInstanceOf[Any])
+
def toColumnJava: TypedColumn[IN, java.lang.Long] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
}
}
+class TypedAverage[IN](val f: IN => Double)
+ extends Aggregator[IN, (Double, Long), Double] {
-class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN,
(Double, Long), Double] {
override def zero: (Double, Long) = (0.0, 0L)
override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) +
b._1, 1 + b._2)
- override def finish(reduction: (Double, Long)): Double = reduction._1 /
reduction._2
- override def merge(b1: (Double, Long), b2: (Double, Long)): (Double,
Long) = {
+ override def merge(b1: (Double, Long), b2: (Double, Long)): (Double,
Long) =
(b1._1 + b2._1, b1._2 + b2._2)
- }
+ override def finish(reduction: (Double, Long)): Double = reduction._1 /
reduction._2
override def bufferEncoder: Encoder[(Double, Long)] =
ExpressionEncoder[(Double, Long)]()
override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
// Java api support
def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) =>
f.call(x).asInstanceOf[Double])
+
def toColumnJava: TypedColumn[IN, java.lang.Double] = {
toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
}
}
+
+class TypedMinDouble[IN](val f: IN => Double)
+ extends Aggregator[IN, Double, Double] {
+
+ override def zero: Double = Double.MaxValue
+ override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
+ override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
+ override def finish(reduction: Double): Double = {
+ if (Double.MaxValue == reduction) {
+ Double.NegativeInfinity
+ }
+ else {
+ reduction
+ }
+ }
+
+ override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+ override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+
+ // Java api support
+ def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) =>
f.call(x).asInstanceOf[Double])
+
+ def toColumnJava: TypedColumn[IN, java.lang.Double] = {
+ toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
+ }
+}
+
+class TypedMaxDouble[IN](val f: IN => Double)
+ extends Aggregator[IN, Double, Double] {
+
+ override def zero: Double = Double.MinValue
--- End diff --
Can we use `Double.PositiveInfinity` as the initial value?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]