Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/15976#discussion_r89178617
--- Diff:
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ObjectHashAggregateSuite.scala
---
@@ -325,70 +320,67 @@ class ObjectHashAggregateSuite
// Currently Spark SQL doesn't support evaluating distinct
aggregate function together
// with aggregate functions without partial aggregation
support.
- if (!(aggs.contains(withoutPartial) &&
aggs.contains(withDistinct))) {
- // TODO Re-enables them after fixing SPARK-18403
- ignore(
- s"randomized aggregation test - " +
- s"${names.mkString("[", ", ", "]")} - " +
- s"${if (withGroupingKeys) "with" else "without"}
grouping keys - " +
- s"with ${if (emptyInput) "empty" else "non-empty"} input"
- ) {
- var expected: Seq[Row] = null
- var actual1: Seq[Row] = null
- var actual2: Seq[Row] = null
-
- // Disables `ObjectHashAggregateExec` to obtain a standard
answer
- withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
- val aggDf = doAggregation(df)
-
- if (aggs.intersect(Seq(withoutPartial, withPartialSafe,
typed)).nonEmpty) {
- assert(containsSortAggregateExec(aggDf))
- assert(!containsObjectHashAggregateExec(aggDf))
- assert(!containsHashAggregateExec(aggDf))
- } else {
- assert(!containsSortAggregateExec(aggDf))
- assert(!containsObjectHashAggregateExec(aggDf))
- assert(containsHashAggregateExec(aggDf))
- }
-
- expected = aggDf.collect().toSeq
+ test(
+ s"randomized aggregation test - " +
+ s"${names.mkString("[", ", ", "]")} - " +
+ s"${if (withGroupingKeys) "with" else "without"} grouping
keys - " +
+ s"with ${if (emptyInput) "empty" else "non-empty"} input"
+ ) {
+ var expected: Seq[Row] = null
+ var actual1: Seq[Row] = null
+ var actual2: Seq[Row] = null
+
+ // Disables `ObjectHashAggregateExec` to obtain a standard
answer
+ withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+ val aggDf = doAggregation(df)
+
+ if (aggs.intersect(Seq(withPartialSafe, typed)).nonEmpty) {
+ assert(containsSortAggregateExec(aggDf))
+ assert(!containsObjectHashAggregateExec(aggDf))
+ assert(!containsHashAggregateExec(aggDf))
+ } else {
+ assert(!containsSortAggregateExec(aggDf))
+ assert(!containsObjectHashAggregateExec(aggDf))
+ assert(containsHashAggregateExec(aggDf))
}
- // Enables `ObjectHashAggregateExec`
- withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
- val aggDf = doAggregation(df)
-
- if (aggs.contains(typed) &&
!aggs.contains(withoutPartial)) {
- assert(!containsSortAggregateExec(aggDf))
- assert(containsObjectHashAggregateExec(aggDf))
- assert(!containsHashAggregateExec(aggDf))
- } else if (aggs.intersect(Seq(withoutPartial,
withPartialSafe)).nonEmpty) {
- assert(containsSortAggregateExec(aggDf))
- assert(!containsObjectHashAggregateExec(aggDf))
- assert(!containsHashAggregateExec(aggDf))
- } else {
- assert(!containsSortAggregateExec(aggDf))
- assert(!containsObjectHashAggregateExec(aggDf))
- assert(containsHashAggregateExec(aggDf))
- }
-
- // Disables sort-based aggregation fallback (we only
generate 50 rows, so 100 is
- // big enough) to obtain a result to be checked.
-
withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "100") {
- actual1 = aggDf.collect().toSeq
- }
-
- // Enables sort-based aggregation fallback to obtain
another result to be checked.
-
withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "3") {
- // Here we are not reusing `aggDf` because the
physical plan in `aggDf` is
- // cached and won't be re-planned using the new
fallback threshold.
- actual2 = doAggregation(df).collect().toSeq
- }
+ expected = aggDf.collect().toSeq
+ }
+
+ // Enables `ObjectHashAggregateExec`
+ withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+ val aggDf = doAggregation(df)
+
+ if (aggs.contains(typed)) {
+ assert(!containsSortAggregateExec(aggDf))
+ assert(containsObjectHashAggregateExec(aggDf))
+ assert(!containsHashAggregateExec(aggDf))
+ } else if (aggs.contains(withPartialSafe)) {
+ assert(containsSortAggregateExec(aggDf))
+ assert(!containsObjectHashAggregateExec(aggDf))
+ assert(!containsHashAggregateExec(aggDf))
+ } else {
+ assert(!containsSortAggregateExec(aggDf))
+ assert(!containsObjectHashAggregateExec(aggDf))
+ assert(containsHashAggregateExec(aggDf))
}
- doubleSafeCheckRows(actual1, expected, 1e-4)
- doubleSafeCheckRows(actual2, expected, 1e-4)
+ // Disables sort-based aggregation fallback (we only
generate 50 rows, so 100 is
+ // big enough) to obtain a result to be checked.
+
withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "100") {
+ actual1 = aggDf.collect().toSeq
+ }
+
+ // Enables sort-based aggregation fallback to obtain
another result to be checked.
+
withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "3") {
+ // Here we are not reusing `aggDf` because the physical
plan in `aggDf` is
+ // cached and won't be re-planned using the new fallback
threshold.
+ actual2 = doAggregation(df).collect().toSeq
+ }
}
+
+ doubleSafeCheckRows(actual1, expected, 1e-4)
+ doubleSafeCheckRows(actual2, expected, 1e-4)
--- End diff --
All of the changes above in this file resolve a logical conflict with
PR #15703. After merging #15703 we no longer have any aggregate functions
that lack partial aggregation support, so the tests must be updated to
reflect that.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at users@infra.apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org