[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15544


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r140140577
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be
+ * sorted into ascending order. E.g., given an array of endpoints
+ * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals
+ * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N].
+ * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals, should be sorted into ascending order.
+ * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm.
+ */
+case class ApproxCountDistinctForIntervals(
+    child: Expression,
+    endpointsExpression: Expression,
+    relativeSD: Double = 0.05,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate with ExpectsInputTypes {
+
+  def this(child: Expression, endpointsExpression: Expression) = {
+    this(
+      child = child,
+      endpointsExpression = endpointsExpression,
+      relativeSD = 0.05,
+      mutableAggBufferOffset = 0,
+      inputAggBufferOffset = 0)
+  }
+
+  def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = {
+    this(
+      child = child,
+      endpointsExpression = endpointsExpression,
+      relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD),
+      mutableAggBufferOffset = 0,
+      inputAggBufferOffset = 0)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
+  }
+
+  // Mark as lazy so that endpointsExpression is not evaluated during tree transformation.
+  lazy val endpoints: Array[Double] =
+    (endpointsExpression.dataType, endpointsExpression.eval()) match {
+      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
+        val numericArray = arrayData.toObjectArray(baseType)
+        numericArray.map { x =>
+          baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType])
+        }
+    }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!endpointsExpression.foldable) {
+      TypeCheckFailure("The intervals provided must be constant literals")
+    } else if (endpoints.length < 2) {
+      TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  // N endpoints construct N-1 intervals, creating a HLLPP for each interval
+  private lazy val hllppArray = {
+    val array = new Array[HyperLogLogPlusPlusHelper](endpoints.length - 1)
+    for (i <- array.indices) {
+      array(i) = new HyperLogLogPlusPl
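
The scaladoc quoted above defines the intervals as [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ..., (endpoint_N-1, endpoint_N]. As a point of reference only (a sketch added for illustration, not code from this PR), the exact per-interval distinct counts that the aggregate approximates with one HyperLogLog++ helper per interval could be computed like this:

    import scala.collection.mutable

    // Exact reference model: distinct count per interval [e1, e2], (e2, e3], ..., (eN-1, eN]
    // for an ascending endpoints array. The aggregate replaces the sets with HLL++ sketches.
    def exactNdvPerInterval(values: Seq[Double], endpoints: Array[Double]): Array[Long] = {
      require(endpoints.length >= 2, "need at least 2 endpoints to construct intervals")
      val distinct = Array.fill(endpoints.length - 1)(mutable.Set.empty[Double])
      for (v <- values if v >= endpoints.head && v <= endpoints.last) {
        // The first interval is closed on both ends; the rest are left-open.
        val i = if (v <= endpoints(1)) 0 else endpoints.indexWhere(_ >= v) - 1
        distinct(i) += v
      }
      distinct.map(_.size.toLong)
    }

    // exactNdvPerInterval(Seq(1.0, 2.0, 2.0, 7.0, 10.0), Array(1.0, 5.0, 10.0)) == Array(2, 2)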

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r140131809
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!endpointsExpression.foldable) {
+      TypeCheckFailure("The intervals provided must be constant literals")
--- End diff --

I'll check the element type of `endpointsExpression`, thanks.
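
One possible shape of such a check, sketched within the class quoted above for illustration only (the accepted element types and error message are assumptions, and it presumes the `endpoints` extraction is extended to date/timestamp elements as discussed elsewhere in this thread, so this is not necessarily what was merged):

    // Sketch: validate the element type of endpointsExpression up front, instead of
    // relying on the lazy `endpoints` pattern match to fail later.
    override def checkInputDataTypes(): TypeCheckResult = {
      val defaultCheck = super.checkInputDataTypes()
      if (defaultCheck.isFailure) {
        defaultCheck
      } else if (!endpointsExpression.foldable) {
        TypeCheckFailure("The intervals provided must be constant literals")
      } else {
        endpointsExpression.dataType match {
          case ArrayType(elementType, _)
              if elementType.isInstanceOf[NumericType] ||
                elementType == DateType || elementType == TimestampType =>
            if (endpoints.length < 2) {
              TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")
            } else {
              TypeCheckSuccess
            }
          case _ =>
            TypeCheckFailure("Endpoints must be an array of numeric, date or timestamp type")
        }
      }
    }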


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r140129058
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---
+  // Mark as lazy so that endpointsExpression is not evaluated during tree transformation.
+  lazy val endpoints: Array[Double] =
+    (endpointsExpression.dataType, endpointsExpression.eval()) match {
+      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
--- End diff --

This aggregate function is only used internally, and the expression works on internal types, so there is no case in which date/timestamp endpoints would be used.
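
For instance, a hypothetical internal caller along these lines (names and endpoint values are made up for illustration) would already hand the aggregate a foldable array of numeric literals in Catalyst's internal representation:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CreateArray, Literal}
    import org.apache.spark.sql.types.DoubleType

    // Endpoints arrive as constant numeric literals, so the date/timestamp endpoint case
    // would not be exercised on this internal path.
    val col = AttributeReference("a", DoubleType)()
    val endpointsExpr = CreateArray(Seq(1.0, 10.0, 100.0).map(Literal(_)))
    val agg = new ApproxCountDistinctForIntervals(col, endpointsExpr)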


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139917384
  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139909208
  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139898372
  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139896879
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---
+  // Mark as lazy so that endpointsExpression is not evaluated during tree transformation.
+  lazy val endpoints: Array[Double] =
+    (endpointsExpression.dataType, endpointsExpression.eval()) match {
+      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
--- End diff --

Good point. I think we should also allow date/timestamp endpoints.
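
A sketch of how the `endpoints` extraction might admit date/timestamp elements, relying on their internal encodings (DateType as days since the epoch in an Int, TimestampType as microseconds since the epoch in a Long); the exact shape of the eventual change may differ:

    lazy val endpoints: Array[Double] =
      (endpointsExpression.dataType, endpointsExpression.eval()) match {
        case (ArrayType(baseType, _), arrayData: ArrayData) =>
          arrayData.toObjectArray(baseType).map { x =>
            baseType match {
              case n: NumericType => n.numeric.toDouble(x.asInstanceOf[n.InternalType])
              case DateType => x.asInstanceOf[Int].toDouble       // days since epoch
              case TimestampType => x.asInstanceOf[Long].toDouble // microseconds since epoch
            }
          }
      }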


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139890936
  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139888528
  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139887066
  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139886249
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---
+case class ApproxCountDistinctForIntervals(
+    child: Expression,
+    endpointsExpression: Expression,
--- End diff --

Yes. I'll improve the comment for `endpointsExpression` as you suggested.
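
One possible rewording, purely as an illustration of the suggestion (not the wording that was committed):

    /**
     * @param endpointsExpression a foldable array expression whose elements are the interval
     *                            endpoints, and which must be sorted in ascending order.
     */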


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139878051
  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139877802
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---
+  // Mark as lazy so that endpointsExpression is not evaluated during tree transformation.
+  lazy val endpoints: Array[Double] =
+    (endpointsExpression.dataType, endpointsExpression.eval()) match {
+      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
--- End diff --

The type of `child` can be `TimestampType` or `DateType`, but the endpoints can only be an `ArrayType` of `NumericType`. It may not be convenient to set up numeric endpoints for a timestamp or date child column.
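
A small illustration of that inconvenience (added for context, and assuming TimestampType's internal encoding of microseconds since the Unix epoch): bucketing a timestamp column by calendar year would require spelling the endpoints as raw microsecond values rather than as dates or timestamps.

    import java.time.{LocalDate, ZoneOffset}

    // Raw numeric endpoints a caller would have to compute by hand for yearly buckets.
    def microsAtStartOfYear(year: Int): Long =
      LocalDate.of(year, 1, 1).atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli * 1000L

    val endpoints = Array(2015, 2016, 2017).map(microsAtStartOfYear)
    // Array(1420070400000000, 1451606400000000, 1483228800000000) -- hardly convenient
    // compared with writing the dates or timestamps directly.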


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139877421
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---
+  override def checkInputDataTypes(): TypeCheckResult = {
+val defaultCheck = super.checkInputDataTypes()
+if (defaultCheck.isFailure) {
+  defaultCheck
+} else if (!endpointsExpression.foldable) {
+  TypeCheckFailure("The intervals provided must be constant literals")
--- End diff --

Should we also check the element type of `endpointsExpression`?
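A minimal sketch of what such an element-type check could look like, written here as a 
standalone helper with a hypothetical name (not the PR's final code):

    import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
    import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
    import org.apache.spark.sql.types.{ArrayType, DataType, NumericType}

    object EndpointTypeCheck {
      // Accept only an array whose element type is numeric.
      def check(endpointsType: DataType): TypeCheckResult = endpointsType match {
        case ArrayType(_: NumericType, _) => TypeCheckSuccess
        case other =>
          TypeCheckFailure(s"Endpoints require an array of numeric type, but got ${other.simpleString}")
      }
    }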


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139876729
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
ExpectsInputTypes, Expression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in 
`endpointsExpression`. The endpoints should be
+ * sorted into ascending order. E.g., given an array of endpoints
+ * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's 
for intervals
+ * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, 
endpoint_N].
+ * To count ndv's in these intervals, apply the HyperLogLogPlusPlus 
algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals, should be sorted 
into ascending order.
--- End diff --

`An array expression with NumericType elements to construct the intervals. Must be foldable.`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139876548
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
ExpectsInputTypes, Expression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in 
`endpointsExpression`. The endpoints should be
+ * sorted into ascending order. E.g., given an array of endpoints
+ * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's 
for intervals
+ * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, 
endpoint_N].
+ * To count ndv's in these intervals, apply the HyperLogLogPlusPlus 
algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals, should be sorted 
into ascending order.
+ * @param relativeSD The maximum estimation error allowed in the 
HyperLogLogPlusPlus algorithm.
+ */
+case class ApproxCountDistinctForIntervals(
+child: Expression,
+endpointsExpression: Expression,
--- End diff --

Is `endpointsExpression` guaranteed to be foldable?
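For reference, the distinction behind this question, sketched with illustrative code: an array 
built from literals is foldable (it can be evaluated at analysis time), while an array that 
references a column is not.

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CreateArray, Literal}
    import org.apache.spark.sql.types.DoubleType

    object FoldableEndpointsSketch {
      def main(args: Array[String]): Unit = {
        val constantEndpoints = CreateArray(Seq(Literal(1.0d), Literal(10.0d)))
        val columnEndpoints = CreateArray(Seq(AttributeReference("b", DoubleType)()))
        println(constantEndpoints.foldable)  // true: no input row needed
        println(columnEndpoints.foldable)    // false: depends on input rows
      }
    }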


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139859176
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
--- End diff --

Oh, I'll remove this. I previously put some other logic here, but it's no longer needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139713569
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
+  assert(left == right)
+}
+
+val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, 
ArrayType(IntegerType),
+  MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", 
IntegerType
+wrongColumnTypes.foreach { dataType =>
+  val wrongColumn = new ApproxCountDistinctForIntervals(
+AttributeReference("a", dataType)(),
+endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_
+  assert(
+wrongColumn.checkInputDataTypes() match {
+  case TypeCheckFailure(msg)
+if msg.contains("requires (numeric or timestamp or date) 
type") => true
+  case _ => false
+})
+}
+
+var wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = Literal(0.5d))
+assert(
+  wrongEndpoints.checkInputDataTypes() match {
+case TypeCheckFailure(msg) if msg.contains("requires array type") 
=> true
+case _ => false
+  })
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Seq(AttributeReference("b", 
DoubleType)(
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The intervals provided must be constant literals"))
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Array(10L).map(Literal(_
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals"))
+  }
+
+  /** Create an ApproxCountDistinctForIntervals instance and an input and 
output buffer. */
+  private def createEstimator(
+  endpoints: Array[Double],
+  rsd: Double = 0.05,
+  dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, 
InternalRow, InternalRow) = {
+val input = new SpecificInternalRow(Seq(dt))
+val aggFunc = ApproxCountDistinctForIntervals(
+  BoundReference(0, dt, nullable = true), 
CreateArray(endpoints.map(Literal(_))), rsd)
+val buffer = createBuffer(aggFunc)
+(aggFunc, input, buffer)
+  }
+
+  private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): 
InternalRow = {
+val buffer = new 
SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType))
+aggFunc.initialize(buffer)
+buffer
+  }
+
+  test("merging ApproxCountDistinctForIntervals instances") {
+val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 
2000, 345678, 100))
+val buffer1b = createBuffer(aggFunc)
+val buffer2 = createBuffer(aggFunc)
+
+// Add the lower half to `buffer1a`.
+var i = 0
+while (i < 50) {
+  input.setInt(0, i)
+  aggFunc.

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139713018
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
+  assert(left == right)
+}
+
+val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, 
ArrayType(IntegerType),
+  MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", 
IntegerType
+wrongColumnTypes.foreach { dataType =>
+  val wrongColumn = new ApproxCountDistinctForIntervals(
+AttributeReference("a", dataType)(),
+endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_
+  assert(
+wrongColumn.checkInputDataTypes() match {
+  case TypeCheckFailure(msg)
+if msg.contains("requires (numeric or timestamp or date) 
type") => true
+  case _ => false
+})
+}
+
+var wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = Literal(0.5d))
+assert(
+  wrongEndpoints.checkInputDataTypes() match {
+case TypeCheckFailure(msg) if msg.contains("requires array type") 
=> true
+case _ => false
+  })
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Seq(AttributeReference("b", 
DoubleType)(
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The intervals provided must be constant literals"))
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Array(10L).map(Literal(_
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals"))
+  }
+
+  /** Create an ApproxCountDistinctForIntervals instance and an input and 
output buffer. */
+  private def createEstimator(
+  endpoints: Array[Double],
+  rsd: Double = 0.05,
+  dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, 
InternalRow, InternalRow) = {
+val input = new SpecificInternalRow(Seq(dt))
+val aggFunc = ApproxCountDistinctForIntervals(
+  BoundReference(0, dt, nullable = true), 
CreateArray(endpoints.map(Literal(_))), rsd)
+val buffer = createBuffer(aggFunc)
+(aggFunc, input, buffer)
+  }
+
+  private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): 
InternalRow = {
+val buffer = new 
SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType))
+aggFunc.initialize(buffer)
+buffer
+  }
+
+  test("merging ApproxCountDistinctForIntervals instances") {
+val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 
2000, 345678, 100))
+val buffer1b = createBuffer(aggFunc)
+val buffer2 = createBuffer(aggFunc)
+
+// Add the lower half to `buffer1a`.
+var i = 0
+while (i < 50) {
+  input.setInt(0, i)
+  aggFunc.

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139710680
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
--- End diff --

why do we need this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139658798
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
+  assert(left == right)
+}
+
+val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, 
ArrayType(IntegerType),
+  MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", 
IntegerType
+wrongColumnTypes.foreach { dataType =>
+  val wrongColumn = new ApproxCountDistinctForIntervals(
+AttributeReference("a", dataType)(),
+endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_
+  assert(
+wrongColumn.checkInputDataTypes() match {
+  case TypeCheckFailure(msg)
+if msg.contains("requires (numeric or timestamp or date) 
type") => true
+  case _ => false
+})
+}
+
+var wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = Literal(0.5d))
+assert(
+  wrongEndpoints.checkInputDataTypes() match {
+case TypeCheckFailure(msg) if msg.contains("requires array type") 
=> true
+case _ => false
+  })
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Seq(AttributeReference("b", 
DoubleType)(
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The intervals provided must be constant literals"))
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Array(10L).map(Literal(_
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals"))
+  }
+
+  /** Create an ApproxCountDistinctForIntervals instance and an input and 
output buffer. */
+  private def createEstimator(
+  endpoints: Array[Double],
+  rsd: Double = 0.05,
+  dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, 
InternalRow, InternalRow) = {
+val input = new SpecificInternalRow(Seq(dt))
+val aggFunc = ApproxCountDistinctForIntervals(
+  BoundReference(0, dt, nullable = true), 
CreateArray(endpoints.map(Literal(_))), rsd)
+val buffer = createBuffer(aggFunc)
+(aggFunc, input, buffer)
+  }
+
+  private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): 
InternalRow = {
+val buffer = new 
SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType))
+aggFunc.initialize(buffer)
+buffer
+  }
+
+  test("merging ApproxCountDistinctForIntervals instances") {
+val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 
2000, 345678, 100))
+val buffer1b = createBuffer(aggFunc)
+val buffer2 = createBuffer(aggFunc)
+
+// Create the
+// Add the lower half
+var i = 0
+while (i < 50) {
+  input.setInt(0, i)
+  aggF

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139600676
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
+  assert(left == right)
+}
+
+val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, 
ArrayType(IntegerType),
+  MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", 
IntegerType
+wrongColumnTypes.foreach { dataType =>
+  val wrongColumn = new ApproxCountDistinctForIntervals(
+AttributeReference("a", dataType)(),
+endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_
+  assert(
+wrongColumn.checkInputDataTypes() match {
+  case TypeCheckFailure(msg)
+if msg.contains("requires (numeric or timestamp or date) 
type") => true
+  case _ => false
+})
+}
+
+var wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = Literal(0.5d))
+assert(
+  wrongEndpoints.checkInputDataTypes() match {
+case TypeCheckFailure(msg) if msg.contains("requires array type") 
=> true
+case _ => false
+  })
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Seq(AttributeReference("b", 
DoubleType)(
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The intervals provided must be constant literals"))
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Array(10L).map(Literal(_
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals"))
+  }
+
+  /** Create an ApproxCountDistinctForIntervals instance and an input and 
output buffer. */
+  private def createEstimator(
+  endpoints: Array[Double],
+  rsd: Double = 0.05,
+  dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, 
InternalRow, InternalRow) = {
+val input = new SpecificInternalRow(Seq(dt))
+val aggFunc = ApproxCountDistinctForIntervals(
+  BoundReference(0, dt, nullable = true), 
CreateArray(endpoints.map(Literal(_))), rsd)
+val buffer = createBuffer(aggFunc)
+(aggFunc, input, buffer)
+  }
+
+  private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): 
InternalRow = {
+val buffer = new 
SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType))
+aggFunc.initialize(buffer)
+buffer
+  }
+
+  test("merging ApproxCountDistinctForIntervals instances") {
+val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 
2000, 345678, 100))
+val buffer1b = createBuffer(aggFunc)
+val buffer2 = createBuffer(aggFunc)
+
+// Create the
+// Add the lower half
+var i = 0
+while (i < 50) {
+  input.setInt(0, i)
+  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139600490
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
+  assert(left == right)
+}
+
+val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, 
ArrayType(IntegerType),
+  MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", 
IntegerType
+wrongColumnTypes.foreach { dataType =>
+  val wrongColumn = new ApproxCountDistinctForIntervals(
+AttributeReference("a", dataType)(),
+endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_
+  assert(
+wrongColumn.checkInputDataTypes() match {
+  case TypeCheckFailure(msg)
+if msg.contains("requires (numeric or timestamp or date) 
type") => true
+  case _ => false
+})
+}
+
+var wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = Literal(0.5d))
+assert(
+  wrongEndpoints.checkInputDataTypes() match {
+case TypeCheckFailure(msg) if msg.contains("requires array type") 
=> true
+case _ => false
+  })
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Seq(AttributeReference("b", 
DoubleType)(
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The intervals provided must be constant literals"))
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Array(10L).map(Literal(_
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals"))
+  }
+
+  /** Create an ApproxCountDistinctForIntervals instance and an input and 
output buffer. */
+  private def createEstimator(
+  endpoints: Array[Double],
+  rsd: Double = 0.05,
+  dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, 
InternalRow, InternalRow) = {
+val input = new SpecificInternalRow(Seq(dt))
+val aggFunc = ApproxCountDistinctForIntervals(
+  BoundReference(0, dt, nullable = true), 
CreateArray(endpoints.map(Literal(_))), rsd)
+val buffer = createBuffer(aggFunc)
+(aggFunc, input, buffer)
+  }
+
+  private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): 
InternalRow = {
+val buffer = new 
SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType))
+aggFunc.initialize(buffer)
+buffer
+  }
+
+  test("merging ApproxCountDistinctForIntervals instances") {
+val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 
2000, 345678, 100))
+val buffer1b = createBuffer(aggFunc)
+val buffer2 = createBuffer(aggFunc)
+
+// Create the
--- End diff --

typo?


---

-
To unsubs

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139599361
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
ExpectsInputTypes, Expression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in 
`endpointsExpression`. The endpoints should be
+ * sorted into ascending order. E.g., given an array of endpoints
+ * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's 
for intervals
+ * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, 
endpoint_N].
+ * To count ndv's in these intervals, apply the HyperLogLogPlusPlus 
algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals, should be sorted 
into ascending order.
+ * @param relativeSD The maximum estimation error allowed in the 
HyperLogLogPlusPlus algorithm.
+ */
+case class ApproxCountDistinctForIntervals(
+child: Expression,
+endpointsExpression: Expression,
+relativeSD: Double = 0.05,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate with ExpectsInputTypes {
+
+  def this(child: Expression, endpointsExpression: Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = 0.05,
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  def this(child: Expression, endpointsExpression: Expression, relativeSD: 
Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD),
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = {
+Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
+  }
+
+  // Mark as lazy so that endpointsExpression is not evaluated during tree 
transformation.
+  lazy val endpoints: Array[Double] =
+(endpointsExpression.dataType, endpointsExpression.eval()) match {
+  case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
+val numericArray = arrayData.toObjectArray(baseType)
+numericArray.map { x =>
+  baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType])
+}
+}
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val defaultCheck = super.checkInputDataTypes()
+if (defaultCheck.isFailure) {
+  defaultCheck
+} else if (!endpointsExpression.foldable) {
+  TypeCheckFailure("The intervals provided must be constant literals")
+} else if (endpoints.length < 2) {
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals")
+} else {
+  TypeCheckSuccess
+}
+  }
+
+  // N endpoints construct N-1 intervals, creating a HLLPP for each 
interval
+  private lazy val hllppArray = {
+val array = new Array[HyperLogLogPlusPlusHelper](endpoints.length - 1)
+for (i <- array.indices) {
+  array(i) = new HyperLogLogPlu

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-17 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139327658
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
ExpectsInputTypes, Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in 
`endpointsExpression`. The endpoints will be
+ * sorted into ascending order. To count ndv's in these intervals, apply 
the HyperLogLogPlusPlus
+ * algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals.
+ * @param relativeSD The maximum estimation error allowed in the 
HyperLogLogPlusPlus algorithm.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N)) - Returns 
the approximate
+  number of distinct values (ndv) for intervals [endpoint_1, 
endpoint_2],
+  (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N].
+
+_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N), 
relativeSD=0.05) - Returns
+  the approximate number of distinct values (ndv) for intervals with 
relativeSD, the maximum
+  estimation error allowed in the HyperLogLogPlusPlus algorithm.
+  """,
+  extended = """
+Examples:
+  > SELECT approx_count_distinct_for_intervals(10.0, array(5, 15, 25), 
0.01);
+   [1, 0]
+  """)
+case class ApproxCountDistinctForIntervals(
+child: Expression,
+endpointsExpression: Expression,
+relativeSD: Double = 0.05,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate with ExpectsInputTypes {
+
+  def this(child: Expression, endpointsExpression: Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = 0.05,
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  def this(child: Expression, endpointsExpression: Expression, relativeSD: 
Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD),
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = {
+Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
+  }
+
+  // Mark as lazy so that endpointsExpression is not evaluated during tree 
transformation.
+  lazy val endpoints: Array[Double] = {
+val doubleArray = (endpointsExpression.dataType, 
endpointsExpression.eval()) match {
+  case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
+val numericArray = arrayData.toObjectArray(baseType)
+numericArray.map { x =>
+  baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType])
+}
+}
+util.Arrays.sort(doubleArray)
+doubleArray
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val defaultCheck = super.checkInputDataTypes()
+if (defaultCheck.isFailure) {
+  defaultCheck
+} else if (!endpoi

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139188790
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
ExpectsInputTypes, Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in 
`endpointsExpression`. The endpoints will be
+ * sorted into ascending order. To count ndv's in these intervals, apply 
the HyperLogLogPlusPlus
+ * algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals.
+ * @param relativeSD The maximum estimation error allowed in the 
HyperLogLogPlusPlus algorithm.
+ */
+@ExpressionDescription(
--- End diff --

We don't need this annotation if it's an internal function.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139190161
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
ExpectsInputTypes, Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in 
`endpointsExpression`. The endpoints will be
+ * sorted into ascending order. To count ndv's in these intervals, apply 
the HyperLogLogPlusPlus
+ * algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals.
+ * @param relativeSD The maximum estimation error allowed in the 
HyperLogLogPlusPlus algorithm.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N)) - Returns 
the approximate
+  number of distinct values (ndv) for intervals [endpoint_1, 
endpoint_2],
+  (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N].
+
+_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N), 
relativeSD=0.05) - Returns
+  the approximate number of distinct values (ndv) for intervals with 
relativeSD, the maximum
+  estimation error allowed in the HyperLogLogPlusPlus algorithm.
+  """,
+  extended = """
+Examples:
+  > SELECT approx_count_distinct_for_intervals(10.0, array(5, 15, 25), 
0.01);
+   [1, 0]
+  """)
+case class ApproxCountDistinctForIntervals(
+child: Expression,
+endpointsExpression: Expression,
+relativeSD: Double = 0.05,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate with ExpectsInputTypes {
+
+  def this(child: Expression, endpointsExpression: Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = 0.05,
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  def this(child: Expression, endpointsExpression: Expression, relativeSD: 
Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD),
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = {
+Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
+  }
+
+  // Mark as lazy so that endpointsExpression is not evaluated during tree 
transformation.
+  lazy val endpoints: Array[Double] = {
+val doubleArray = (endpointsExpression.dataType, 
endpointsExpression.eval()) match {
+  case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
+val numericArray = arrayData.toObjectArray(baseType)
+numericArray.map { x =>
+  baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType])
+}
+}
+util.Arrays.sort(doubleArray)
--- End diff --

Again, if it's only used internally, we can require the caller to pass the endpoints 
already sorted.
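A sketch of the caller-side contract being suggested (illustrative only): sort the endpoints 
once before constructing the expression, so the aggregate can simply assume ascending order.

    import org.apache.spark.sql.catalyst.expressions.{CreateArray, Literal}

    object SortedEndpointsCaller {
      def main(args: Array[String]): Unit = {
        val rawEndpoints = Array[Double](2000, 0, 345678, 10, 100)
        val sorted = rawEndpoints.sorted          // the caller guarantees ascending order
        val endpointsExpression = CreateArray(sorted.map(Literal(_)).toSeq)
        println(sorted.mkString(", "))            // 0.0, 10.0, 100.0, 2000.0, 345678.0
      }
    }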


---

-
To unsubscribe, e-mail: reviews-unsu

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139190528
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
ExpectsInputTypes, Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in 
`endpointsExpression`. The endpoints will be
+ * sorted into ascending order. To count ndv's in these intervals, apply 
the HyperLogLogPlusPlus
+ * algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals.
+ * @param relativeSD The maximum estimation error allowed in the 
HyperLogLogPlusPlus algorithm.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N)) - Returns 
the approximate
+  number of distinct values (ndv) for intervals [endpoint_1, 
endpoint_2],
+  (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N].
+
+_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N), 
relativeSD=0.05) - Returns
+  the approximate number of distinct values (ndv) for intervals with 
relativeSD, the maximum
+  estimation error allowed in the HyperLogLogPlusPlus algorithm.
+  """,
+  extended = """
+Examples:
+  > SELECT approx_count_distinct_for_intervals(10.0, array(5, 15, 25), 
0.01);
+   [1, 0]
+  """)
+case class ApproxCountDistinctForIntervals(
+child: Expression,
+endpointsExpression: Expression,
+relativeSD: Double = 0.05,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate with ExpectsInputTypes {
+
+  def this(child: Expression, endpointsExpression: Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = 0.05,
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  def this(child: Expression, endpointsExpression: Expression, relativeSD: 
Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD),
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = {
+Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
+  }
+
+  // Mark as lazy so that endpointsExpression is not evaluated during tree 
transformation.
+  lazy val endpoints: Array[Double] = {
+val doubleArray = (endpointsExpression.dataType, 
endpointsExpression.eval()) match {
+  case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
+val numericArray = arrayData.toObjectArray(baseType)
+numericArray.map { x =>
+  baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType])
+}
+}
+util.Arrays.sort(doubleArray)
+doubleArray
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val defaultCheck = super.checkInputDataTypes()
+if (defaultCheck.isFailure) {
+  defaultCheck
+} else if (!en

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139188326
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 ---
@@ -270,6 +270,7 @@ object FunctionRegistry {
 expression[Remainder]("%"),
 
 // aggregate functions
+
expression[ApproxCountDistinctForIntervals]("approx_count_distinct_for_intervals"),
--- End diff --

If it's only used internally, we don't need to register it here.
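For context, an internal-only aggregate is usually constructed directly where it is needed 
(for example while collecting column statistics) rather than looked up by name; a hedged 
sketch using the class added by this PR:

    import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateArray, Literal}
    import org.apache.spark.sql.catalyst.expressions.aggregate.ApproxCountDistinctForIntervals
    import org.apache.spark.sql.types.DoubleType

    object DirectConstructionSketch {
      def main(args: Array[String]): Unit = {
        val endpoints = Array[Double](0, 10, 100)
        // Build the aggregate directly; no FunctionRegistry entry is involved.
        val aggFunc = ApproxCountDistinctForIntervals(
          BoundReference(0, DoubleType, nullable = true),
          CreateArray(endpoints.map(Literal(_)).toSeq),
          relativeSD = 0.05)
        println(aggFunc.relativeSD)
      }
    }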


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-13 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r138779799
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.lang.{Long => JLong}
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.XxHash64Function
+import org.apache.spark.sql.types._
+
+// A helper class for HyperLogLogPlusPlus.
+class HyperLogLogPlusPlusHelper(relativeSD: Double) extends Serializable {
--- End diff --

Yes.


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-13 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r138541717
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.lang.{Long => JLong}
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.XxHash64Function
+import org.apache.spark.sql.types._
+
+// A helper class for HyperLogLogPlusPlus.
+class HyperLogLogPlusPlusHelper(relativeSD: Double) extends Serializable {
--- End diff --

Is this code mainly copied from `HyperLogLogPlusPlus`?


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-12 Thread wzhfy
GitHub user wzhfy reopened a pull request:

https://github.com/apache/spark/pull/15544

[SPARK-17997] [SQL] Add an aggregation function for counting distinct 
values for multiple intervals

## What changes were proposed in this pull request?

This work is part of [SPARK-17074](https://issues.apache.org/jira/browse/SPARK-17074) to compute equi-height histograms. An equi-height histogram is an array of bins; a bin consists of two endpoints, which form an interval of values, and the ndv (number of distinct values) in that interval.

This PR adds a new aggregate function that, given an array of endpoints, counts the distinct values (ndv) in the intervals formed by those endpoints; a sketch of how the resulting ndv's map onto histogram bins follows below.
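
As a rough illustration (hypothetical `Bin` class and `buildBins` helper, not code from this PR) of how the per-interval ndv's produced by the aggregate would pair up with the endpoints to form histogram bins:

```scala
object HistogramBinsSketch {
  // Hypothetical bin representation: an interval [lo, hi] plus the approximate
  // number of distinct values observed in that interval.
  case class Bin(lo: Double, hi: Double, ndv: Long)

  // Pair consecutive endpoints with the ndv array returned by the aggregate;
  // ndvs.length is expected to be endpoints.length - 1 (one ndv per interval).
  def buildBins(endpoints: Array[Double], ndvs: Array[Long]): Seq[Bin] = {
    require(ndvs.length == endpoints.length - 1, "one ndv per interval")
    endpoints.sliding(2).toSeq.zip(ndvs).map {
      case (Array(lo, hi), ndv) => Bin(lo, hi, ndv)
    }
  }

  def main(args: Array[String]): Unit = {
    // Endpoints (0, 10, 20) with per-interval ndv's (7, 4) give two bins:
    // Bin(0.0, 10.0, 7) and Bin(10.0, 20.0, 4).
    println(buildBins(Array(0.0, 10.0, 20.0), Array(7L, 4L)))
  }
}
```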

This PR also refactors `HyperLogLogPlusPlus` by extracting a helper class `HyperLogLogPlusPlusHelper`, where the underlying HLL++ algorithm resides.

## How was this patch tested?

Add new test cases.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wzhfy/spark countIntervals

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15544.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15544


commit 9960fab07d2075d2beba1fea7024fe6dd30d9eef
Author: wangzhenhua 
Date:   2016-10-14T06:23:39Z

refactor hllpp

commit 5aa835ce2769a34f88bacb389c4af30f52459226
Author: wangzhenhua 
Date:   2016-10-17T13:18:36Z

add IntervalDistinctApprox

commit 840171efa08c70da83af54bc726079a88fb7a1d2
Author: wangzhenhua 
Date:   2016-10-19T01:58:32Z

add test cases

commit a6417e7df5cf44ba9f75a7d66d46258a56b0082f
Author: wangzhenhua 
Date:   2016-10-20T04:46:57Z

convert HLLPP and IntervalDistinctApprox to ImperativeAggregate

commit 74d7ae7ac817d427a264b67f580fe39bbb49811b
Author: wangzhenhua 
Date:   2016-11-04T08:36:23Z

add negative column type test and update doc




---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2016-12-17 Thread wzhfy
Github user wzhfy closed the pull request at:

https://github.com/apache/spark/pull/15544


---



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2016-10-18 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r84006557
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala ---
@@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus(
 
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
 
-  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
-
-  /** Allocate enough words to store all registers. */
-  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i =>
-AttributeReference(s"MS[$i]", LongType)()
+  override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = {
+new HyperLogLogPlusPlusAlgo(relativeSD)
--- End diff --

@hvanhovell Do you mean we should stick with `ImperativeAggregate` rather than use `TypedImperativeAggregate`?


---



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2016-10-18 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r83997249
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala ---
@@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus(
 
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
 
-  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
-
-  /** Allocate enough words to store all registers. */
-  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i =>
-AttributeReference(s"MS[$i]", LongType)()
+  override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = {
+new HyperLogLogPlusPlusAlgo(relativeSD)
--- End diff --

@hvanhovell Do you mean we should put a digest object in the buffer (the way ApproximatePercentile uses PercentileDigest) instead of putting HLL++ directly in it?


---



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2016-10-18 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r83993309
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala ---
@@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus(
 
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
 
-  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
-
-  /** Allocate enough words to store all registers. */
-  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i =>
-AttributeReference(s"MS[$i]", LongType)()
+  override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = {
+new HyperLogLogPlusPlusAlgo(relativeSD)
--- End diff --

This, by definition, moves the HLL++ operator to the sort-based aggregation path, which would be a huge performance hit. Is it possible to separate the algorithm from the buffer?
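
One way to read "separate the algorithm from the buffer" (roughly what the HyperLogLogPlusPlusHelper refactoring discussed later in this thread ends up doing) is sketched below: a stateless, serializable helper that reads and writes registers packed into a caller-owned Array[Long], so the aggregate can keep a flat, fixed-width buffer and stay on the hash-based aggregation path. This is a plain HyperLogLog, not HLL++ (no bias correction, simplified constants), and the class and method names are made up for illustration:

```scala
import java.lang.Long.numberOfLeadingZeros

class HllOnWords(p: Int) extends Serializable {
  require(p >= 4 && p <= 16, "precision out of range")
  private val m = 1 << p                 // number of registers
  val numWords: Int = m / 8              // eight 1-byte registers per Long
  private val alphaMM = 0.7213 / (1 + 1.079 / m) * m * m

  private def getReg(words: Array[Long], offset: Int, idx: Int): Int =
    ((words(offset + (idx >>> 3)) >>> ((idx & 7) * 8)) & 0xffL).toInt

  private def setReg(words: Array[Long], offset: Int, idx: Int, v: Int): Unit = {
    val w = offset + (idx >>> 3)
    val shift = (idx & 7) * 8
    words(w) = (words(w) & ~(0xffL << shift)) | (v.toLong << shift)
  }

  // Fold one hashed value into the registers stored at words(offset until offset + numWords).
  def update(words: Array[Long], offset: Int, hash: Long): Unit = {
    val idx = (hash >>> (64 - p)).toInt                          // top p bits pick the register
    val rank = math.min(numberOfLeadingZeros(hash << p) + 1, 64 - p + 1)
    if (rank > getReg(words, offset, idx)) setReg(words, offset, idx, rank)
  }

  // Merge the registers at (words2, offset2) into (words1, offset1): element-wise max.
  def merge(words1: Array[Long], offset1: Int, words2: Array[Long], offset2: Int): Unit = {
    var idx = 0
    while (idx < m) {
      val v = math.max(getReg(words1, offset1, idx), getReg(words2, offset2, idx))
      setReg(words1, offset1, idx, v)
      idx += 1
    }
  }

  // Cardinality estimate from the registers, with the usual small-range correction.
  def query(words: Array[Long], offset: Int): Long = {
    var invSum = 0.0
    var zeros = 0
    var idx = 0
    while (idx < m) {
      val r = getReg(words, offset, idx)
      invSum += math.pow(2.0, -r)
      if (r == 0) zeros += 1
      idx += 1
    }
    val raw = alphaMM / invSum
    val est = if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) else raw
    math.round(est)
  }
}
```

An aggregate built on such a helper only needs to reserve `numWords` Long slots per group in its mutable aggregation buffer, instead of storing a typed object.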


---



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2016-10-18 Thread wzhfy
GitHub user wzhfy opened a pull request:

https://github.com/apache/spark/pull/15544

[SPARK-17997] [SQL] Add an aggregation function for counting distinct 
values for multiple intervals

## What changes were proposed in this pull request?
This work is part of [SPARK-17074](https://issues.apache.org/jira/browse/SPARK-17074) to generate histogram statistics.
This work computes ndv's for the bins of equi-height histograms. A bin consists of two endpoints, which form an interval of values, and the ndv in that interval. For computing histogram statistics, after obtaining the endpoints we need an aggregate function to count the distinct values in each interval.

This PR also refactors HyperLogLogPlusPlus by extracting a helper class HyperLogLogPlusPlusAlgo, which holds the actual algorithm.

## How was this patch tested?
add test cases


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wzhfy/spark countIntervals

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15544.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15544


commit 0255f6cbf8f32bba223c479b27b35a9310e52658
Author: wangzhenhua 
Date:   2016-10-14T06:23:39Z

refactor hllpp

commit ebeb0349e1786b6d74706bbf33a335c32a6eda7d
Author: wangzhenhua 
Date:   2016-10-17T13:18:36Z

add IntervalDistinctApprox

commit e274ef22b96ca878df326675e28566cfff6b5088
Author: wangzhenhua 
Date:   2016-10-19T01:58:32Z

add test cases




---