Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21611#discussion_r197348976
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
@@ -148,6 +148,79 @@ object VeryComplexResultAgg extends Aggregator[Row,
String, ComplexAggData] {
}
+case class OptionBooleanData(name: String, isGood: Option[Boolean])
+case class OptionBooleanIntData(name: String, isGood: Option[(Boolean,
Int)])
+
+case class OptionBooleanAggregator(colName: String)
+ extends Aggregator[Row, Option[Boolean], Option[Boolean]] {
+
+ override def zero: Option[Boolean] = None
+
+ override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean]
= {
+ val index = row.fieldIndex(colName)
+ val value = if (row.isNullAt(index)) {
+ Option.empty[Boolean]
+ } else {
+ Some(row.getBoolean(index))
+ }
+ merge(buffer, value)
+ }
+
+ override def merge(b1: Option[Boolean], b2: Option[Boolean]):
Option[Boolean] = {
+ if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) {
+ Some(true)
+ } else if (b1.isDefined) {
+ b1
+ } else {
+ b2
+ }
+ }
+
+ override def finish(reduction: Option[Boolean]): Option[Boolean] =
reduction
+
+ override def bufferEncoder: Encoder[Option[Boolean]] =
OptionalBoolEncoder
+ override def outputEncoder: Encoder[Option[Boolean]] =
OptionalBoolEncoder
+
+ def OptionalBoolEncoder: Encoder[Option[Boolean]] = ExpressionEncoder()
+}
+
+case class OptionBooleanIntAggregator(colName: String)
+ extends Aggregator[Row, Option[(Boolean, Int)], Option[(Boolean,
Int)]] {
+
+ override def zero: Option[(Boolean, Int)] = None
+
+ override def reduce(buffer: Option[(Boolean, Int)], row: Row):
Option[(Boolean, Int)] = {
+ val index = row.fieldIndex(colName)
+ val value = if (row.isNullAt(index)) {
+ Option.empty[(Boolean, Int)]
+ } else {
+ val nestedRow = row.getStruct(index)
+ Some((nestedRow.getBoolean(0), nestedRow.getInt(1)))
+ }
+ merge(buffer, value)
+ }
+
+ override def merge(
+ b1: Option[(Boolean, Int)],
+ b2: Option[(Boolean, Int)]): Option[(Boolean, Int)] = {
+ if ((b1.isDefined && b1.get._1) || (b2.isDefined && b2.get._1)) {
+ val newInt = b1.map(_._2).getOrElse(0) + b2.map(_._2).getOrElse(0)
+ Some((true, newInt))
+ } else if (b1.isDefined) {
+ b1
+ } else {
+ b2
+ }
+ }
+
+ override def finish(reduction: Option[(Boolean, Int)]): Option[(Boolean,
Int)] = reduction
+
+ override def bufferEncoder: Encoder[Option[(Boolean, Int)]] =
OptionalBoolIntEncoder
+ override def outputEncoder: Encoder[Option[(Boolean, Int)]] =
OptionalBoolIntEncoder
+
+ def OptionalBoolIntEncoder: Encoder[Option[(Boolean, Int)]] =
ExpressionEncoder(topLevel = false)
--- End diff --
We can create a Dataset like this:
```scala
scala> Seq((1, Some(1, 2)), (2, Some(3, 4))).toDS.printSchema
root
|-- _1: integer (nullable = false)
|-- _2: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: integer (nullable = false)
```
However, we currently can't use such a type as the buffer/output encoder here,
even though this encoder is not a top-level one.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]