Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18113#discussion_r150391130 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala --- @@ -76,26 +76,126 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] { // Java api support def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any]) + def toColumnJava: TypedColumn[IN, java.lang.Long] = { toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]] } } +class TypedAverage[IN](val f: IN => Double) + extends Aggregator[IN, (Double, Long), Double] { -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] { override def zero: (Double, Long) = (0.0, 0L) override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2) - override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2 - override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = { + override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = (b1._1 + b2._1, b1._2 + b2._2) - } + override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2 override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]() override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]() // Java api support def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double]) + + def toColumnJava: TypedColumn[IN, java.lang.Double] = { + toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]] + } +} + +class TypedMinDouble[IN](val f: IN => Double) + extends Aggregator[IN, Double, Double] { + + override def zero: Double = Double.PositiveInfinity + override def reduce(b: Double, a: IN): Double = math.min(b, f(a)) + override def merge(b1: Double, b2: Double): Double = math.min(b1, b2) + override def finish(reduction: Double): Double = { + if (Double.PositiveInfinity == reduction) { + Double.NegativeInfinity + } else { + reduction + } + } + + override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]() + override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]() + + // Java api support + def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double]) + def toColumnJava: TypedColumn[IN, java.lang.Double] = { toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]] } } + +class TypedMaxDouble[IN](val f: IN => Double) + extends Aggregator[IN, Double, Double] { --- End diff -- one line please
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org