spark git commit: [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation

2015-11-10 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d2405cb5e -> 6e2e84f3e


[SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation

Currently the user facing api for typed aggregation has some limitations:

* the customized typed aggregation must be the first in the aggregation list
* the customized typed aggregation can only use long as the buffer type
* the customized typed aggregation can only use a flat type as the result type

This PR tries to remove these limitations.

Author: Wenchen Fan 

Closes #9599 from cloud-fan/agg.

(cherry picked from commit dfcfcbcc0448ebc6f02eba6bf0495832a321c87e)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e2e84f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e2e84f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e2e84f3

Branch: refs/heads/branch-1.6
Commit: 6e2e84f3ea06df459c7fdda947f2c56c2869145d
Parents: d2405cb
Author: Wenchen Fan 
Authored: Tue Nov 10 11:14:25 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 10 11:14:34 2015 -0800

--
 .../catalyst/encoders/ExpressionEncoder.scala   |  6 +++
 .../aggregate/TypedAggregateExpression.scala| 50 +--
 .../spark/sql/expressions/Aggregator.scala  |  5 ++
 .../spark/sql/DatasetAggregatorSuite.scala  | 52 
 4 files changed, 99 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e2e84f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index c287aeb..005c062 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -185,6 +185,12 @@ case class ExpressionEncoder[T](
 })
   }
 
+  def shift(delta: Int): ExpressionEncoder[T] = {
+copy(constructExpression = constructExpression transform {
+  case r: BoundReference => r.copy(ordinal = r.ordinal + delta)
+})
+  }
+
   /**
* Returns a copy of this encoder where the expressions used to create an 
object given an
* input row have been modified to pull the object out from a nested struct, 
instead of the

http://git-wip-us.apache.org/repos/asf/spark/blob/6e2e84f3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
index 24d8122..0e5bc1f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate
 import scala.language.existentials
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
-import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{StructType, DataType}
+import org.apache.spark.sql.types._
 
 object TypedAggregateExpression {
   def apply[A, B : Encoder, C : Encoder](
@@ -67,8 +67,11 @@ case class TypedAggregateExpression(
 
   override def nullable: Boolean = true
 
-  // TODO: this assumes flat results...
-  override def dataType: DataType = cEncoder.schema.head.dataType
+  override def dataType: DataType = if (cEncoder.flat) {
+cEncoder.schema.head.dataType
+  } else {
+cEncoder.schema
+  }
 
   override def deterministic: Boolean = true
 
@@ -93,32 +96,51 @@ case class TypedAggregateExpression(
   case a: AttributeReference => inputMapping(a)
 })
 
-  // TODO: this probably only works when we are in the first column.
   val bAttributes = bEncoder.schema.toAttributes
   lazy val boundB = bEncoder.resolve(bAttributes).bind(bAttributes)
 
+  private def updateBuffer(buffer: MutableRow, value: InternalRow): Unit = {
+// todo: need a more neat way to assign the value.
+var i = 0
+while (i < aggBufferAttributes.length) {

spark git commit: [SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation

2015-11-10 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 47735cdc2 -> dfcfcbcc0


[SPARK-11578][SQL][FOLLOW-UP] complete the user facing api for typed aggregation

Currently the user facing api for typed aggregation has some limitations:

* the customized typed aggregation must be the first in the aggregation list
* the customized typed aggregation can only use long as the buffer type
* the customized typed aggregation can only use a flat type as the result type

This PR tries to remove these limitations.

Author: Wenchen Fan 

Closes #9599 from cloud-fan/agg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dfcfcbcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dfcfcbcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dfcfcbcc

Branch: refs/heads/master
Commit: dfcfcbcc0448ebc6f02eba6bf0495832a321c87e
Parents: 47735cd
Author: Wenchen Fan 
Authored: Tue Nov 10 11:14:25 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 10 11:14:25 2015 -0800

--
 .../catalyst/encoders/ExpressionEncoder.scala   |  6 +++
 .../aggregate/TypedAggregateExpression.scala| 50 +--
 .../spark/sql/expressions/Aggregator.scala  |  5 ++
 .../spark/sql/DatasetAggregatorSuite.scala  | 52 
 4 files changed, 99 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dfcfcbcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index c287aeb..005c062 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -185,6 +185,12 @@ case class ExpressionEncoder[T](
 })
   }
 
+  def shift(delta: Int): ExpressionEncoder[T] = {
+copy(constructExpression = constructExpression transform {
+  case r: BoundReference => r.copy(ordinal = r.ordinal + delta)
+})
+  }
+
   /**
* Returns a copy of this encoder where the expressions used to create an 
object given an
* input row have been modified to pull the object out from a nested struct, 
instead of the

http://git-wip-us.apache.org/repos/asf/spark/blob/dfcfcbcc/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
index 24d8122..0e5bc1f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate
 import scala.language.existentials
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
-import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{StructType, DataType}
+import org.apache.spark.sql.types._
 
 object TypedAggregateExpression {
   def apply[A, B : Encoder, C : Encoder](
@@ -67,8 +67,11 @@ case class TypedAggregateExpression(
 
   override def nullable: Boolean = true
 
-  // TODO: this assumes flat results...
-  override def dataType: DataType = cEncoder.schema.head.dataType
+  override def dataType: DataType = if (cEncoder.flat) {
+cEncoder.schema.head.dataType
+  } else {
+cEncoder.schema
+  }
 
   override def deterministic: Boolean = true
 
@@ -93,32 +96,51 @@ case class TypedAggregateExpression(
   case a: AttributeReference => inputMapping(a)
 })
 
-  // TODO: this probably only works when we are in the first column.
   val bAttributes = bEncoder.schema.toAttributes
   lazy val boundB = bEncoder.resolve(bAttributes).bind(bAttributes)
 
+  private def updateBuffer(buffer: MutableRow, value: InternalRow): Unit = {
+// todo: need a more neat way to assign the value.
+var i = 0
+while (i < aggBufferAttributes.length) {
+  aggBufferSchema(i).dataType match {
+case IntegerType => buffer.setIn