[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466767625



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##
@@ -353,4 +353,8 @@ object AggUtils {
 
     finalAndCompleteAggregate :: Nil
   }
+
+  def areAggExpressionsPartial(modes: Seq[AggregateMode]): Boolean = {
+    modes.nonEmpty && modes.forall(_ == Partial)

Review comment:
   We cannot apply this optimization in the empty case (i.e., no aggregate functions)? e.g.,
   ```
   scala> sql("select k from t group by k").explain()
   == Physical Plan ==
   *(2) HashAggregate(keys=[k#29], functions=[])
   +- Exchange hashpartitioning(k#29, 200), true, [id=#47]
      +- *(1) HashAggregate(keys=[k#29], functions=[])
         +- *(1) ColumnarToRow
            +- FileScan parquet default.t[k#29] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
   ```
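
The question turns on how `forall` behaves on an empty sequence. A minimal, self-contained sketch follows; the `AggregateMode` hierarchy here is a simplified stand-in for Spark's classes, and `allPartialOrEmpty` is a hypothetical variant used only for comparison. It suggests that `forall` alone is vacuously true for a group-by with no aggregate functions, so the `nonEmpty` guard is what excludes that case:

```scala
// Simplified stand-ins for Spark's aggregate modes (assumption: illustration only).
sealed trait AggregateMode
case object Partial extends AggregateMode
case object Final extends AggregateMode

object AggModeCheck {
  // Same predicate as in the diff under review.
  def areAggExpressionsPartial(modes: Seq[AggregateMode]): Boolean =
    modes.nonEmpty && modes.forall(_ == Partial)

  // Hypothetical variant without the nonEmpty guard:
  // forall is vacuously true on an empty Seq.
  def allPartialOrEmpty(modes: Seq[AggregateMode]): Boolean =
    modes.forall(_ == Partial)
}
```

With `functions=[]`, as in the plan above, the first predicate rejects the node while the second would accept it.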





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466765987



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
##
@@ -63,6 +63,14 @@
*/
   private final UnsafeRow currentAggregationBuffer;
 
+  /**
+   * Number of rows that were added to the map
+   * This includes the elements that were passed on sorter
+   * using {@link #destructAndCreateExternalSorter()}
+   *

Review comment:
   nit: remove this blank.








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466765488



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##
@@ -353,4 +353,8 @@ object AggUtils {
 
     finalAndCompleteAggregate :: Nil
   }
+
+  def areAggExpressionsPartial(modes: Seq[AggregateMode]): Boolean = {

Review comment:
   If we don't reuse it in the main codebase, I think it's better to move this func into HashAggregateExec.








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466757897



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##
@@ -409,6 +411,12 @@ case class HashAggregateExec(
   private var fastHashMapTerm: String = _
   private var isFastHashMapEnabled: Boolean = false
 
+  private var avoidSpillInPartialAggregateTerm: String = _
+  private val skipPartialAggregate = sqlContext.conf.skipPartialAggregate &&
+    AggUtils.areAggExpressionsPartial(modes) && find(_.isInstanceOf[ExpandExec]).isEmpty

Review comment:
   Hm, I see... But I think the current approach looks dangerous because we might add a new plan with the same assumption in the future.








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466760439



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##
@@ -680,6 +688,16 @@ case class HashAggregateExec(
 
   private def doProduceWithKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initAgg")
+    if (skipPartialAggregate) {
+      avoidSpillInPartialAggregateTerm = ctx.
+        addMutableState(CodeGenerator.JAVA_BOOLEAN,
+          "avoidPartialAggregate",
+          term => s"$term = ${Utils.isTesting};")
+    }
+    val childrenConsumed = ctx.
+      addMutableState(CodeGenerator.JAVA_BOOLEAN, "childrenConsumed")
+    rowCountTerm = ctx.
+      addMutableState(CodeGenerator.JAVA_LONG, "rowCount")

Review comment:
   For historical reasons, we avoid generating unnecessary mutable states where possible. See: https://issues.apache.org/jira/browse/SPARK-18016
   I think it is best to generate the same code as in master if `skipPartialAggregate=false`.
   Are you suggesting it is impossible to do so?
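
One way to read this request is to register the extra mutable states only when the flag is on. The sketch below uses a toy `MiniCodegenContext`, a hypothetical stand-in that is not Spark's `CodegenContext`. The state names are taken from the diff, but the grouping under a single `if` is an assumption about how the change could be structured:

```scala
import scala.collection.mutable

// Toy stand-in for a codegen context (assumption: not Spark's CodegenContext).
class MiniCodegenContext {
  private val states = mutable.ArrayBuffer.empty[String]

  // Records a field declaration and returns the variable name.
  def addMutableState(javaType: String, name: String): String = {
    states += s"private $javaType $name;"
    name
  }

  def declaredStates: Seq[String] = states.toList
}

object ConditionalStateSketch {
  // Registers the extra states only when the feature flag is on, so the
  // declarations with the flag off match the baseline exactly.
  def declareStates(skipPartialAggregate: Boolean): Seq[String] = {
    val ctx = new MiniCodegenContext
    ctx.addMutableState("boolean", "initAgg")
    if (skipPartialAggregate) {
      ctx.addMutableState("boolean", "avoidPartialAggregate")
      ctx.addMutableState("boolean", "childrenConsumed")
      ctx.addMutableState("long", "rowCount")
    }
    ctx.declaredStates
  }
}
```

With the flag off, only `initAgg` is declared, which is the property asked for here: the generated code stays identical to the baseline.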











[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466758218



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##
@@ -680,6 +688,16 @@ case class HashAggregateExec(
 
   private def doProduceWithKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initAgg")
+    if (skipPartialAggregate) {
+      avoidSpillInPartialAggregateTerm = ctx.
+        addMutableState(CodeGenerator.JAVA_BOOLEAN,
+          "avoidPartialAggregate",
+          term => s"$term = ${Utils.isTesting};")

Review comment:
   I didn't mean that. Why do we need this flag here?











[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466756945



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##
@@ -680,6 +688,16 @@ case class HashAggregateExec(
 
   private def doProduceWithKeys(ctx: CodegenContext): String = {

Review comment:
   Ah, I see.








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466440577



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2196,6 +2196,29 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SKIP_PARTIAL_AGGREGATE_ENABLED =
+buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
+  .internal()
+  .doc("Avoid sorter(sort/spill) during partial aggregation")

Review comment:
   Please check the other doc descriptions, e.g., 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2111-L2113








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-08-06 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466377228



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##
@@ -680,6 +688,16 @@ case class HashAggregateExec(
 
   private def doProduceWithKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initAgg")
+    if (skipPartialAggregate) {
+      avoidSpillInPartialAggregateTerm = ctx.
+        addMutableState(CodeGenerator.JAVA_BOOLEAN,
+          "avoidPartialAggregate",
+          term => s"$term = ${Utils.isTesting};")
+    }
+    val childrenConsumed = ctx.
+      addMutableState(CodeGenerator.JAVA_BOOLEAN, "childrenConsumed")
+    rowCountTerm = ctx.
+      addMutableState(CodeGenerator.JAVA_LONG, "rowCount")

Review comment:
   Do we need the two variables even if `skipPartialAggregate=false`?

##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -142,52 +142,57 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
   }
 
   test("Aggregate metrics: track avg probe") {
-// The executed plan looks like:
-// HashAggregate(keys=[a#61], functions=[count(1)], output=[a#61, 
count#71L])
-// +- Exchange hashpartitioning(a#61, 5)
-//+- HashAggregate(keys=[a#61], functions=[partial_count(1)], 
output=[a#61, count#76L])
-//   +- Exchange RoundRobinPartitioning(1)
-//  +- LocalTableScan [a#61]
-//
-// Assume the execution plan with node id is:
-// Wholestage disabled:
-// HashAggregate(nodeId = 0)
-//   Exchange(nodeId = 1)
-// HashAggregate(nodeId = 2)
-//   Exchange (nodeId = 3)
-// LocalTableScan(nodeId = 4)
-//
-// Wholestage enabled:
-// WholeStageCodegen(nodeId = 0)
-//   HashAggregate(nodeId = 1)
-// Exchange(nodeId = 2)
-//   WholeStageCodegen(nodeId = 3)
-// HashAggregate(nodeId = 4)
-//   Exchange(nodeId = 5)
-// LocalTableScan(nodeId = 6)
-Seq(true, false).foreach { enableWholeStage =>
-  val df = generateRandomBytesDF().repartition(1).groupBy('a).count()
-  val nodeIds = if (enableWholeStage) {
-Set(4L, 1L)
-  } else {
-Set(2L, 0L)
-  }
-  val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
-  nodeIds.foreach { nodeId =>
-val probes = metrics(nodeId)._2("avg hash probe bucket list 
iters").toString
-if (!probes.contains("\n")) {
-  // It's a single metrics value
-  assert(probes.toDouble > 1.0)
+if 
(spark.sessionState.conf.getConf(SQLConf.SKIP_PARTIAL_AGGREGATE_ENABLED)) {
+  logInfo("Skipping, since partial Aggregation is disabled")
+} else {
+  // The executed plan looks like:
+  // HashAggregate(keys=[a#61], functions=[count(1)], output=[a#61, 
count#71L])
+  // +- Exchange hashpartitioning(a#61, 5)
+  //+- HashAggregate(keys=[a#61], functions=[partial_count(1)], 
output=[a#61, count#76L])
+  //   +- Exchange RoundRobinPartitioning(1)
+  //  +- LocalTableScan [a#61]
+  //
+  // Assume the execution plan with node id is:
+  // Wholestage disabled:
+  // HashAggregate(nodeId = 0)
+  //   Exchange(nodeId = 1)
+  // HashAggregate(nodeId = 2)
+  //   Exchange (nodeId = 3)
+  // LocalTableScan(nodeId = 4)
+  //
+  // Wholestage enabled:
+  // WholeStageCodegen(nodeId = 0)
+  //   HashAggregate(nodeId = 1)
+  // Exchange(nodeId = 2)
+  //   WholeStageCodegen(nodeId = 3)
+  // HashAggregate(nodeId = 4)
+  //   Exchange(nodeId = 5)
+  // LocalTableScan(nodeId = 6)
+  Seq(true, false).foreach { enableWholeStage =>
+val df = generateRandomBytesDF().repartition(1).groupBy('a).count()
+val nodeIds = if (enableWholeStage) {
+  Set(4L, 1L)
 } else {
-  val mainValue = 
probes.split("\n").apply(1).stripPrefix("(").stripSuffix(")")
-  // Extract min, med, max from the string and strip off everthing 
else.
-  val index = mainValue.indexOf(" (", 0)
-  mainValue.slice(0, index).split(", ").foreach {
-probe => assert(probe.toDouble > 1.0)
+  Set(2L, 0L)
+}
+val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
+nodeIds.foreach { nodeId =>
+  val probes = metrics(nodeId)._2("avg hash probe bucket list 
iters").toString
+  if (!probes.contains("\n")) {
+// It's a single metrics value
+assert(probes.toDouble > 1.0)
+  } else {
+val mainValue = 
probes.split("\n").apply(1).stripPrefix("(").stripSuffix(")")
+// Extract min, med, max from the string and strip off everthing 

[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-07-08 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r451882430



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2196,6 +2196,25 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SKIP_PARTIAL_AGGREGATE_ENABLED =
+buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
+  .internal()
+  .doc("Avoid sort/spill to disk during partial aggregation")
+  .booleanConf
+  .createWithDefault(true)
+
+  val SKIP_PARTIAL_AGGREGATE_THRESHOLD =
+buildConf("spark.sql.aggregate.partialaggregate.skip.threshold")
+  .internal()
+  .longConf
+  .createWithDefault(10)
+
+  val SKIP_PARTIAL_AGGREGATE_RATIO =
+buildConf("spark.sql.aggregate.partialaggregate.skip.ratio")

Review comment:
   Could you describe the two params in more detail in `.doc`? When reading them, I couldn't understand how to use them.








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-07-08 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r451880238



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2196,6 +2196,25 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SKIP_PARTIAL_AGGREGATE_ENABLED =
+buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
+  .internal()
+  .doc("Avoid sort/spill to disk during partial aggregation")
+  .booleanConf
+  .createWithDefault(true)
+
+  val SKIP_PARTIAL_AGGREGATE_THRESHOLD =
+buildConf("spark.sql.aggregate.partialaggregate.skip.threshold")
+  .internal()
+  .longConf
+  .createWithDefault(10)
+
+  val SKIP_PARTIAL_AGGREGATE_RATIO =
+buildConf("spark.sql.aggregate.partialaggregate.skip.ratio")
+  .internal()
+  .doubleConf
+  .createWithDefault(0.5)

Review comment:
   Also, could you check performance numbers by varying the params?








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-07-08 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r451879710



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2196,6 +2196,25 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SKIP_PARTIAL_AGGREGATE_ENABLED =
+buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
+  .internal()
+  .doc("Avoid sort/spill to disk during partial aggregation")
+  .booleanConf
+  .createWithDefault(true)
+
+  val SKIP_PARTIAL_AGGREGATE_THRESHOLD =
+buildConf("spark.sql.aggregate.partialaggregate.skip.threshold")
+  .internal()
+  .longConf
+  .createWithDefault(10)
+
+  val SKIP_PARTIAL_AGGREGATE_RATIO =
+buildConf("spark.sql.aggregate.partialaggregate.skip.ratio")
+  .internal()
+  .doubleConf
+  .createWithDefault(0.5)

Review comment:
   Why do we need the two params for this optimization?








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-07-03 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r447512443



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2196,6 +2196,13 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SKIP_PARTIAL_AGGREGATE_ENABLED =
+buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
+  .internal()
+  .doc("Avoid sort/spill to disk during partial aggregation")
+  .booleanConf
+  .createWithDefault(true)

Review comment:
   >  They turn off map side aggregate (i.e., partial aggregate will be 
pass through) in Physical operator (i.e., Group-By operator) if map-side 
aggregation reduce the entries by at least half and they look at 10 rows to 
do that 
   
   I think whether that approach improves performance depends on IO 
performance, but the idea looks interesting to me. WDYT? @cloud-fan 
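
A rough sketch of the quoted heuristic, under the assumption that the intent is to stop aggregating when the hash map has not reduced the sampled rows enough. `partialCount`, `minRows`, and `ratio` are hypothetical names; the defaults of 10 rows and 0.5 come from the quote, and none of this is the PR's actual implementation:

```scala
import scala.collection.mutable

object RuntimeSkipSketch {
  // Hypothetical sketch: count per key in a hash map while sampling the first
  // `minRows` rows; if the map has not reduced the input enough
  // (distinct keys / rows seen > `ratio`), stop aggregating and pass the
  // remaining rows through as (key, 1) so the final aggregate merges them.
  def partialCount(
      keys: Iterator[String],
      minRows: Long = 10L,
      ratio: Double = 0.5): Seq[(String, Long)] = {
    val counts = mutable.LinkedHashMap.empty[String, Long]
    val passedThrough = mutable.ArrayBuffer.empty[(String, Long)]
    var rowsSeen = 0L
    var skip = false
    keys.foreach { k =>
      if (skip) {
        passedThrough += (k -> 1L)
      } else {
        counts(k) = counts.getOrElse(k, 0L) + 1L
        rowsSeen += 1L
        // Decide once, after exactly `minRows` rows have been sampled.
        if (rowsSeen == minRows && counts.size.toDouble / rowsSeen > ratio) {
          skip = true
        }
      }
    }
    counts.toSeq ++ passedThrough
  }
}
```

For all-distinct keys the map never shrinks the input, so rows after the sample are emitted unaggregated; for low-cardinality keys the map keeps aggregating. Whether skipping is a net win would still depend on shuffle and IO cost, as noted above.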











[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-06-28 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r446720097



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2196,6 +2196,13 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SKIP_PARTIAL_AGGREGATE_ENABLED =
+buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
+  .internal()
+  .doc("Avoid sort/spill to disk during partial aggregation")
+  .booleanConf
+  .createWithDefault(true)

Review comment:
   I meant the ratio of the distinct row count to the total row count in the group-by key column stats. For example, if `distinctCount / rowCount` is close to 1.0, you apply the optimization; otherwise, you don't.
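
As a minimal sketch of this stats-based check: `shouldSkipPartialAggregate` and its default `threshold` of 0.9 are hypothetical and only illustrate the ratio test; they are not Spark APIs or values proposed in this PR.

```scala
object StatsHeuristic {
  // Hypothetical helper: decide from column statistics whether partial
  // aggregation is unlikely to reduce the data. Names and the default
  // threshold are illustrative assumptions, not Spark APIs.
  def shouldSkipPartialAggregate(
      distinctCount: Long,
      rowCount: Long,
      threshold: Double = 0.9): Boolean = {
    if (rowCount <= 0L) {
      false // no stats or empty input: keep the default behaviour
    } else {
      distinctCount.toDouble / rowCount >= threshold
    }
  }
}
```

A nearly unique group-by key (ratio close to 1.0) would enable the skip, while a low-cardinality key would leave partial aggregation on.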
   








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-06-27 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r446571681



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2196,6 +2196,13 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SKIP_PARTIAL_AGGREGATE_ENABLED =
+buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
+  .internal()
+  .doc("Avoid sort/spill to disk during partial aggregation")
+  .booleanConf
+  .createWithDefault(true)

Review comment:
   Could we use a threshold + column stats instead of this boolean config?








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-06-20 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r443119807



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2196,6 +2196,13 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SKIP_PARTIAL_AGGREGATE_ENABLED =
+buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
+  .internal()
+  .doc("Avoid sort/spill to disk during partial aggregation")
+  .booleanConf
+  .createWithDefault(false)

Review comment:
   Could you check whether all the existing tests pass when this config is set to true?








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-06-20 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r443119418



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##
@@ -879,42 +901,53 @@ case class HashAggregateExec(
 
 val oomeClassName = classOf[SparkOutOfMemoryError].getName
 
+
+

Review comment:
   Plz revert the unnecessary changes.








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-06-17 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r441932845



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2173,6 +2173,13 @@ object SQLConf {
   .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
   .createWithDefault(16)
 
+  val SPILL_PARTIAL_AGGREGATE_DISABLED =
+buildConf("spark.sql.aggregate.spill.partialaggregate.disabled")

Review comment:
   `disabled` -> `enabled` to follow the other config naming.








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-06-17 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r441932524



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##
@@ -72,6 +74,8 @@ case class HashAggregateExec(
 "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
 "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
 "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in 
aggregation build"),
+"partialAggSkipped" -> SQLMetrics.createMetric(sparkContext, "Num records" 
+
+  " skipped partial aggregation skipped"),

Review comment:
   skipped ... skipped? How about `number of skipped records for partial 
aggregates`?








[GitHub] [spark] maropu commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

2020-06-11 Thread GitBox


maropu commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r439160476



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
##
@@ -165,6 +166,26 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
 }
   }
 
+  test("SPARK-: Avoid spill in partial aggregation " +
+"when spark.sql.aggregate.spill.partialaggregate.disabled is set") {
+withSQLConf((SQLConf.SPILL_PARTIAL_AGGREGATE_DISABLED.key, "true"),

Review comment:
   Could you show us performance numbers?




