karuppayya commented on code in PR #52551:
URL: https://github.com/apache/spark/pull/52551#discussion_r2430239854
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ThetaSketchUtils.scala:
##########
@@ -53,6 +61,26 @@ object ThetaSketchUtils {
}
}
+ /**
+ * Converts a family string to DataSketches Family enum.
+ * Throws a Spark SQL exception if the family name is invalid.
+ *
+ * @param familyName The family name string
+ * @param prettyName The display name of the function/expression for error
messages
+ * @return The corresponding DataSketches Family enum value
+ */
+ def parseFamily(familyName: String, prettyName: String): Family = {
Review Comment:
done
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala:
##########
@@ -59,10 +59,12 @@ case class FinalizedSketch(sketch: CompactSketch) extends
ThetaSketchState {
*
* See [[https://datasketches.apache.org/docs/Theta/ThetaSketches.html]] for
more information.
*
- * @param left
+ * @param child
* child expression against which unique counting will occur
- * @param right
+ * @param lgNomEntriesExpr
Review Comment:
done
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ThetaSketchUtils.scala:
##########
@@ -36,6 +39,11 @@ object ThetaSketchUtils {
final val MAX_LG_NOM_LONGS = 26
final val DEFAULT_LG_NOM_LONGS = 12
+ // Family constants for ThetaSketch
Review Comment:
done
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala:
##########
@@ -122,15 +142,16 @@ case class ThetaSketchAgg(
copy(inputAggBufferOffset = newInputAggBufferOffset)
override protected def withNewChildrenInternal(
Review Comment:
done
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala:
##########
@@ -71,46 +73,64 @@ case class FinalizedSketch(sketch: CompactSketch) extends
ThetaSketchState {
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """
- _FUNC_(expr, lgNomEntries) - Returns the ThetaSketch compact binary
representation.
+ _FUNC_(expr, lgNomEntries, family) - Returns the ThetaSketch compact
binary representation.
`lgNomEntries` (optional) is the log-base-2 of nominal entries, with
nominal entries deciding
- the number buckets or slots for the ThetaSketch. """,
+ the number buckets or slots for the ThetaSketch.
+ `family` (optional) is the sketch family, either 'QUICKSELECT' or
'ALPHA' (defaults to
+ 'QUICKSELECT').""",
examples = """
Examples:
+ > SELECT theta_sketch_estimate(_FUNC_(col)) FROM VALUES (1), (1), (2),
(2), (3) tab(col);
+ 3
> SELECT theta_sketch_estimate(_FUNC_(col, 12)) FROM VALUES (1), (1),
(2), (2), (3) tab(col);
3
+ > SELECT theta_sketch_estimate(_FUNC_(col, 15, 'ALPHA')) FROM VALUES
(1), (1), (2), (2), (3) tab(col);
+ 3
""",
group = "agg_funcs",
since = "4.1.0")
// scalastyle:on line.size.limit
case class ThetaSketchAgg(
- left: Expression,
- right: Expression,
+ child: Expression,
+ lgNomEntriesExpr: Expression,
+ familyExpr: Expression,
override val mutableAggBufferOffset: Int,
override val inputAggBufferOffset: Int)
extends TypedImperativeAggregate[ThetaSketchState]
- with BinaryLike[Expression]
Review Comment:
done
##########
sql/api/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -1242,6 +1253,47 @@ object functions {
def theta_sketch_agg(columnName: String): Column =
theta_sketch_agg(Column(columnName))
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches ThetaSketch
+ * built with the values in the input column, configured with `lgNomEntries`
and `family`.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def theta_sketch_agg(e: Column, lgNomEntries: Int, family: String): Column =
+ Column.fn("theta_sketch_agg", e, lit(lgNomEntries), lit(family))
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches ThetaSketch
+ * built with the values in the input column, configured with `lgNomEntries`
and `family`.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def theta_sketch_agg(columnName: String, lgNomEntries: Int, family: String):
Column =
+ theta_sketch_agg(Column(columnName), lgNomEntries, family)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches ThetaSketch
+ * built with the values in the input column, configured with the specified
`family` and default
+ * lgNomEntries.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def theta_sketch_agg(e: Column, family: String): Column =
+ theta_sketch_agg(e, 12, family)
Review Comment:
Yeah, I gave this some thought before adding it here. Ideally, the
decision to default `lgNomEntries` to 12 should live in `ThetaSketchAgg`,
i.e., if I pass just the column and family, `lgNomEntries` gets
defaulted inside `ThetaSketchAgg`.
But,
[FunctionRegistry](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L145)
selects the constructor based on the number of expressions. I cannot add a
second constructor with the same signature as
[this](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala#L108)
.
So I decided to apply the default in functions.scala. I also didn't see a similar
pattern in any other function, so I am not very sure about this either.
Referencing a local constant in this class also seemed a bit odd, since
that would be very specific.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala:
##########
@@ -71,46 +73,64 @@ case class FinalizedSketch(sketch: CompactSketch) extends
ThetaSketchState {
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """
- _FUNC_(expr, lgNomEntries) - Returns the ThetaSketch compact binary
representation.
+ _FUNC_(expr, lgNomEntries, family) - Returns the ThetaSketch compact
binary representation.
`lgNomEntries` (optional) is the log-base-2 of nominal entries, with
nominal entries deciding
- the number buckets or slots for the ThetaSketch. """,
+ the number buckets or slots for the ThetaSketch.
+ `family` (optional) is the sketch family, either 'QUICKSELECT' or
'ALPHA' (defaults to
+ 'QUICKSELECT').""",
examples = """
Examples:
+ > SELECT theta_sketch_estimate(_FUNC_(col)) FROM VALUES (1), (1), (2),
(2), (3) tab(col);
+ 3
> SELECT theta_sketch_estimate(_FUNC_(col, 12)) FROM VALUES (1), (1),
(2), (2), (3) tab(col);
3
+ > SELECT theta_sketch_estimate(_FUNC_(col, 15, 'ALPHA')) FROM VALUES
(1), (1), (2), (2), (3) tab(col);
+ 3
""",
group = "agg_funcs",
since = "4.1.0")
// scalastyle:on line.size.limit
case class ThetaSketchAgg(
- left: Expression,
- right: Expression,
+ child: Expression,
+ lgNomEntriesExpr: Expression,
+ familyExpr: Expression,
override val mutableAggBufferOffset: Int,
override val inputAggBufferOffset: Int)
extends TypedImperativeAggregate[ThetaSketchState]
- with BinaryLike[Expression]
with ExpectsInputTypes {
// ThetaSketch config - mark as lazy so that they're not evaluated during
tree transformation.
- lazy val lgNomEntries: Int = {
- val lgNomEntriesInput = right.eval().asInstanceOf[Int]
+ private lazy val lgNomEntries: Int = {
+ val lgNomEntriesInput = lgNomEntriesExpr.eval().asInstanceOf[Int]
ThetaSketchUtils.checkLgNomLongs(lgNomEntriesInput, prettyName)
lgNomEntriesInput
}
- // Constructors
+ private lazy val family: Family =
+
ThetaSketchUtils.parseFamily(familyExpr.eval().asInstanceOf[UTF8String].toString,
prettyName)
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]