Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21611#discussion_r198664388
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
@@ -148,6 +148,79 @@ object VeryComplexResultAgg extends Aggregator[Row, String, ComplexAggData] {
}
+case class OptionBooleanData(name: String, isGood: Option[Boolean])
+case class OptionBooleanIntData(name: String, isGood: Option[(Boolean, Int)])
+
+case class OptionBooleanAggregator(colName: String)
+ extends Aggregator[Row, Option[Boolean], Option[Boolean]] {
+
+ override def zero: Option[Boolean] = None
+
+ override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = {
+ val index = row.fieldIndex(colName)
+ val value = if (row.isNullAt(index)) {
+ Option.empty[Boolean]
+ } else {
+ Some(row.getBoolean(index))
+ }
+ merge(buffer, value)
+ }
+
+ override def merge(b1: Option[Boolean], b2: Option[Boolean]): Option[Boolean] = {
+ if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) {
+ Some(true)
+ } else if (b1.isDefined) {
+ b1
+ } else {
+ b2
+ }
+ }
+
+ override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction
+
+ override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
+ override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
+
+ def OptionalBoolEncoder: Encoder[Option[Boolean]] = ExpressionEncoder()
+}
+
+case class OptionBooleanIntAggregator(colName: String)
+ extends Aggregator[Row, Option[(Boolean, Int)], Option[(Boolean, Int)]] {
+
+ override def zero: Option[(Boolean, Int)] = None
+
+ override def reduce(buffer: Option[(Boolean, Int)], row: Row): Option[(Boolean, Int)] = {
+ val index = row.fieldIndex(colName)
+ val value = if (row.isNullAt(index)) {
+ Option.empty[(Boolean, Int)]
+ } else {
+ val nestedRow = row.getStruct(index)
+ Some((nestedRow.getBoolean(0), nestedRow.getInt(1)))
+ }
+ merge(buffer, value)
+ }
+
+ override def merge(
+ b1: Option[(Boolean, Int)],
+ b2: Option[(Boolean, Int)]): Option[(Boolean, Int)] = {
+ if ((b1.isDefined && b1.get._1) || (b2.isDefined && b2.get._1)) {
+ val newInt = b1.map(_._2).getOrElse(0) + b2.map(_._2).getOrElse(0)
+ Some((true, newInt))
+ } else if (b1.isDefined) {
+ b1
+ } else {
+ b2
+ }
+ }
+
+ override def finish(reduction: Option[(Boolean, Int)]): Option[(Boolean, Int)] = reduction
+
+ override def bufferEncoder: Encoder[Option[(Boolean, Int)]] = OptionalBoolIntEncoder
+ override def outputEncoder: Encoder[Option[(Boolean, Int)]] = OptionalBoolIntEncoder
+
+ def OptionalBoolIntEncoder: Encoder[Option[(Boolean, Int)]] = ExpressionEncoder(topLevel = false)
--- End diff --
If we want to create the encoders in `Encoders`, maybe we should do it in
another PR?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]