spark git commit: [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation
Repository: spark Updated Branches: refs/heads/branch-1.6 d2405cb5e -> 6e2e84f3e [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation Currently the user facing api for typed aggregation has some limitations: * the customized typed aggregation must be the first of aggregation list * the customized typed aggregation can only use long as buffer type * the customized typed aggregation can only use flat type as result type This PR tries to remove these limitations. Author: Wenchen Fan Closes #9599 from cloud-fan/agg. (cherry picked from commit dfcfcbcc0448ebc6f02eba6bf0495832a321c87e) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e2e84f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e2e84f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e2e84f3 Branch: refs/heads/branch-1.6 Commit: 6e2e84f3ea06df459c7fdda947f2c56c2869145d Parents: d2405cb Author: Wenchen Fan Authored: Tue Nov 10 11:14:25 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 11:14:34 2015 -0800 -- .../catalyst/encoders/ExpressionEncoder.scala | 6 +++ .../aggregate/TypedAggregateExpression.scala| 50 +-- .../spark/sql/expressions/Aggregator.scala | 5 ++ .../spark/sql/DatasetAggregatorSuite.scala | 52 4 files changed, 99 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e2e84f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index c287aeb..005c062 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -185,6 +185,12 @@ case class ExpressionEncoder[T]( }) } + def shift(delta: Int): ExpressionEncoder[T] = { +copy(constructExpression = constructExpression transform { + case r: BoundReference => r.copy(ordinal = r.ordinal + delta) +}) + } + /** * Returns a copy of this encoder where the expressions used to create an object given an * input row have been modified to pull the object out from a nested struct, instead of the http://git-wip-us.apache.org/repos/asf/spark/blob/6e2e84f3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala index 24d8122..0e5bc1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate import scala.language.existentials import org.apache.spark.Logging +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder} import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate -import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{StructType, DataType} +import org.apache.spark.sql.types._ object TypedAggregateExpression { def apply[A, B : Encoder, C : Encoder]( @@ -67,8 +67,11 @@ case class TypedAggregateExpression( override def nullable: Boolean = true - // TODO: this assumes flat results... - override def dataType: DataType = cEncoder.schema.head.dataType + override def dataType: DataType = if (cEncoder.flat) { +cEncoder.schema.head.dataType + } else { +cEncoder.schema + } override def deterministic: Boolean = true @@ -93,32 +96,51 @@ case class TypedAggregateExpression( case a: AttributeReference => inputMapping(a) }) - // TODO: this probably only works when we are in the first column. val bAttributes = bEncoder.schema.toAttributes lazy val boundB = bEncoder.resolve(bAttributes).bind(bAttributes) + private def updateBuffer(buffer: MutableRow, value: InternalRow): Unit = { +// todo: need a more neat way to assign the value. +var i = 0 +while (i < agg
spark git commit: [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation
Repository: spark Updated Branches: refs/heads/master 47735cdc2 -> dfcfcbcc0 [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation Currently the user facing api for typed aggregation has some limitations: * the customized typed aggregation must be the first of aggregation list * the customized typed aggregation can only use long as buffer type * the customized typed aggregation can only use flat type as result type This PR tries to remove these limitations. Author: Wenchen Fan Closes #9599 from cloud-fan/agg. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dfcfcbcc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dfcfcbcc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dfcfcbcc Branch: refs/heads/master Commit: dfcfcbcc0448ebc6f02eba6bf0495832a321c87e Parents: 47735cd Author: Wenchen Fan Authored: Tue Nov 10 11:14:25 2015 -0800 Committer: Michael Armbrust Committed: Tue Nov 10 11:14:25 2015 -0800 -- .../catalyst/encoders/ExpressionEncoder.scala | 6 +++ .../aggregate/TypedAggregateExpression.scala| 50 +-- .../spark/sql/expressions/Aggregator.scala | 5 ++ .../spark/sql/DatasetAggregatorSuite.scala | 52 4 files changed, 99 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dfcfcbcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index c287aeb..005c062 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -185,6 +185,12 @@ case class ExpressionEncoder[T]( }) } + def shift(delta: Int): ExpressionEncoder[T] = { +copy(constructExpression = constructExpression transform { + case r: BoundReference => r.copy(ordinal = r.ordinal + delta) +}) + } + /** * Returns a copy of this encoder where the expressions used to create an object given an * input row have been modified to pull the object out from a nested struct, instead of the http://git-wip-us.apache.org/repos/asf/spark/blob/dfcfcbcc/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala index 24d8122..0e5bc1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate import scala.language.existentials import org.apache.spark.Logging +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder} import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate -import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{StructType, DataType} +import org.apache.spark.sql.types._ object TypedAggregateExpression { def apply[A, B : Encoder, C : Encoder]( @@ -67,8 +67,11 @@ case class TypedAggregateExpression( override def nullable: Boolean = true - // TODO: this assumes flat results... - override def dataType: DataType = cEncoder.schema.head.dataType + override def dataType: DataType = if (cEncoder.flat) { +cEncoder.schema.head.dataType + } else { +cEncoder.schema + } override def deterministic: Boolean = true @@ -93,32 +96,51 @@ case class TypedAggregateExpression( case a: AttributeReference => inputMapping(a) }) - // TODO: this probably only works when we are in the first column. val bAttributes = bEncoder.schema.toAttributes lazy val boundB = bEncoder.resolve(bAttributes).bind(bAttributes) + private def updateBuffer(buffer: MutableRow, value: InternalRow): Unit = { +// todo: need a more neat way to assign the value. +var i = 0 +while (i < aggBufferAttributes.length) { + aggBufferSchema(i).dataType match { +case IntegerType => buffer.setIn