[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-18 Thread wzhfy
Github user wzhfy closed the pull request at:

https://github.com/apache/spark/pull/15637


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86647571
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function Computes frequency for each distinct non-null value of a column.
+ * It returns: 1. null if the table is empty or all values of the column are null.
+ * 2. (distinct non-null value, frequency) pairs if the number of distinct non-null values is
+ * less than or equal to the specified threshold.
+ * 3. an empty result if the number of distinct non-null values exceeds that threshold.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of pairs.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(col, numBins) - Computes frequency for each distinct non-null value of column `col`.
+  It returns: 1. null if the table is empty or all values of column `col` are null.
+  2. (distinct non-null value, frequency) pairs if the number of distinct non-null values
--- End diff --

Frequency usually means the rate at which something occurs. In statistics, however,
frequency means the number of items occurring in a given category. Here frequency is
actually equivalent to count, and we use frequency and count interchangeably.
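Since frequency here simply means count, the three documented outcomes can be modelled with a small counting buffer. The sketch below is plain Scala, not Spark code: `CountDigest` and `mapAggregate` are hypothetical names standing in for the PR's `MapDigest` and `MapAggregate`, `None` models a SQL NULL input, and the buffer is assumed to be marked invalid (and emptied) once it has seen more than `numBins` distinct values, which is how the documented "empty result" case is reached here.

```scala
import scala.collection.mutable

object MapAggregateSketch {

  // Hypothetical stand-in for the PR's MapDigest: per-value counts plus an
  // "invalid" flag that is set once more than numBins distinct values appear.
  final class CountDigest[T](numBins: Int) {
    private val counts = mutable.HashMap.empty[T, Long]
    private var invalid = false

    def isInvalid: Boolean = invalid

    // Once the distinct-value limit is exceeded, drop the counts for good.
    private def invalidateIfTooLarge(): Unit =
      if (counts.size > numBins) {
        invalid = true
        counts.clear()
      }

    // None models a SQL NULL and is skipped, like `evaluated != null` in update().
    def update(value: Option[T]): Unit =
      if (!invalid) {
        value.foreach { v =>
          counts(v) = counts.getOrElse(v, 0L) + 1L
          invalidateIfTooLarge()
        }
      }

    // Combines a partial result computed on another partition.
    def merge(other: CountDigest[T]): Unit =
      if (other.isInvalid) {
        invalid = true
        counts.clear()
      } else if (!invalid) {
        other.counts.foreach { case (v, c) => counts(v) = counts.getOrElse(v, 0L) + c }
        invalidateIfTooLarge()
      }

    // None: no non-null input at all; Some(empty map): too many distinct values.
    def result: Option[Map[T, Long]] =
      if (invalid) Some(Map.empty)
      else if (counts.isEmpty) None
      else Some(counts.toMap)
  }

  // Runs one digest over all values, as a single-partition aggregation would.
  def mapAggregate[T](values: Seq[Option[T]], numBins: Int): Option[Map[T, Long]] = {
    val digest = new CountDigest[T](numBins)
    values.foreach(digest.update)
    digest.result
  }
}
```

With `numBins = 3`, the values (10, 20, 10) produce `Some(Map(10 -> 2, 20 -> 1))` and the values (1, 2, 3, 4) produce `Some(Map())`, matching the PR's `extended` examples; an empty or all-null input produces `None`. Keeping invalidation inside `merge` as well as `update` means partial results from different partitions cannot jointly exceed the limit unnoticed.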



[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86579702
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function Computes frequency for each distinct non-null value of a column.
+ * It returns: 1. null if the table is empty or all values of the column are null.
+ * 2. (distinct non-null value, frequency) pairs if the number of distinct non-null values is
+ * less than or equal to the specified threshold.
+ * 3. an empty result if the number of distinct non-null values exceeds that threshold.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of pairs.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(col, numBins) - Computes frequency for each distinct non-null value of column `col`.
+  It returns: 1. null if the table is empty or all values of column `col` are null.
+  2. (distinct non-null value, frequency) pairs if the number of distinct non-null values
+  is less than or equal to the specified threshold `numBins`.
+  3. an empty result if the number of distinct non-null values exceeds `numBins`.
+  """,
+  extended = """
+Examples:
+  > SELECT map_aggregate(col, 3) FROM tbl;
+   1. null - if `tbl` is empty or values of `col` are all nulls
+   2. Map((10, 2), (20, 1)) - if values of `col` are (10, 20, 10)
+   3. Map.empty - if values of `col` are (1, 2, 3, 4)
+  """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!numBinsExpression.foldable) {
+      TypeCheckFailure("The maximum number of bins provided must be a constant literal")
+    } else if (numBins < 2) {
+      val currentValue = if (numBinsExpression.eval() == null) null else numBins
+      TypeCheckFailure(
+        "The maximum number of bins provided must be a positive integer literal >= 2 " +
+          s"(current value = $currentValue)")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  override def update(buffer: MapDigest, input: InternalRow): Unit = {
+    if (!buffer.isInvalid) {
+      val evaluated = child.eval(input)
+      if (evaluated != null) 

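The `numBins` validation in `checkInputDataTypes` above can be modelled outside Spark as follows. `Arg`, `Literal` and `ColumnRef` are hypothetical stand-ins for a foldable and a non-foldable Catalyst expression; the error strings are the ones quoted in the diff. The hypothetical `Deferred` class only illustrates the Scala `lazy val` semantics the PR relies on: the initializer runs on first access and the result is then cached. Note also that in the quoted code a NULL literal passes through `asInstanceOf[Int]` as `0`, which is why the failure message re-checks `numBinsExpression.eval() == null` to report `null` rather than `0`.

```scala
object NumBinsCheckSketch {
  // Hypothetical stand-ins: a constant literal (None = NULL) and a column reference.
  sealed trait Arg
  final case class Literal(value: Option[Int]) extends Arg
  final case class ColumnRef(name: String) extends Arg

  // Same order of checks as checkInputDataTypes: constant first, then >= 2.
  def checkNumBins(arg: Arg): Either[String, Int] = arg match {
    case ColumnRef(_) =>
      Left("The maximum number of bins provided must be a constant literal")
    case Literal(Some(n)) if n >= 2 =>
      Right(n)
    case Literal(value) =>
      val currentValue = value.map(_.toString).getOrElse("null")
      Left("The maximum number of bins provided must be a positive integer literal >= 2 " +
        s"(current value = $currentValue)")
  }

  // Illustrates lazy val: the initializer runs once, on first access.
  final class Deferred(compute: () => Int) {
    var evaluations = 0
    lazy val numBins: Int = { evaluations += 1; compute() }
  }
}
```

Returning `Either` keeps the sketch independent of Catalyst's `TypeCheckResult`; in the PR itself the same three outcomes map to `TypeCheckFailure` and `TypeCheckSuccess`.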
[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86506104
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala ---
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+  2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+  distinct non-null values is less than or equal to the specified maximum number of bins.
+  3. an empty map otherwise.
+""")
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
--- End diff --

Yeah, I think it's fine not to support those for now.
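The `inputTypes` override quoted in this hunk restricts `col` to numeric, timestamp, date and string values (with an integer `numBins`), so types outside that collection are not directly accepted. A minimal sketch of that admission rule, using a hypothetical `ColType` hierarchy instead of Spark's `DataType` classes and ignoring any implicit casts the analyzer may insert before the check runs:

```scala
object InputTypeSketch {
  // Hypothetical column types; not Spark's org.apache.spark.sql.types classes.
  sealed trait ColType
  case object IntCol extends ColType
  case object DoubleCol extends ColType
  case object DecimalCol extends ColType
  case object TimestampCol extends ColType
  case object DateCol extends ColType
  case object StringCol extends ColType
  case object BooleanCol extends ColType
  case object BinaryCol extends ColType
  final case class ArrayCol(element: ColType) extends ColType

  private def isNumeric(t: ColType): Boolean = t match {
    case IntCol | DoubleCol | DecimalCol => true
    case _ => false
  }

  // Models TypeCollection(NumericType, TimestampType, DateType, StringType).
  def isSupported(t: ColType): Boolean = t match {
    case TimestampCol | DateCol | StringCol => true
    case other => isNumeric(other)
  }
}
```

Widening support later would mean adding cases to `isSupported` (and, in the real expression, entries to the `TypeCollection`), together with a serialization path for the new value type in the aggregation buffer.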




[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86505614
  

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86505467
  

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86505390
  

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86505042
  

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86504961
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function Computes frequency for each distinct non-null value of a column.
+ * It returns: 1. null if the table is empty or all values of the column are null.
+ * 2. (distinct non-null value, frequency) pairs if the number of distinct non-null values is
+ * less than or equal to the specified threshold.
+ * 3. an empty result if the number of distinct non-null values exceeds that threshold.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of pairs.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(col, numBins) - Computes frequency for each distinct non-null value of column `col`.
+      It returns: 1. null if the table is empty or all values of column `col` are null.
+      2. (distinct non-null value, frequency) pairs if the number of distinct non-null values
+      is less than or equal to the specified threshold `numBins`.
+      3. an empty result if the number of distinct non-null values exceeds `numBins`.
+  """,
+  extended = """
+    Examples:
+      > SELECT map_aggregate(col, 3) FROM tbl;
+       1. null - if `tbl` is empty or values of `col` are all nulls
+       2. Map((10, 2), (20, 1)) - if values of `col` are (10, 20, 10)
+       3. Map.empty - if values of `col` are (1, 2, 3, 4)
+  """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!numBinsExpression.foldable) {
+      TypeCheckFailure("The maximum number of bins provided must be a constant literal")
+    } else if (numBins < 2) {
+      val currentValue = if (numBinsExpression.eval() == null) null else numBins
+      TypeCheckFailure(
+        "The maximum number of bins provided must be a positive integer literal >= 2 " +
+          s"(current value = $currentValue)")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  override def update(buffer: MapDigest, input: InternalRow): Unit = {
+    if (!buffer.isInvalid) {
+      val evaluated = child.eval(input)
+      if (evaluated != null)

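The three return cases in the `MapAggregate` doc comment quoted above, together with the `buffer.isInvalid` short-circuit in `update`, can be modelled without Spark. The following is a hypothetical, plain-Scala sketch, not the PR's `MapDigest`: `FreqDigest` and `FreqDigest.aggregate` are illustrative names, `None` stands in for SQL NULL, and `Some(Map.empty)` stands in for the "empty result" case.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the PR's MapDigest buffer (not its real implementation):
// it counts distinct non-null values and gives up once more than `numBins` distinct
// values have been seen.
final class FreqDigest[T](numBins: Int) {
  private val counts = mutable.HashMap.empty[T, Long]
  private var invalid = false

  def isInvalid: Boolean = invalid

  // Like the quoted `update`: an invalid buffer ignores input, and nulls are skipped.
  def update(value: T): Unit = {
    if (!invalid && value != null) {
      counts(value) = counts.getOrElse(value, 0L) + 1L
      if (counts.size > numBins) invalidate()
    }
  }

  // Combines a partial buffer (e.g. from another partition) into this one.
  def merge(other: FreqDigest[T]): Unit = {
    if (other.invalid) {
      invalidate()
    } else if (!invalid) {
      other.counts.foreach { case (k, c) => counts(k) = counts.getOrElse(k, 0L) + c }
      if (counts.size > numBins) invalidate()
    }
  }

  private def invalidate(): Unit = {
    invalid = true
    counts.clear()
  }

  // None models SQL NULL (no non-null input), Some(Map.empty) models "too many
  // distinct values", and any other Some(...) holds value -> frequency pairs.
  def result: Option[Map[T, Long]] =
    if (invalid) Some(Map.empty)
    else if (counts.isEmpty) None
    else Some(counts.toMap)
}

object FreqDigest {
  // Runs a single-partition aggregation over `values`.
  def aggregate[T](values: Seq[T], numBins: Int): Option[Map[T, Long]] = {
    val digest = new FreqDigest[T](numBins)
    values.foreach(digest.update)
    digest.result
  }
}
```

With `numBins = 3`, `FreqDigest.aggregate(Seq(10, 20, 10), 3)` yields `Some(Map(10 -> 2, 20 -> 1))` and `FreqDigest.aggregate(Seq(1, 2, 3, 4), 3)` yields `Some(Map())`, matching the doc examples. Clearing the counts once the buffer is invalid keeps memory bounded by about `numBins + 1` entries, which is presumably why the quoted `update` checks `buffer.isInvalid` first.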
[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86504716
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86504634
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86504529
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86504505
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86504123
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,310 @@
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!numBinsExpression.foldable) {
+      TypeCheckFailure("The maximum number of bins provided must be a constant literal")
--- End diff --

"must be a literal or constant foldable"

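The suggested wording can be seen in context with a small, Spark-free model of the validation order in the quoted `checkInputDataTypes`: foldability first, then the `>= 2` bound. `NumBinsArg` and `NumBinsCheck` are hypothetical names, and the sketch assumes the default `IntegerType` check has already passed, so the value is an `Int` or null. It also shows why the quoted code re-checks `numBinsExpression.eval() == null`: casting null to `Int` yields 0, so a NULL argument lands in the `numBins < 2` branch and would otherwise be reported as 0.

```scala
// Hypothetical, Spark-free model of the `numBins` argument: whether the expression is
// foldable and what it evaluates to (null models a NULL literal). Assumes the default
// IntegerType check has already passed, so `value` is an Int or null.
final case class NumBinsArg(foldable: Boolean, value: Any)

object NumBinsCheck {
  def check(arg: NumBinsArg): Either[String, Int] =
    if (!arg.foldable) {
      Left("The maximum number of bins provided must be a literal or constant foldable")
    } else {
      // Casting null to Int yields 0, like `numBinsExpression.eval().asInstanceOf[Int]`,
      // so a NULL argument falls into the `< 2` branch and is reported as null below.
      val numBins = arg.value.asInstanceOf[Int]
      if (numBins < 2) {
        val currentValue = if (arg.value == null) null else numBins
        Left("The maximum number of bins provided must be a positive integer literal >= 2 " +
          s"(current value = $currentValue)")
      } else {
        Right(numBins)
      }
    }
}
```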

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: 

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86415255
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+      2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+      distinct non-null values is less than or equal to the specified maximum number of bins.
+      3. an empty map otherwise.
+    """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
--- End diff --

I posted the same comment multiple times due to a browser glitch.  I have just 
cleaned it up.

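The quoted diff marks `numBins` as a `lazy val` "so that numBinsExpression is not evaluated during tree transformation." A hypothetical, Spark-free illustration of that effect follows: `CountingExpr` and `LazyBinsNode` are made-up names, and an explicit `copy()` stands in for the node copies produced during tree transformations.

```scala
// Hypothetical expression stand-in that records how often it is evaluated.
final class CountingExpr(value: Int) {
  var evalCount = 0
  def eval(): Any = {
    evalCount += 1
    value
  }
}

// Mirrors the quoted pattern: `numBins` is computed on first access rather than in the
// constructor, so creating copies of the node does not evaluate the expression.
final case class LazyBinsNode(numBinsExpression: CountingExpr) {
  lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
}
```

Constructing `LazyBinsNode(expr)` and then calling `.copy()` leaves `expr.evalCount` at 0; the first read of `numBins` on an instance evaluates the expression once, and later reads on that same instance reuse the cached value.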

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86413803
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+  override def inputTypes: Seq[AbstractDataType] = {
+Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), 
IntegerType)
--- End diff --

In today's software development environment, speed is an important factor.
One principle of agile development is to avoid over-design, which can slow
down development and divert an engineer's attention to unimportant cases.
Since we are developing MapAggregate.scala for the cost-based optimizer (CBO),
let's focus on what CBO needs. There is no need to compute NDV (number of
distinct values) or a histogram for the binary data type. For the Boolean data
type, ColumnStat already defines the number of trues and the number of falses.
I think wzhfy's code is fine here.
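
To make the scope argument above concrete, here is a minimal standalone Scala sketch. It is not Spark code: `ColumnKind`, its case objects, `CboStatsScope`, `supportsMapAggregate`, `BooleanCounts`, and `booleanCounts` are hypothetical names chosen for illustration. It assumes the accepted kinds match the quoted `inputTypes` (numeric, timestamp, date, string), models SQL NULL as `None`, and shows why a Boolean column needs only two counters rather than a frequency map.

```scala
// Hypothetical, simplified stand-in for column data types (not Spark's DataType).
sealed trait ColumnKind
case object NumericKind extends ColumnKind
case object TimestampKind extends ColumnKind
case object DateKind extends ColumnKind
case object StringKind extends ColumnKind
case object BinaryKind extends ColumnKind
case object BooleanKind extends ColumnKind

object CboStatsScope {
  // Assumption: mirrors the TypeCollection in the quoted inputTypes.
  // Only these kinds would get (value, frequency) statistics.
  def supportsMapAggregate(kind: ColumnKind): Boolean = kind match {
    case NumericKind | TimestampKind | DateKind | StringKind => true
    case BinaryKind | BooleanKind                           => false
  }

  // A Boolean column's non-null distribution is fully described by two counts.
  final case class BooleanCounts(numTrues: Long, numFalses: Long)

  // None models SQL NULL and is not counted.
  def booleanCounts(values: Seq[Option[Boolean]]): BooleanCounts =
    values.flatten.foldLeft(BooleanCounts(0L, 0L)) { (acc, v) =>
      if (v) acc.copy(numTrues = acc.numTrues + 1)
      else acc.copy(numFalses = acc.numFalses + 1)
    }
}
```

The design choice matches the comment: keep the accepted type list small and reuse existing counters for Booleans instead of building a general histogram path for every type.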





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86411252
  

In today's software development environment, speed is an important factor.
One principle of agile development is to avoid over-design, which can slow
down development and divert an engineer's attention to unimportant cases.
Since we are developing MapAggregate.scala for the cost-based optimizer (CBO),
let's focus on what CBO needs. There is no need to compute NDV (number of
distinct values) or a histogram for the binary data type. For the Boolean data
type, ColumnStat already defines the number of trues and the number of falses.
I think wzhfy's code is fine here.







[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86411132
  

In today's software development environment, speed is an important factor.
One principle of agile development is to avoid over-design, which can slow
down development and divert an engineer's attention to unimportant cases.
Since we are developing MapAggregate.scala for the cost-based optimizer (CBO),
let's focus on what CBO needs. There is no need to compute NDV (number of
distinct values) or a histogram for the binary data type. For the Boolean data
type, ColumnStat already defines the number of trues and the number of falses.
I think wzhfy's code is fine here.





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86410847
  

In today's software development environment, speed is an important factor.
One principle of agile development is to avoid over-design, which can slow
down development and divert an engineer's attention to unimportant cases.
Since we are developing MapAggregate.scala for the cost-based optimizer (CBO),
let's focus on what CBO needs. There is no need to compute NDV (number of
distinct values) or a histogram for the binary data type. For the Boolean data
type, ColumnStat already defines the number of trues and the number of falses.
I think wzhfy's code is fine here.





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86408951
  

In today's software development environment, speed is an important factor.
One principle of agile development is to avoid over-design, which can slow
down development and divert an engineer's attention to unimportant cases.
Since we are developing MapAggregate.scala for the cost-based optimizer (CBO),
let's focus on what CBO needs. There is no need to compute NDV (number of
distinct values) or a histogram for the binary data type. For the Boolean data
type, ColumnStat already defines the number of trues and the number of falses.
I think we are doing fine here.





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86409288
  

In today's software development environment, speed is an important factor.
One principle of agile development is to avoid over-design, which can slow
down development and divert an engineer's attention to unimportant cases.
Since we are developing MapAggregate.scala for the cost-based optimizer (CBO),
let's focus on what CBO needs. There is no need to compute NDV (number of
distinct values) or a histogram for the binary data type. For the Boolean data
type, ColumnStat already defines the number of trues and the number of falses.
I think we are doing fine here.





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86409090
  

In today's software development environment, speed is an important factor.
One principle of agile development is to avoid over-design, which can slow
down development and divert an engineer's attention to unimportant cases.
Since we are developing MapAggregate.scala for the cost-based optimizer (CBO),
let's focus on what CBO needs. There is no need to compute NDV (number of
distinct values) or a histogram for the binary data type. For the Boolean data
type, ColumnStat already defines the number of trues and the number of falses.
I think we are doing fine here.





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86031611
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
--- End diff --

I see. Then we had better explain it explicitly: `Returns null if the
result set is empty or all values are null`.
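
The documented three-way result, with the null case worded as suggested above, can be sketched as a standalone Scala function. This is a hypothetical single-node model, not the MapAggregate implementation: `MapAggregateSketch` and `mapAggregate` are illustrative names, `None` stands for SQL NULL, and the sketch ignores what a real aggregate must handle (buffer serialization, merging partial results); it also keeps counting every distinct value instead of bounding memory.

```scala
import scala.collection.immutable.TreeMap

object MapAggregateSketch {
  /**
   * Hypothetical model of the documented result; None stands for SQL NULL.
   *  - None (null) if the input is empty or every value is null;
   *  - Some(value -> frequency map) if #distinct non-null values <= numBins;
   *  - Some(empty map) otherwise.
   */
  def mapAggregate[T: Ordering](values: Seq[Option[T]], numBins: Int): Option[Map[T, Long]] = {
    val nonNull = values.flatten
    if (nonNull.isEmpty) {
      None
    } else {
      // Count frequencies; TreeMap keeps keys sorted by the implicit Ordering.
      val freqs = nonNull.foldLeft(TreeMap.empty[T, Long]) { (acc, v) =>
        acc.updated(v, acc.getOrElse(v, 0L) + 1L)
      }
      if (freqs.size <= numBins) Some(freqs) else Some(Map.empty[T, Long])
    }
  }
}
```

For example, with `numBins = 3` the input `Seq(Some(1), Some(2), Some(1), None)` yields a map containing 1 -> 2 and 2 -> 1, an all-null input yields `None`, and three distinct values with `numBins = 2` yield an empty map.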





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r86030388
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+      2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+      distinct non-null values is less than or equal to the specified maximum number of bins.
+      3. an empty map otherwise.
+    """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
--- End diff --

For the CBO use cases, yes. However, this function has become a general one that external users could also call, so this limit might not make sense for them.





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-01 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85993076
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+      2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+      distinct non-null values is less than or equal to the specified maximum number of bins.
+      3. an empty map otherwise.
+    """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!numBinsExpression.foldable) {
+      TypeCheckFailure("The maximum number of bins provided must be a constant literal")
+    } else if (numBins < 2) {
+      TypeCheckFailure(
+        "The maximum number of bins provided must be a positive integer literal >= 2 " +
+          s"(current value = $numBins)")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  override def update(buffer: MapDigest, input: InternalRow): Unit = {
+    if (buffer.isInvalid) {
+      return
+    }
+    val evaluated = child.eval(input)
+    if (evaluated != null) {
+      buffer.update(child.dataType, evaluated, numBins)
+    }
+  }
+
+  override def merge(buffer: MapDigest, other: MapDigest): Unit = {
+    if (buffer.isInvalid) return
+    if (other.isInvalid) {
+      buffer.isInvalid = true
+      buffer.clear()
+      return
+    }
+    buffer.merge(other, numBins)
+  }
+
+  override def eval(buffer: MapDigest): Any = {
+    if (buffer.isInvalid) {
+      // return empty map
+      ArrayBasedMapData(Map.empty)
+    } else {
+      // sort the result to make it more readable
+   

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85934978
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+      2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+      distinct non-null values is less than or equal to the specified maximum number of bins.
+      3. an empty map otherwise.
+    """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!numBinsExpression.foldable) {
+      TypeCheckFailure("The maximum number of bins provided must be a constant literal")
+    } else if (numBins < 2) {
+      TypeCheckFailure(
+        "The maximum number of bins provided must be a positive integer literal >= 2 " +
+          s"(current value = $numBins)")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  override def update(buffer: MapDigest, input: InternalRow): Unit = {
+    if (buffer.isInvalid) {
+      return
+    }
+    val evaluated = child.eval(input)
+    if (evaluated != null) {
+      buffer.update(child.dataType, evaluated, numBins)
+    }
--- End diff --

Yeah, that's better, thanks!



[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85933282
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,324 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, it returns an empty map.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns bins - (distinct value, frequency) pairs of equi-width
+      histogram when the number of distinct values is less than or equal to the specified
+      maximum number of bins. Otherwise, it returns an empty map.
+    """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
--- End diff --

Oh, I see, thanks!





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85931101
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+      2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+      distinct non-null values is less than or equal to the specified maximum number of bins.
+      3. an empty map otherwise.
--- End diff --

ok





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85930167
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
--- End diff --

Now, if the input is empty, it also returns null, so an empty result is returned only when the ndv (number of distinct values) exceeds numBins.





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85814522
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+      2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+      distinct non-null values is less than or equal to the specified maximum number of bins.
+      3. an empty map otherwise.
+    """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!numBinsExpression.foldable) {
+      TypeCheckFailure("The maximum number of bins provided must be a constant literal")
+    } else if (numBins < 2) {
+      TypeCheckFailure(
+        "The maximum number of bins provided must be a positive integer literal >= 2 " +
+          s"(current value = $numBins)")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  override def update(buffer: MapDigest, input: InternalRow): Unit = {
+    if (buffer.isInvalid) {
+      return
+    }
+    val evaluated = child.eval(input)
+    if (evaluated != null) {
+      buffer.update(child.dataType, evaluated, numBins)
+    }
--- End diff --

A general comment about the implementation: I think we should avoid `return` where possible. For example, we can rewrite it like this:
```Scala
if (!buffer.isInvalid) {
  val evaluated = child.eval(input)
  if (evaluated != null) {
    buffer.update(child.dataType, evaluated, numBins)
  }
}
```
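The same `return`-free style could carry over to `merge()`, which in the quoted diff also exits early with `return`. The sketch below is hypothetical: `SimpleDigest` is a stand-in for the PR's `MapDigest` (its implementation is not quoted here), modeling only an `isInvalid` flag, a value-to-count map, and `clear()`; null inputs are modeled as `None`, and `ReturnFreeStyle` is an illustrative name.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the PR's MapDigest: only what update()/merge() touch is modeled.
final class SimpleDigest[T] {
  var isInvalid: Boolean = false
  val bins: mutable.HashMap[T, Long] = mutable.HashMap.empty[T, Long]

  def clear(): Unit = bins.clear()

  // Adds `count` occurrences of `value`; once the number of distinct values exceeds
  // numBins, the digest is marked invalid and its bins are dropped.
  def add(value: T, count: Long, numBins: Int): Unit = {
    bins(value) = bins.getOrElse(value, 0L) + count
    if (bins.size > numBins) {
      isInvalid = true
      clear()
    }
  }
}

object ReturnFreeStyle {
  // update() without `return`: an invalid buffer or a null input (None) is a no-op.
  def update[T](buffer: SimpleDigest[T], input: Option[T], numBins: Int): Unit = {
    if (!buffer.isInvalid) {
      input.foreach(v => buffer.add(v, 1L, numBins))
    }
  }

  // merge() without `return`: an invalid `other` invalidates `buffer`; otherwise the
  // counts are folded in until the distinct-value limit is exceeded.
  def merge[T](buffer: SimpleDigest[T], other: SimpleDigest[T], numBins: Int): Unit = {
    if (!buffer.isInvalid) {
      if (other.isInvalid) {
        buffer.isInvalid = true
        buffer.clear()
      } else {
        other.bins.foreach { case (v, c) =>
          if (!buffer.isInvalid) buffer.add(v, c, numBins)
        }
      }
    }
  }
}
```

With `numBins = 2`, a partial digest holding `"a"` and `"b"` stays valid, and merging in a partial that holds `"c"` invalidates it; this is the "exceeding numBins during merge()" situation that the PR's test suite checks.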



[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85812423
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,332 @@
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+      2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+      distinct non-null values is less than or equal to the specified maximum number of bins.
+      3. an empty map otherwise.
--- End diff --

Could you describe what this function does in general first, and then explain the return values?





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85811315
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/MapAggregateQuerySuite.scala ---
@@ -0,0 +1,144 @@
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+
+class MapAggregateQuerySuite extends QueryTest with SharedSQLContext {
+
+  private val table = "map_aggregate_test"
+  private val col1 = "col1"
+  private val col2 = "col2"
+  private val schema = StructType(Seq(StructField(col1, StringType), 
StructField(col2, DoubleType)))
+
+  private def query(numBins: Int): DataFrame = {
+sql(s"SELECT map_aggregate($col1, $numBins), map_aggregate($col2, 
$numBins) FROM $table")
+  }
+
+  test("null handling") {
+withTempView(table) {
+  // Null input
+  val nullRdd: RDD[Row] = spark.sparkContext.parallelize(Seq(Row(null, 
null)))
+  spark.createDataFrame(nullRdd, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(null, null))
+
+  // Empty input
+  val emptyRdd: RDD[Row] = spark.sparkContext.parallelize(Seq.empty)
+  spark.createDataFrame(emptyRdd, 
schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(null, null))
+
+  // Add some non-null data
+  val rdd: RDD[Row] = spark.sparkContext.parallelize(Seq(Row(null, 
3.0D), Row("a", null)))
+  spark.createDataFrame(rdd, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(Map(("a", 1)), Map((3.0D, 1
+}
+  }
+
+  test("returns empty result when ndv exceeds numBins") {
+withTempView(table) {
+  val rdd: RDD[Row] = spark.sparkContext.parallelize(
+Seq(Row("a", 4.0D), Row("d", 2.0D), Row("c", 4.0D), Row("b", 1.0D), Row("a", 3.0D),
+  Row("a", 2.0D)), 2)
+  spark.createDataFrame(rdd, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 4), Row(
+Map(("a", 3), ("b", 1), ("c", 1), ("d", 1)),
+Map((1.0D, 1), (2.0D, 2), (3.0D, 1), (4.0D, 2
+  // One partial exceeds numBins during update()
+  checkAnswer(query(numBins = 2), Row(Map.empty, Map.empty))
+  // Exceeding numBins during merge()
+  checkAnswer(query(numBins = 3), Row(Map.empty, Map.empty))
+}
+  }
+
+  test("multiple columns of different types") {
+def queryMultiColumns(numBins: Int): DataFrame = {
+  sql(
+s"""
+   |SELECT
+   |  map_aggregate(c1, $numBins),
+   |  map_aggregate(c2, $numBins),
+   |  map_aggregate(c3, $numBins),
+   |  map_aggregate(c4, $numBins),
+   |  map_aggregate(c5, $numBins),
+   |  map_aggregate(c6, $numBins),
+   |  map_aggregate(c7, $numBins),
+   |  map_aggregate(c8, $numBins),
+   |  map_aggregate(c9, $numBins),
+   |  map_aggregate(c10, $numBins)
+   |FROM $table
+""".stripMargin)
+}
+
+val allTypeSchema = StructType(Seq(
+  StructField("c1", ByteType),
+  StructField("c2", ShortType),
+  StructField("c3", IntegerType),
+  StructField("c4", LongType),
+  StructField("c5", FloatType),
+  StructField("c6", DoubleType),
+  StructField("c7", DecimalType(10, 5)),
+  StructField("c8", DateType),
+  StructField("c9", TimestampType),
+  StructField("c10", StringType)))
--- End diff --

This schema still does not cover `BinaryType` and `BooleanType`.
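The "returns empty result when ndv exceeds numBins" test quoted above depends on where the distinct-value limit is first crossed. Below is a minimal plain-Scala sketch of that update/merge/eval logic, not the PR's actual code: `DigestSketch` and `MapAggregateSketch.aggregate` are hypothetical names, counts are kept as `Long` by assumption, and the two inner sequences assume the six test rows are split in order into two partitions of three.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the PR's MapDigest: exact per-value counts that are
// dropped, and the digest marked invalid, once more than numBins distinct values appear.
final class DigestSketch[T](numBins: Int) {
  val counts: mutable.HashMap[T, Long] = mutable.HashMap.empty[T, Long]
  var invalid: Boolean = false

  private def invalidateIfTooMany(): Unit =
    if (counts.size > numBins) {
      invalid = true
      counts.clear()
    }

  // Mirrors update(): skip work once invalid and ignore null inputs.
  def update(value: T): Unit =
    if (!invalid && value != null) {
      counts(value) = counts.getOrElse(value, 0L) + 1L
      invalidateIfTooMany()
    }

  // Mirrors merge(): an invalid partial invalidates the merged result.
  def merge(other: DigestSketch[T]): Unit =
    if (!invalid) {
      if (other.invalid) {
        invalid = true
        counts.clear()
      } else {
        other.counts.foreach { case (k, v) => counts(k) = counts.getOrElse(k, 0L) + v }
        invalidateIfTooMany()
      }
    }

  // Mirrors eval(): an empty map when invalid, otherwise value -> frequency.
  def result: Map[T, Long] = if (invalid) Map.empty else counts.toMap
}

object MapAggregateSketch {
  // Runs update() within each partition, then merges the partials into one digest.
  def aggregate[T](partitions: Seq[Seq[T]], numBins: Int): Map[T, Long] = {
    val partials = partitions.map { rows =>
      val digest = new DigestSketch[T](numBins)
      rows.foreach(digest.update)
      digest
    }
    val merged = new DigestSketch[T](numBins)
    partials.foreach(merged.merge)
    merged.result
  }

  def main(args: Array[String]): Unit = {
    val col1 = Seq(Seq("a", "d", "c"), Seq("b", "a", "a"))
    println(aggregate(col1, 4)) // all four distinct values fit
    println(aggregate(col1, 2)) // first partition has 3 distinct values: invalid during update
    println(aggregate(col1, 3)) // each partition fits, merged digest has 4: invalid during merge
  }
}
```

With `numBins = 2` the first partition already holds three distinct strings, so the limit is hit in `update()`; with `numBins = 3` both partitions fit on their own and only the merged digest exceeds it, which matches the two comments in the test.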



[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-30 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85657540
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/MapAggregateQuerySuite.scala ---
@@ -0,0 +1,124 @@
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+
+class MapAggregateQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  private val table = "map_aggregate_test"
+  private val col1 = "col1"
+  private val col2 = "col2"
+  private def query(numBins: Int): DataFrame = {
+sql(s"SELECT map_aggregate($col1, $numBins), map_aggregate($col2, $numBins) FROM $table")
+  }
+
+  test("null handling") {
+withTempView(table) {
+  val schema = StructType(Seq(StructField(col1, StringType), StructField(col2, DoubleType)))
+  // Empty input row
+  val rdd1 = spark.sparkContext.parallelize(Seq(Row(null, null)))
+  spark.createDataFrame(rdd1, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(Map.empty, Map.empty))
+
+  // Add some non-empty row
+  val rdd2 = spark.sparkContext.parallelize(Seq(Row(null, 3.0D), Row("a", null)))
+  spark.createDataFrame(rdd2, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(Map(("a", 1)), Map((3.0D, 1
+}
+  }
+
+  test("returns empty result when ndv exceeds numBins") {
+withTempView(table) {
+  spark.sparkContext.makeRDD(
+Seq(("a", 4), ("d", 2), ("c", 4), ("b", 1), ("a", 3), ("a", 2)), 2).toDF(col1, col2)
+.createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 4), Row(
+Map(("a", 3), ("b", 1), ("c", 1), ("d", 1)),
+Map((1.0D, 1), (2.0D, 2), (3.0D, 1), (4.0D, 2
+  // One partial exceeds numBins during update()
+  checkAnswer(query(numBins = 2), Row(Map.empty, Map.empty))
+  // Exceeding numBins during merge()
+  checkAnswer(query(numBins = 3), Row(Map.empty, Map.empty))
+}
+  }
+
+  test("multiple columns of different types") {
+def queryMultiColumns(numBins: Int): DataFrame = {
+  sql(
+s"""
+   |SELECT
+   |  map_aggregate(c1, $numBins),
+   |  map_aggregate(c2, $numBins),
+   |  map_aggregate(c3, $numBins),
+   |  map_aggregate(c4, $numBins),
+   |  map_aggregate(c5, $numBins),
+   |  map_aggregate(c6, $numBins),
+   |  map_aggregate(c7, $numBins),
+   |  map_aggregate(c8, $numBins),
+   |  map_aggregate(c9, $numBins),
+   |  map_aggregate(c10, $numBins)
+   |FROM $table
+""".stripMargin)
+}
+
+val ints = Seq(5, 3, 1)
+val doubles = Seq(1.0D, 3.0D, 5.0D)
+val dates = Seq("1970-01-01", "1970-02-02", "1970-03-03")
+val timestamps = Seq("1970-01-01 00:00:00", "1970-01-01 00:00:05", "1970-01-01 00:00:10")
+val strings = Seq("a", "bb", "ccc")
+
+val data = ints.indices.map { i =>
+  (ints(i).toByte,
+ints(i).toShort,
+ints(i),
+ints(i).toLong,
+doubles(i).toFloat,
+doubles(i),
+Decimal(doubles(i)),
+Date.valueOf(dates(i)),
+Timestamp.valueOf(timestamps(i)),
+strings(i))
+}
+withTempView(table) {
+  data.toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10")
--- End diff --

ok



[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-30 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85657536
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,324 @@
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, it returns an empty map.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins) - Returns bins - (distinct value, frequency) pairs of equi-width
+  histogram when the number of distinct values is less than or equal to the specified
+  maximum number of bins. Otherwise, it returns an empty map.
+""")
+case class MapAggregate(
+child: Expression,
+numBinsExpression: Expression,
+override val mutableAggBufferOffset: Int,
+override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val defaultCheck = super.checkInputDataTypes()
+if (defaultCheck.isFailure) {
+  defaultCheck
+} else if (!numBinsExpression.foldable) {
+  TypeCheckFailure("The maximum number of bins provided must be a constant literal")
+} else if (numBins < 2) {
+  TypeCheckFailure(
+"The maximum number of bins provided must be a positive integer literal >= 2 " +
+  s"(current value = $numBins)")
+} else {
+  TypeCheckSuccess
+}
+  }
+
+  override def update(buffer: MapDigest, input: InternalRow): Unit = {
+if (buffer.invalid) {
+  return
+}
+val evaluated = child.eval(input)
+if (evaluated != null) {
+  buffer.update(child.dataType, evaluated, numBins)
+}
+  }
+
+  override def merge(buffer: MapDigest, other: MapDigest): Unit = {
+if (buffer.invalid) return
+if (other.invalid) {
+  buffer.invalid = true
+  buffer.clear()
+  return
+}
+buffer.merge(other, numBins)
+  }
+
+  override def eval(buffer: MapDigest): Any = {
+if (buffer.invalid) {
+  // return empty map
+  ArrayBasedMapData(Map.empty)
+} else {
+  // sort the result to make it more readable
+  val sorted = buffer match {
+case stringDigest: StringMapDigest => TreeMap[UTF8String, 

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-30 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85657509
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,324 @@
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, it returns an empty map.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins) - Returns bins - (distinct value, frequency) pairs of equi-width
+  histogram when the number of distinct values is less than or equal to the specified
+  maximum number of bins. Otherwise, it returns an empty map.
+""")
+case class MapAggregate(
+child: Expression,
+numBinsExpression: Expression,
+override val mutableAggBufferOffset: Int,
+override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
--- End diff --

`null` is not of integer type, so it will not pass the type check in `checkInputDataTypes()`; the cast is therefore safe here.





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85652094
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,324 @@
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, it returns an empty map.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins) - Returns bins - (distinct value, frequency) pairs of equi-width
+  histogram when the number of distinct values is less than or equal to the specified
--- End diff --

I see. Thanks!





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85646444
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,324 @@
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, it returns an empty map.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins) - Returns bins - (distinct value, frequency) pairs of equi-width
+  histogram when the number of distinct values is less than or equal to the specified
+  maximum number of bins. Otherwise, it returns an empty map.
+""")
+case class MapAggregate(
+child: Expression,
+numBinsExpression: Expression,
+override val mutableAggBufferOffset: Int,
+override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
--- End diff --

How about this change?
```Scala
  // Needs `import org.apache.spark.sql.AnalysisException` in MapAggregate.scala.
  private lazy val numBins: Int = numBinsExpression.eval() match {
    case o: Int => o
    case x => throw new AnalysisException(
      s"The maximum number of bins provided must be a foldable integer expression: $x")
  }
```
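A Spark-free sketch of the same idea, with the `>= 2` check from `checkInputDataTypes()` folded in. `NumBinsCheck.resolveNumBins` is a hypothetical helper whose argument stands in for `numBinsExpression.eval()`, and it returns an `Either` instead of throwing `AnalysisException` so that it runs without Spark on the classpath. The key property is that a typed pattern such as `case n: Int` never matches `null`, so a null result falls through to the error branch instead of being unboxed to 0.

```scala
object NumBinsCheck {
  // Hypothetical helper: `evaluated` plays the role of numBinsExpression.eval().
  def resolveNumBins(evaluated: Any): Either[String, Int] = evaluated match {
    case n: Int if n >= 2 => Right(n)
    case n: Int =>
      Left(s"The maximum number of bins provided must be a positive integer literal >= 2 (current value = $n)")
    // A typed pattern never matches null, so null (or any non-Int value) lands here.
    case other =>
      Left(s"The maximum number of bins provided must be a foldable integer expression: $other")
  }

  def main(args: Array[String]): Unit = {
    println(resolveNumBins(10))
    println(resolveNumBins(1))
    println(resolveNumBins(null))
  }
}
```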





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85646119
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,324 @@
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, it returns an empty map.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins) - Returns bins - (distinct value, frequency) pairs of equi-width
+  histogram when the number of distinct values is less than or equal to the specified
+  maximum number of bins. Otherwise, it returns an empty map.
+""")
+case class MapAggregate(
+child: Expression,
+numBinsExpression: Expression,
+override val mutableAggBufferOffset: Int,
+override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
--- End diff --

`asInstanceOf[Int]` is risky: when `eval()` returns null, the value is silently cast to 0.
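The unboxing behaviour described here is easy to reproduce in plain Scala. In this sketch, `NullUnboxingDemo.evalNull` is a hypothetical stand-in for an expression whose `eval()` returns SQL NULL; applying `asInstanceOf[Int]` to that result does not throw but yields 0, so the null becomes indistinguishable from a literal 0.

```scala
object NullUnboxingDemo {
  // Hypothetical stand-in for numBinsExpression.eval() on an expression that evaluates to null.
  def evalNull(): Any = null

  // Same cast as `numBinsExpression.eval().asInstanceOf[Int]` in the diff:
  // unboxing null to a primitive Int yields 0 rather than throwing.
  def numBins: Int = evalNull().asInstanceOf[Int]

  def main(args: Array[String]): Unit =
    println(s"numBins = $numBins") // prints "numBins = 0"
}
```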





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85645925
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala
 ---
@@ -0,0 +1,324 @@
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, it returns an empty map.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins) - Returns bins - (distinct value, frequency) pairs of equi-width
+  histogram when the number of distinct values is less than or equal to the specified
+  maximum number of bins. Otherwise, it returns an empty map.
+""")
+case class MapAggregate(
+child: Expression,
+numBinsExpression: Expression,
+override val mutableAggBufferOffset: Int,
+override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val defaultCheck = super.checkInputDataTypes()
+if (defaultCheck.isFailure) {
+  defaultCheck
+} else if (!numBinsExpression.foldable) {
+  TypeCheckFailure("The maximum number of bins provided must be a constant literal")
+} else if (numBins < 2) {
+  TypeCheckFailure(
+"The maximum number of bins provided must be a positive integer literal >= 2 " +
+  s"(current value = $numBins)")
+} else {
+  TypeCheckSuccess
+}
+  }
+
+  override def update(buffer: MapDigest, input: InternalRow): Unit = {
+if (buffer.invalid) {
+  return
+}
+val evaluated = child.eval(input)
+if (evaluated != null) {
+  buffer.update(child.dataType, evaluated, numBins)
+}
+  }
+
+  override def merge(buffer: MapDigest, other: MapDigest): Unit = {
+if (buffer.invalid) return
+if (other.invalid) {
+  buffer.invalid = true
+  buffer.clear()
+  return
+}
+buffer.merge(other, numBins)
+  }
+
+  override def eval(buffer: MapDigest): Any = {
+if (buffer.invalid) {
+  // return empty map
+  ArrayBasedMapData(Map.empty)
+} else {
+  // sort the result to make it more readable
+  val sorted = buffer match {
+case stringDigest: StringMapDigest => TreeMap[UTF8String, 

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85645802
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/MapAggregateQuerySuite.scala ---
@@ -0,0 +1,124 @@
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+
+class MapAggregateQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  private val table = "map_aggregate_test"
+  private val col1 = "col1"
+  private val col2 = "col2"
+  private def query(numBins: Int): DataFrame = {
+sql(s"SELECT map_aggregate($col1, $numBins), map_aggregate($col2, $numBins) FROM $table")
+  }
+
+  test("null handling") {
+withTempView(table) {
+  val schema = StructType(Seq(StructField(col1, StringType), StructField(col2, DoubleType)))
+  // Empty input row
+  val rdd1 = spark.sparkContext.parallelize(Seq(Row(null, null)))
+  spark.createDataFrame(rdd1, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(Map.empty, Map.empty))
+
+  // Add some non-empty row
+  val rdd2 = spark.sparkContext.parallelize(Seq(Row(null, 3.0D), Row("a", null)))
+  spark.createDataFrame(rdd2, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(Map(("a", 1)), Map((3.0D, 1
+}
+  }
+
+  test("returns empty result when ndv exceeds numBins") {
+withTempView(table) {
+  spark.sparkContext.makeRDD(
+Seq(("a", 4), ("d", 2), ("c", 4), ("b", 1), ("a", 3), ("a", 2)), 2).toDF(col1, col2)
+.createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 4), Row(
+Map(("a", 3), ("b", 1), ("c", 1), ("d", 1)),
+Map((1.0D, 1), (2.0D, 2), (3.0D, 1), (4.0D, 2
+  // One partial exceeds numBins during update()
+  checkAnswer(query(numBins = 2), Row(Map.empty, Map.empty))
+  // Exceeding numBins during merge()
+  checkAnswer(query(numBins = 3), Row(Map.empty, Map.empty))
+}
+  }
+
+  test("multiple columns of different types") {
+def queryMultiColumns(numBins: Int): DataFrame = {
+  sql(
+s"""
+   |SELECT
+   |  map_aggregate(c1, $numBins),
+   |  map_aggregate(c2, $numBins),
+   |  map_aggregate(c3, $numBins),
+   |  map_aggregate(c4, $numBins),
+   |  map_aggregate(c5, $numBins),
+   |  map_aggregate(c6, $numBins),
+   |  map_aggregate(c7, $numBins),
+   |  map_aggregate(c8, $numBins),
+   |  map_aggregate(c9, $numBins),
+   |  map_aggregate(c10, $numBins)
+   |FROM $table
+""".stripMargin)
+}
+
+val ints = Seq(5, 3, 1)
+val doubles = Seq(1.0D, 3.0D, 5.0D)
+val dates = Seq("1970-01-01", "1970-02-02", "1970-03-03")
+val timestamps = Seq("1970-01-01 00:00:00", "1970-01-01 00:00:05", "1970-01-01 00:00:10")
+val strings = Seq("a", "bb", "ccc")
+
+val data = ints.indices.map { i =>
+  (ints(i).toByte,
+ints(i).toShort,
+ints(i),
+ints(i).toLong,
+doubles(i).toFloat,
+doubles(i),
+Decimal(doubles(i)),
+Date.valueOf(dates(i)),
+Timestamp.valueOf(timestamps(i)),
+strings(i))
+}
+withTempView(table) {
+  data.toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10")
--- End diff --

In this example, the printed schema is:

```
+--------+----------+-------+
|col_name| data_type|comment|
+--------+----------+-------+
```

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85645776
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/MapAggregateQuerySuite.scala ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+
+class MapAggregateQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  private val table = "map_aggregate_test"
+  private val col1 = "col1"
+  private val col2 = "col2"
+  private def query(numBins: Int): DataFrame = {
+sql(s"SELECT map_aggregate($col1, $numBins), map_aggregate($col2, $numBins) FROM $table")
+  }
+
+  test("null handling") {
+withTempView(table) {
+  val schema = StructType(Seq(StructField(col1, StringType), StructField(col2, DoubleType)))
+  // Empty input row
+  val rdd1 = spark.sparkContext.parallelize(Seq(Row(null, null)))
+  spark.createDataFrame(rdd1, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(Map.empty, Map.empty))
+
+  // Add some non-empty row
+  val rdd2 = spark.sparkContext.parallelize(Seq(Row(null, 3.0D), Row("a", null)))
+  spark.createDataFrame(rdd2, schema).createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 2), Row(Map(("a", 1)), Map((3.0D, 1))))
+}
+  }
+
+  test("returns empty result when ndv exceeds numBins") {
+withTempView(table) {
+  spark.sparkContext.makeRDD(
+Seq(("a", 4), ("d", 2), ("c", 4), ("b", 1), ("a", 3), ("a", 2)), 2).toDF(col1, col2)
+.createOrReplaceTempView(table)
+  checkAnswer(query(numBins = 4), Row(
+Map(("a", 3), ("b", 1), ("c", 1), ("d", 1)),
+Map((1.0D, 1), (2.0D, 2), (3.0D, 1), (4.0D, 2))))
+  // One partial exceeds numBins during update()
+  checkAnswer(query(numBins = 2), Row(Map.empty, Map.empty))
+  // Exceeding numBins during merge()
+  checkAnswer(query(numBins = 3), Row(Map.empty, Map.empty))
+}
+  }
+
+  test("multiple columns of different types") {
+def queryMultiColumns(numBins: Int): DataFrame = {
+  sql(
+s"""
+   |SELECT
+   |  map_aggregate(c1, $numBins),
+   |  map_aggregate(c2, $numBins),
+   |  map_aggregate(c3, $numBins),
+   |  map_aggregate(c4, $numBins),
+   |  map_aggregate(c5, $numBins),
+   |  map_aggregate(c6, $numBins),
+   |  map_aggregate(c7, $numBins),
+   |  map_aggregate(c8, $numBins),
+   |  map_aggregate(c9, $numBins),
+   |  map_aggregate(c10, $numBins)
+   |FROM $table
+""".stripMargin)
+}
+
+val ints = Seq(5, 3, 1)
+val doubles = Seq(1.0D, 3.0D, 5.0D)
+val dates = Seq("1970-01-01", "1970-02-02", "1970-03-03")
+val timestamps = Seq("1970-01-01 00:00:00", "1970-01-01 00:00:05", "1970-01-01 00:00:10")
+val strings = Seq("a", "bb", "ccc")
+
+val data = ints.indices.map { i =>
+  (ints(i).toByte,
+ints(i).toShort,
+ints(i),
+ints(i).toLong,
+doubles(i).toFloat,
+doubles(i),
+Decimal(doubles(i)),
+Date.valueOf(dates(i)),
+Timestamp.valueOf(timestamps(i)),
+strings(i))
+}
+withTempView(table) {
+  data.toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10")
--- End diff --

Can you use `def createDataFrame(rowRDD: RDD[Row], schema: StructType)` instead of relying on implicit schema inference?

We need an explicit schema definition when 

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85645394
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, it returns an empty map.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins) - Returns bins - (distinct value, frequency) pairs of equi-width
+  histogram when the number of distinct values is less than or equal to the specified
--- End diff --

Here, we need to document `distinct non-null values`. This function excludes NULL.

However, I am not sure whether this is expected. NULL is also a value, right?
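To make the NULL question concrete, here is a hypothetical pure-Scala model of the semantics the quoted tests expect. `MapAggregateModel`, `update`, `merge`, `eval` and `aggregate` are illustrative names, not the PR's code. In this model NULL inputs are skipped, each partition builds a partial value-to-frequency buffer, and as soon as a partial or merged buffer holds more than `numBins` distinct values, the final result is an empty map.

```scala
// Hypothetical model of the behavior exercised by MapAggregateQuerySuite;
// this is NOT the MapAggregate implementation from the PR.
object MapAggregateModel {
  // Partial aggregation buffer: Some(value -> frequency), or None once the
  // number of distinct non-null values has exceeded numBins.
  type Buffer = Option[Map[Any, Long]]

  val empty: Buffer = Some(Map.empty[Any, Long])

  // update(): NULL inputs are skipped, so NULL never becomes a bin.
  def update(buf: Buffer, value: Any, numBins: Int): Buffer = buf.flatMap { m =>
    if (value == null) Some(m)
    else {
      val next = m.updated(value, m.getOrElse(value, 0L) + 1L)
      if (next.size > numBins) None else Some(next)
    }
  }

  // merge(): sums the frequencies of two partial buffers; if either side has
  // already exceeded numBins, or their union does, the result is None.
  def merge(a: Buffer, b: Buffer, numBins: Int): Buffer = (a, b) match {
    case (Some(m1), Some(m2)) =>
      val merged = m2.foldLeft(m1) { case (acc, (k, c)) =>
        acc.updated(k, acc.getOrElse(k, 0L) + c)
      }
      if (merged.size > numBins) None else Some(merged)
    case _ => None
  }

  // eval(): an exceeded buffer is reported as an empty map.
  def eval(buf: Buffer): Map[Any, Long] = buf.getOrElse(Map.empty[Any, Long])

  // Runs update() within each partition, then merge() across partitions.
  def aggregate(partitions: Seq[Seq[Any]], numBins: Int): Map[Any, Long] = {
    val partials = partitions.map { part =>
      part.foldLeft(empty)((b, v) => update(b, v, numBins))
    }
    eval(partials.foldLeft(empty)((acc, p) => merge(acc, p, numBins)))
  }
}
```

Assume that `makeRDD(..., 2)` places the first three rows of the "ndv exceeds numBins" test in one partition and the last three in the other. Then `aggregate(Seq(Seq("a", "d", "c"), Seq("b", "a", "a")), 3)` returns an empty map because only the merge exceeds three distinct values, while `numBins = 2` already fails inside `update`. In this model, counting NULL as a bin would amount to dropping the `value == null` check in `update`.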





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-28 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85619246
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HistogramEndpoints.scala ---
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.{PercentileDigest, PercentileDigestSerializer}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, QuantileSummaries}
+import org.apache.spark.sql.catalyst.util.QuantileSummaries._
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The HistogramEndpoints function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, for column of string type, it returns an empty
+ * map; for column of numeric type, it returns endpoints of equi-height histogram - approximate
+ * percentiles at percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ * @param accuracyExpression Accuracy used in computing approximate percentiles.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins [, accuracy]) - Returns bins - (distinct value, frequency) pairs
+  of equi-width histogram when the number of distinct values is less than or equal to the
+  specified maximum number of bins. Otherwise, for column of string type, it returns an empty
+  map; for column of numeric type, it returns endpoints of equi-height histogram - approximate
+  percentiles at percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0. The
+  `accuracy` parameter (default: 1) is a positive integer literal which controls percentiles
+  approximation accuracy at the cost of memory. Higher value of `accuracy` yields better
+  accuracy, `1.0/accuracy` is the relative error of the approximation.
+""")
+case class HistogramEndpoints(
+child: Expression,
+numBinsExpression: Expression,
+accuracyExpression: Expression,
+override val mutableAggBufferOffset: Int,
+override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[EndpointsDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression, accuracyExpression: Expression) = {
+this(child, numBinsExpression, accuracyExpression, 0, 0)
+  }
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+this(child, numBinsExpression, Literal(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  private lazy val percentages: Array[Double] = {
+val array = new Array[Double](numBins + 1)
+for (i <- 0 to numBins) {
+  array(i) = i / numBins.toDouble
+}
+array
+  }
+
+  private lazy val accuracy: Int = accuracyExpression.eval().asInstanceOf[Int]
+
+  override def 

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-28 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85609917
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HistogramEndpoints.scala ---
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.{PercentileDigest, PercentileDigestSerializer}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, QuantileSummaries}
+import org.apache.spark.sql.catalyst.util.QuantileSummaries._
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The HistogramEndpoints function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, for column of string type, it returns an empty
+ * map; for column of numeric type, it returns endpoints of equi-height histogram - approximate
+ * percentiles at percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ * @param accuracyExpression Accuracy used in computing approximate percentiles.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, numBins [, accuracy]) - Returns bins - (distinct value, frequency) pairs
+  of equi-width histogram when the number of distinct values is less than or equal to the
+  specified maximum number of bins. Otherwise, for column of string type, it returns an empty
+  map; for column of numeric type, it returns endpoints of equi-height histogram - approximate
+  percentiles at percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0. The
+  `accuracy` parameter (default: 1) is a positive integer literal which controls percentiles
+  approximation accuracy at the cost of memory. Higher value of `accuracy` yields better
+  accuracy, `1.0/accuracy` is the relative error of the approximation.
+""")
+case class HistogramEndpoints(
+child: Expression,
+numBinsExpression: Expression,
+accuracyExpression: Expression,
+override val mutableAggBufferOffset: Int,
+override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[EndpointsDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression, accuracyExpression: Expression) = {
+this(child, numBinsExpression, accuracyExpression, 0, 0)
+  }
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+this(child, numBinsExpression, Literal(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  private lazy val percentages: Array[Double] = {
+val array = new Array[Double](numBins + 1)
+for (i <- 0 to numBins) {
+  array(i) = i / numBins.toDouble
+}
+array
+  }
+
+  private lazy val accuracy: Int = accuracyExpression.eval().asInstanceOf[Int]
+
+  override def 

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-25 Thread wzhfy
GitHub user wzhfy opened a pull request:

https://github.com/apache/spark/pull/15637

[SPARK-18000] [SQL] Aggregation function for computing endpoints for histograms

## What changes were proposed in this pull request?

This function returns, for a column, bins, i.e. (distinct value, frequency) pairs
of an equi-width histogram, when the number of distinct values is less than or
equal to the specified maximum number of bins. Otherwise, for a column of string
type, it returns an empty map; for a column of numeric type, it returns the
endpoints of an equi-height histogram, i.e. approximate percentiles at
percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0.
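The numeric fallback described here can be sketched in plain Scala. This is a hypothetical model, and `HistogramEndpointsModel` and its methods are illustrative names. It uses exact nearest-rank percentiles on a sorted copy of the data in place of the approximate QuantileSummaries digest the PR relies on, so Spark's endpoints may differ slightly. It also leaves out the string branch, which simply returns an empty map.

```scala
// Hypothetical sketch of the numeric branch of HistogramEndpoints; NOT the
// PR's implementation. Exact nearest-rank percentiles stand in for the
// approximate QuantileSummaries digest, and the string branch is omitted.
object HistogramEndpointsModel {
  // 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0
  // (same values as the `percentages` lazy val in the quoted diff).
  def percentages(numBins: Int): Array[Double] =
    Array.tabulate(numBins + 1)(i => i / numBins.toDouble)

  // Exact nearest-rank percentile of a sorted, non-empty array.
  def percentile(sorted: Array[Double], p: Double): Double = {
    val rank = math.ceil(p * sorted.length).toInt
    sorted(math.max(rank - 1, 0))
  }

  // Left: (distinct value, frequency) bins when ndv <= numBins.
  // Right: numBins + 1 equi-height endpoints otherwise.
  def compute(values: Seq[Double], numBins: Int): Either[Map[Double, Long], Seq[Double]] = {
    val freq = values.groupBy(identity).map { case (v, vs) => v -> vs.size.toLong }
    if (freq.size <= numBins) Left(freq)
    else {
      val sorted = values.toArray
      java.util.Arrays.sort(sorted) // in-place ascending sort of the copy
      Right(percentages(numBins).toSeq.map(p => percentile(sorted, p)))
    }
  }
}
```

For example, with the ten distinct values 1.0 to 10.0 and numBins = 4, the percentages are 0.0, 0.25, 0.5, 0.75 and 1.0, and the nearest-rank endpoints are 1.0, 3.0, 5.0, 8.0 and 10.0.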

## How was this patch tested?

Added test cases.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wzhfy/spark histogramEndpoints

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15637.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15637


commit 1580a7fb0a6a21b603e754c744c8c6cb02fd24c2
Author: wangzhenhua 
Date:   2016-10-12T01:02:37Z

add agg function to generate string histogram

commit a3281606372f83eca960ea90734e8ee9cb1c3125
Author: wangzhenhua 
Date:   2016-10-21T08:10:01Z

comments

commit 907cd99b8b26ae3caa224df67cc10bc784f10fb4
Author: Zhenhua Wang 
Date:   2016-10-22T12:15:22Z

create HistogramEndpoints to generate endpoints for string and numeric types

commit 35e453cb1079398196ece4f13e8f294ee4e4e916
Author: Zhenhua Wang 
Date:   2016-10-23T15:05:09Z

change suite names

commit f6fe25de3f5f1382727cecfdda7b74e40758896b
Author: wangzhenhua 
Date:   2016-10-26T03:20:05Z

test cases and fix bugs

commit 15eb3721f56ac27bd90933ef7e66f3453eae4a75
Author: wangzhenhua 
Date:   2016-10-26T03:29:14Z

fix doc



