[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/15544
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r140140577 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be
+ * sorted into ascending order. E.g., given an array of endpoints
+ * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals
+ * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N].
+ * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals, should be sorted into ascending order.
+ * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm.
+ */
+case class ApproxCountDistinctForIntervals(
+    child: Expression,
+    endpointsExpression: Expression,
+    relativeSD: Double = 0.05,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate with ExpectsInputTypes {
+
+  def this(child: Expression, endpointsExpression: Expression) = {
+    this(
+      child = child,
+      endpointsExpression = endpointsExpression,
+      relativeSD = 0.05,
+      mutableAggBufferOffset = 0,
+      inputAggBufferOffset = 0)
+  }
+
+  def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = {
+    this(
+      child = child,
+      endpointsExpression = endpointsExpression,
+      relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD),
+      mutableAggBufferOffset = 0,
+      inputAggBufferOffset = 0)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
+  }
+
+  // Mark as lazy so that endpointsExpression is not evaluated during tree transformation.
+  lazy val endpoints: Array[Double] =
+    (endpointsExpression.dataType, endpointsExpression.eval()) match {
+      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
+        val numericArray = arrayData.toObjectArray(baseType)
+        numericArray.map { x =>
+          baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType])
+        }
+    }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!endpointsExpression.foldable) {
+      TypeCheckFailure("The intervals provided must be constant literals")
+    } else if (endpoints.length < 2) {
+      TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  // N endpoints construct N-1 intervals, creating a HLLPP for each interval
+  private lazy val hllppArray = {
+    val array = new Array[HyperLogLogPlusPlusHelper](endpoints.length - 1)
+    for (i <- array.indices) {
+      array(i) = new HyperLogLogPlusPlusHelper(relativeSD)
+    }
+    array
+  }
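As a reading aid, the per-interval semantics of the function can be mimicked in plain Scala with exact distinct counts standing in for the HLL++ sketches (ndvPerInterval is a hypothetical helper written for this thread, not part of the patch):

import scala.collection.mutable

object NdvForIntervalsSketch {
  // Values in [e1, e2] go to bucket 0; values in (e_i, e_{i+1}] go to bucket i.
  def ndvPerInterval(values: Seq[Double], endpoints: Array[Double]): Array[Long] = {
    require(endpoints.length >= 2, "need at least two endpoints")
    val buckets = Array.fill(endpoints.length - 1)(mutable.Set.empty[Double])
    for (v <- values if v >= endpoints.head && v <= endpoints.last) {
      // Index of the interval containing v: the first interval is closed on
      // both ends, the remaining ones are left-open and right-closed.
      val i = endpoints.lastIndexWhere(_ < v).max(0).min(endpoints.length - 2)
      buckets(i) += v
    }
    buckets.map(_.size.toLong)
  }

  def main(args: Array[String]): Unit = {
    // Distinct values {1.0, 2.0, 3.0} fall in [1, 3] and {7.0} falls in (3, 10]:
    println(ndvPerInterval(Seq(1.0, 2.0, 2.0, 3.0, 7.0), Array(1.0, 3.0, 10.0)).mkString(", ")) // prints "3, 1"
  }
}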
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r140131809 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---

+    } else if (!endpointsExpression.foldable) {
+      TypeCheckFailure("The intervals provided must be constant literals")

--- End diff -- I'll check element type of `endpointsExpression`, thanks.
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r140129058 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---

+  lazy val endpoints: Array[Double] =
+    (endpointsExpression.dataType, endpointsExpression.eval()) match {
+      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>

--- End diff -- This aggregate function is only used internally, the expression will use internal types, so there is no case using date/timestamp endpoints.
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139917384
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139909208
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139898372
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139896879 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---

+  lazy val endpoints: Array[Double] =
+    (endpointsExpression.dataType, endpointsExpression.eval()) match {
+      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>

--- End diff -- Good point. I think we should also allow date/timestamp endpoints.
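One way the endpoints extraction could admit date/timestamp arrays, leaning on Catalyst's internal Int/Long encodings; a sketch of the idea, not necessarily the change that was committed (it assumes the surrounding class and imports from the diff above):

  lazy val endpoints: Array[Double] =
    (endpointsExpression.dataType, endpointsExpression.eval()) match {
      case (ArrayType(baseType, _), arrayData: ArrayData) =>
        arrayData.toObjectArray(baseType).map { x =>
          baseType match {
            case n: NumericType => n.numeric.toDouble(x.asInstanceOf[n.InternalType])
            case DateType => x.asInstanceOf[Int].toDouble       // days since epoch
            case TimestampType => x.asInstanceOf[Long].toDouble // microseconds since epoch
          }
        }
    }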
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139890936
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139888528
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139887066
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139886249 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---

+case class ApproxCountDistinctForIntervals(
+    child: Expression,
+    endpointsExpression: Expression,

--- End diff -- Yes. I'll improve the comment for `endpointsExpression` as you suggested.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139878051
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139877802 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---

+  lazy val endpoints: Array[Double] =
+    (endpointsExpression.dataType, endpointsExpression.eval()) match {
+      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>

--- End diff -- The type of `child` can be `TimestampType` and `DateType`, but endpoints can only be `ArrayType` of `NumericType`. It may not be convenient to set up numeric endpoints for a timestamp or date child column.
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139877421 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. 
+ lazy val endpoints: Array[Double] = +(endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} + + override def checkInputDataTypes(): TypeCheckResult = { +val defaultCheck = super.checkInputDataTypes() +if (defaultCheck.isFailure) { + defaultCheck +} else if (!endpointsExpression.foldable) { + TypeCheckFailure("The intervals provided must be constant literals") --- End diff -- Should we also check element type of `endpointsExpression`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
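The element-type check viirya asks about could slot into `checkInputDataTypes` alongside the foldability branch quoted above. A hedged sketch follows; the helper name and error message are illustrative, not the PR's:

```scala
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.types.{ArrayType, DataType, NumericType}

// Hypothetical extra branch: reject arrays whose elements are not numeric.
def checkEndpointsElementType(dt: DataType): TypeCheckResult = dt match {
  case ArrayType(_: NumericType, _) => TypeCheckSuccess
  case other => TypeCheckFailure(
    s"Endpoints must be an array of numeric elements, but got ${other.simpleString}")
}
```

Without such a branch, the `lazy val endpoints` pattern match above has no catch-all case, so an array of, say, strings would only fail with a `MatchError` at evaluation time.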
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139876729 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. --- End diff -- `An array expression with `NumericType` element to construct the intervals Must be foldable.` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139876548 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, --- End diff -- `endpointsExpression` is foldable? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
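To make the foldability question concrete, here is a small hedged example of what does and does not qualify, built from the same expressions the test suite below uses:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CreateArray, Literal}
import org.apache.spark.sql.types.DoubleType

// Foldable: an array of literals can be evaluated once at analysis time,
// which is what the endpointsExpression.eval() call above relies on.
val constantEndpoints = CreateArray(Seq(1.0, 10.0).map(Literal(_)))
assert(constantEndpoints.foldable)

// Not foldable: an array referencing a column depends on each input row,
// so checkInputDataTypes rejects it ("must be constant literals").
val perRowEndpoints = CreateArray(Seq(AttributeReference("b", DoubleType)()))
assert(!perRowEndpoints.foldable)
```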
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139859176 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { --- End diff -- oh, I'll remove this. Previously I put some other logic here, but we should remove it now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139713569 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { + assert(left == right) +} + +val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, ArrayType(IntegerType), + MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", IntegerType +wrongColumnTypes.foreach { dataType => + val wrongColumn = new ApproxCountDistinctForIntervals( +AttributeReference("a", dataType)(), +endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_ + assert( +wrongColumn.checkInputDataTypes() match { + case TypeCheckFailure(msg) +if msg.contains("requires (numeric or timestamp or date) type") => true + case _ => false +}) +} + +var wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = Literal(0.5d)) +assert( + wrongEndpoints.checkInputDataTypes() match { +case TypeCheckFailure(msg) if msg.contains("requires array type") => true +case _ => false + }) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Seq(AttributeReference("b", DoubleType)( +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The intervals provided must be constant literals")) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Array(10L).map(Literal(_ +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")) + } + + /** Create an ApproxCountDistinctForIntervals instance and an input and output buffer. 
*/ + private def createEstimator( + endpoints: Array[Double], + rsd: Double = 0.05, + dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, InternalRow, InternalRow) = { +val input = new SpecificInternalRow(Seq(dt)) +val aggFunc = ApproxCountDistinctForIntervals( + BoundReference(0, dt, nullable = true), CreateArray(endpoints.map(Literal(_))), rsd) +val buffer = createBuffer(aggFunc) +(aggFunc, input, buffer) + } + + private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): InternalRow = { +val buffer = new SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType)) +aggFunc.initialize(buffer) +buffer + } + + test("merging ApproxCountDistinctForIntervals instances") { +val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 2000, 345678, 100)) +val buffer1b = createBuffer(aggFunc) +val buffer2 = createBuffer(aggFunc) + +// Add the lower half to `buffer1a`. +var i = 0 +while (i < 50) { + input.setInt(0, i) + aggFunc.
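The merge test is cut off above. A hedged sketch of how such a test typically proceeds for an `ImperativeAggregate` follows; `update` and `merge` are the standard interface methods, but the exact loop bounds and assertions in the PR may differ:

```scala
// Feed the lower half of the input into buffer1a and the upper half into
// buffer1b, then merge both partial states into buffer2.
var i = 0
while (i < 50) {
  input.setInt(0, i)
  aggFunc.update(buffer1a, input)
  i += 1
}
while (i < 100) {
  input.setInt(0, i)
  aggFunc.update(buffer1b, input)
  i += 1
}
aggFunc.merge(buffer2, buffer1a)
aggFunc.merge(buffer2, buffer1b)
// The merged state should then evaluate to the same result as a single
// buffer that saw all 100 rows at once.
```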
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139713018 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { + assert(left == right) +} + +val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, ArrayType(IntegerType), + MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", IntegerType +wrongColumnTypes.foreach { dataType => + val wrongColumn = new ApproxCountDistinctForIntervals( +AttributeReference("a", dataType)(), +endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_ + assert( +wrongColumn.checkInputDataTypes() match { + case TypeCheckFailure(msg) +if msg.contains("requires (numeric or timestamp or date) type") => true + case _ => false +}) +} + +var wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = Literal(0.5d)) +assert( + wrongEndpoints.checkInputDataTypes() match { +case TypeCheckFailure(msg) if msg.contains("requires array type") => true +case _ => false + }) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Seq(AttributeReference("b", DoubleType)( +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The intervals provided must be constant literals")) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Array(10L).map(Literal(_ +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")) + } + + /** Create an ApproxCountDistinctForIntervals instance and an input and output buffer. 
*/ + private def createEstimator( + endpoints: Array[Double], + rsd: Double = 0.05, + dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, InternalRow, InternalRow) = { +val input = new SpecificInternalRow(Seq(dt)) +val aggFunc = ApproxCountDistinctForIntervals( + BoundReference(0, dt, nullable = true), CreateArray(endpoints.map(Literal(_))), rsd) +val buffer = createBuffer(aggFunc) +(aggFunc, input, buffer) + } + + private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): InternalRow = { +val buffer = new SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType)) +aggFunc.initialize(buffer) +buffer + } + + test("merging ApproxCountDistinctForIntervals instances") { +val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 2000, 345678, 100)) +val buffer1b = createBuffer(aggFunc) +val buffer2 = createBuffer(aggFunc) + +// Add the lower half to `buffer1a`. +var i = 0 +while (i < 50) { + input.setInt(0, i) + aggFunc.
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139710680 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { --- End diff -- why do we need this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139658798 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { + assert(left == right) +} + +val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, ArrayType(IntegerType), + MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", IntegerType +wrongColumnTypes.foreach { dataType => + val wrongColumn = new ApproxCountDistinctForIntervals( +AttributeReference("a", dataType)(), +endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_ + assert( +wrongColumn.checkInputDataTypes() match { + case TypeCheckFailure(msg) +if msg.contains("requires (numeric or timestamp or date) type") => true + case _ => false +}) +} + +var wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = Literal(0.5d)) +assert( + wrongEndpoints.checkInputDataTypes() match { +case TypeCheckFailure(msg) if msg.contains("requires array type") => true +case _ => false + }) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Seq(AttributeReference("b", DoubleType)( +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The intervals provided must be constant literals")) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Array(10L).map(Literal(_ +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")) + } + + /** Create an ApproxCountDistinctForIntervals instance and an input and output buffer. 
*/ + private def createEstimator( + endpoints: Array[Double], + rsd: Double = 0.05, + dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, InternalRow, InternalRow) = { +val input = new SpecificInternalRow(Seq(dt)) +val aggFunc = ApproxCountDistinctForIntervals( + BoundReference(0, dt, nullable = true), CreateArray(endpoints.map(Literal(_))), rsd) +val buffer = createBuffer(aggFunc) +(aggFunc, input, buffer) + } + + private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): InternalRow = { +val buffer = new SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType)) +aggFunc.initialize(buffer) +buffer + } + + test("merging ApproxCountDistinctForIntervals instances") { +val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 2000, 345678, 100)) +val buffer1b = createBuffer(aggFunc) +val buffer2 = createBuffer(aggFunc) + +// Create the +// Add the lower half +var i = 0 +while (i < 50) { + input.setInt(0, i) + aggF
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139600676 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { + assert(left == right) +} + +val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, ArrayType(IntegerType), + MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", IntegerType +wrongColumnTypes.foreach { dataType => + val wrongColumn = new ApproxCountDistinctForIntervals( +AttributeReference("a", dataType)(), +endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_ + assert( +wrongColumn.checkInputDataTypes() match { + case TypeCheckFailure(msg) +if msg.contains("requires (numeric or timestamp or date) type") => true + case _ => false +}) +} + +var wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = Literal(0.5d)) +assert( + wrongEndpoints.checkInputDataTypes() match { +case TypeCheckFailure(msg) if msg.contains("requires array type") => true +case _ => false + }) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Seq(AttributeReference("b", DoubleType)( +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The intervals provided must be constant literals")) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Array(10L).map(Literal(_ +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")) + } + + /** Create an ApproxCountDistinctForIntervals instance and an input and output buffer. 
*/ + private def createEstimator( + endpoints: Array[Double], + rsd: Double = 0.05, + dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, InternalRow, InternalRow) = { +val input = new SpecificInternalRow(Seq(dt)) +val aggFunc = ApproxCountDistinctForIntervals( + BoundReference(0, dt, nullable = true), CreateArray(endpoints.map(Literal(_))), rsd) +val buffer = createBuffer(aggFunc) +(aggFunc, input, buffer) + } + + private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): InternalRow = { +val buffer = new SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType)) +aggFunc.initialize(buffer) +buffer + } + + test("merging ApproxCountDistinctForIntervals instances") { +val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 2000, 345678, 100)) +val buffer1b = createBuffer(aggFunc) +val buffer2 = createBuffer(aggFunc) + +// Create the +// Add the lower half +var i = 0 +while (i < 50) { + input.setInt(0, i) +
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139600490 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { + assert(left == right) +} + +val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, ArrayType(IntegerType), + MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", IntegerType +wrongColumnTypes.foreach { dataType => + val wrongColumn = new ApproxCountDistinctForIntervals( +AttributeReference("a", dataType)(), +endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_ + assert( +wrongColumn.checkInputDataTypes() match { + case TypeCheckFailure(msg) +if msg.contains("requires (numeric or timestamp or date) type") => true + case _ => false +}) +} + +var wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = Literal(0.5d)) +assert( + wrongEndpoints.checkInputDataTypes() match { +case TypeCheckFailure(msg) if msg.contains("requires array type") => true +case _ => false + }) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Seq(AttributeReference("b", DoubleType)( +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The intervals provided must be constant literals")) + +wrongEndpoints = new ApproxCountDistinctForIntervals( + AttributeReference("a", DoubleType)(), + endpointsExpression = CreateArray(Array(10L).map(Literal(_ +assertEqual( + wrongEndpoints.checkInputDataTypes(), + TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")) + } + + /** Create an ApproxCountDistinctForIntervals instance and an input and output buffer. 
*/ + private def createEstimator( + endpoints: Array[Double], + rsd: Double = 0.05, + dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, InternalRow, InternalRow) = { +val input = new SpecificInternalRow(Seq(dt)) +val aggFunc = ApproxCountDistinctForIntervals( + BoundReference(0, dt, nullable = true), CreateArray(endpoints.map(Literal(_))), rsd) +val buffer = createBuffer(aggFunc) +(aggFunc, input, buffer) + } + + private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): InternalRow = { +val buffer = new SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType)) +aggFunc.initialize(buffer) +buffer + } + + test("merging ApproxCountDistinctForIntervals instances") { +val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 2000, 345678, 100)) +val buffer1b = createBuffer(aggFunc) +val buffer2 = createBuffer(aggFunc) + +// Create the --- End diff -- typo? --- - To unsubs
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139599361 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. 
+ lazy val endpoints: Array[Double] = +(endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} + + override def checkInputDataTypes(): TypeCheckResult = { +val defaultCheck = super.checkInputDataTypes() +if (defaultCheck.isFailure) { + defaultCheck +} else if (!endpointsExpression.foldable) { + TypeCheckFailure("The intervals provided must be constant literals") +} else if (endpoints.length < 2) { + TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals") +} else { + TypeCheckSuccess +} + } + + // N endpoints construct N-1 intervals, creating a HLLPP for each interval + private lazy val hllppArray = { +val array = new Array[HyperLogLogPlusPlusHelper](endpoints.length - 1) +for (i <- array.indices) { + array(i) = new HyperLogLogPlu
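The allocation loop is truncated above; the idea is that N sorted endpoints define N-1 intervals, each with its own HLL++ state. A hedged sketch of the companion lookup such a design needs (the name `findHllppIndex` is illustrative, not necessarily the PR's):

```scala
// Map a (normalized) double value to the index of the interval containing it,
// honoring the [e1, e2], (e2, e3], ..., (eN-1, eN] semantics from the class
// doc: an interior endpoint belongs to the interval that ends at it.
def findHllppIndex(value: Double, endpoints: Array[Double]): Int = {
  val idx = java.util.Arrays.binarySearch(endpoints, value)
  if (idx >= 0) {
    math.max(idx - 1, 0)            // exact hit on an endpoint
  } else {
    val insertionPoint = -(idx + 1)
    if (insertionPoint == 0 || insertionPoint == endpoints.length) {
      -1                            // outside [first, last]: ignore the value
    } else {
      insertionPoint - 1            // strictly inside (e_i, e_i+1)
    }
  }
}
```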
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139327658 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints will be + * sorted into ascending order. To count ndv's in these intervals, apply the HyperLogLogPlusPlus + * algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +@ExpressionDescription( + usage = """ +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N)) - Returns the approximate + number of distinct values (ndv) for intervals [endpoint_1, endpoint_2], + (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N), relativeSD=0.05) - Returns + the approximate number of distinct values (ndv) for intervals with relativeSD, the maximum + estimation error allowed in the HyperLogLogPlusPlus algorithm. 
+ """, + extended = """ +Examples: + > SELECT approx_count_distinct_for_intervals(10.0, array(5, 15, 25), 0.01); + [1, 0] + """) +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. + lazy val endpoints: Array[Double] = { +val doubleArray = (endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} +util.Arrays.sort(doubleArray) +doubleArray + } + + override def checkInputDataTypes(): TypeCheckResult = { +val defaultCheck = super.checkInputDataTypes() +if (defaultCheck.isFailure) { + defaultCheck +} else if (!endpoi
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139188790 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints will be + * sorted into ascending order. To count ndv's in these intervals, apply the HyperLogLogPlusPlus + * algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +@ExpressionDescription( --- End diff -- we don't need this if it's an internal func --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139190161 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints will be + * sorted into ascending order. To count ndv's in these intervals, apply the HyperLogLogPlusPlus + * algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +@ExpressionDescription( + usage = """ +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N)) - Returns the approximate + number of distinct values (ndv) for intervals [endpoint_1, endpoint_2], + (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N), relativeSD=0.05) - Returns + the approximate number of distinct values (ndv) for intervals with relativeSD, the maximum + estimation error allowed in the HyperLogLogPlusPlus algorithm. 
+ """, + extended = """ +Examples: + > SELECT approx_count_distinct_for_intervals(10.0, array(5, 15, 25), 0.01); + [1, 0] + """) +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. + lazy val endpoints: Array[Double] = { +val doubleArray = (endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} +util.Arrays.sort(doubleArray) --- End diff -- again, if it's only used internally, we can require the caller side to pass the endpoints sorted. --- - To unsubscribe, e-mail: reviews-unsu
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139190528 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints will be + * sorted into ascending order. To count ndv's in these intervals, apply the HyperLogLogPlusPlus + * algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +@ExpressionDescription( + usage = """ +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N)) - Returns the approximate + number of distinct values (ndv) for intervals [endpoint_1, endpoint_2], + (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N), relativeSD=0.05) - Returns + the approximate number of distinct values (ndv) for intervals with relativeSD, the maximum + estimation error allowed in the HyperLogLogPlusPlus algorithm. 
+ """, + extended = """ +Examples: + > SELECT approx_count_distinct_for_intervals(10.0, array(5, 15, 25), 0.01); + [1, 0] + """) +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. + lazy val endpoints: Array[Double] = { +val doubleArray = (endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} +util.Arrays.sort(doubleArray) +doubleArray + } + + override def checkInputDataTypes(): TypeCheckResult = { +val defaultCheck = super.checkInputDataTypes() +if (defaultCheck.isFailure) { + defaultCheck +} else if (!en
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139188326 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -270,6 +270,7 @@ object FunctionRegistry { expression[Remainder]("%"), // aggregate functions + expression[ApproxCountDistinctForIntervals]("approx_count_distinct_for_intervals"), --- End diff -- If it's only used internally, we don't need to register it here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
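What "used internally" looks like in practice, as a hedged sketch: the statistics-collection code can construct the expression directly, so no SQL-facing name needs to be registered. The column and endpoint values here are arbitrary examples; the two-argument constructor is the one shown in the diffs above.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CreateArray, Literal}
import org.apache.spark.sql.types.DoubleType

// Build the aggregate programmatically; nothing goes through FunctionRegistry.
val col = AttributeReference("a", DoubleType)()
val endpointsExpr = CreateArray(Seq(0.0, 10.0, 20.0).map(Literal(_)))
val agg = new ApproxCountDistinctForIntervals(col, endpointsExpr)
```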
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r138779799 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.lang.{Long => JLong} +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.XxHash64Function +import org.apache.spark.sql.types._ + +// A helper class for HyperLogLogPlusPlus. +class HyperLogLogPlusPlusHelper(relativeSD: Double) extends Serializable { --- End diff -- Yes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r138541717 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.lang.{Long => JLong} +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.XxHash64Function +import org.apache.spark.sql.types._ + +// A helper class for HyperLogLogPlusPlus. +class HyperLogLogPlusPlusHelper(relativeSD: Double) extends Serializable { --- End diff -- Is this code mainly copied from `HyperLogLogPlusPlus`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
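Per wzhfy's reply above, the helper is indeed the HLL++ algorithm lifted out of the aggregate. A hedged sketch of the resulting split; only the one-argument constructor is visible in the diff, and the per-interval offset layout is an assumption about the truncated code, not a confirmed detail:

```scala
import org.apache.spark.sql.catalyst.util.HyperLogLogPlusPlusHelper

// The extracted helper owns the HLL++ registers and algorithm; the aggregate
// classes only decide where those registers live. The interval variant
// presumably allocates one helper per interval, with the i-th interval's
// register words offset by i times the per-helper word count.
val relativeSD = 0.05
val numIntervals = 2  // e.g. endpoints (0, 10, 20) give [0, 10] and (10, 20]
val hllpps = Array.fill(numIntervals)(new HyperLogLogPlusPlusHelper(relativeSD))
```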
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
GitHub user wzhfy reopened a pull request: https://github.com/apache/spark/pull/15544

[SPARK-17997] [SQL] Add an aggregation function for counting distinct values for multiple intervals

## What changes were proposed in this pull request?

This work is part of [SPARK-17074](https://issues.apache.org/jira/browse/SPARK-17074) to compute equi-height histograms. An equi-height histogram is an array of bins, where each bin consists of two endpoints (which form an interval of values) and the ndv within that interval. This PR adds a new aggregate function that, given an array of endpoints, counts the distinct values (ndv) in the intervals formed by those endpoints. This PR also refactors `HyperLogLogPlusPlus` by extracting a helper class, `HyperLogLogPlusPlusHelper`, where the underlying HLLPP algorithm lives.

## How was this patch tested?

Added new test cases.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/wzhfy/spark countIntervals Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15544.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15544

commit 9960fab07d2075d2beba1fea7024fe6dd30d9eef Author: wangzhenhua Date: 2016-10-14T06:23:39Z refactor hllpp
commit 5aa835ce2769a34f88bacb389c4af30f52459226 Author: wangzhenhua Date: 2016-10-17T13:18:36Z add IntervalDistinctApprox
commit 840171efa08c70da83af54bc726079a88fb7a1d2 Author: wangzhenhua Date: 2016-10-19T01:58:32Z add test cases
commit a6417e7df5cf44ba9f75a7d66d46258a56b0082f Author: wangzhenhua Date: 2016-10-20T04:46:57Z convert HLLPP and IntervalDistinctApprox to ImperativeAggregate
commit 74d7ae7ac817d427a264b67f580fe39bbb49811b Author: wangzhenhua Date: 2016-11-04T08:36:23Z add negative column type test and update doc

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
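For readers skimming the thread, a hedged end-to-end example of the intended semantics. This assumes the SQL registration present in this revision of the PR; reviewers later suggest keeping the function internal-only (see the FunctionRegistry comment above). `spark` is an existing SparkSession.

```scala
// Endpoints (0, 10, 20) define the intervals [0, 10] and (10, 20]:
// the distinct values 1, 5, 10 fall in the first, 15 in the second.
val df = spark.sql(
  """SELECT approx_count_distinct_for_intervals(col, array(0, 10, 20))
    |FROM VALUES (1), (5), (5), (10), (15) AS t(col)
  """.stripMargin)
df.show()
// Expected shape of the result: one ndv per interval, here
// approximately [3, 1].
```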
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy closed the pull request at: https://github.com/apache/spark/pull/15544
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r84006557 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala --- @@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus(
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)

-  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
-
-  /** Allocate enough words to store all registers. */
-  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i =>
-    AttributeReference(s"MS[$i]", LongType)()
+  override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = {
+    new HyperLogLogPlusPlusAlgo(relativeSD)
--- End diff --

@hvanhovell Do you mean we should keep `ImperativeAggregate` rather than switching to `TypedImperativeAggregate`?
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r83997249 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala --- @@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus(
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)

-  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
-
-  /** Allocate enough words to store all registers. */
-  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i =>
-    AttributeReference(s"MS[$i]", LongType)()
+  override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = {
+    new HyperLogLogPlusPlusAlgo(relativeSD)
--- End diff --

@hvanhovell Do you mean we should put a digest object in the buffer (the way `ApproximatePercentile` uses `PercentileDigest`) instead of putting the HLL++ object in it directly?
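For context on the `PercentileDigest` analogy, here is a rough sketch of the object-in-buffer pattern; `NdvDigestSketch` is a hypothetical name, and the real contract is `TypedImperativeAggregate`'s createAggregationBuffer/update/merge/serialize/deserialize. The group state is a plain JVM object that is flattened to bytes only when buffers must be shipped or spilled:

```scala
import java.nio.ByteBuffer

// Hypothetical digest object held directly in the aggregation buffer
// (simplified: one whole long per register instead of 6-bit packing).
class NdvDigestSketch(val registers: Array[Long]) {

  // HLL++ merge is a pointwise max over registers.
  def merge(other: NdvDigestSketch): Unit = {
    var i = 0
    while (i < registers.length) {
      registers(i) = math.max(registers(i), other.registers(i))
      i += 1
    }
  }

  // Called only at exchange/spill boundaries, not once per input row.
  def serialize(): Array[Byte] = {
    val bb = ByteBuffer.allocate(registers.length * 8)
    registers.foreach(r => bb.putLong(r))
    bb.array()
  }
}

object NdvDigestSketch {
  def deserialize(bytes: Array[Byte]): NdvDigestSketch = {
    val bb = ByteBuffer.wrap(bytes)
    new NdvDigestSketch(Array.fill(bytes.length / 8)(bb.getLong()))
  }
}
```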
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r83993309 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala --- @@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus(
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)

-  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
-
-  /** Allocate enough words to store all registers. */
-  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i =>
-    AttributeReference(s"MS[$i]", LongType)()
+  override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = {
+    new HyperLogLogPlusPlusAlgo(relativeSD)
--- End diff --

This, by definition, moves the HLL++ operator to the sort-based aggregation path, which would be a huge performance hit. Is it possible to separate the algorithm from the buffer?
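Concretely, the separation being asked for keeps the per-group state as a fixed-width run of longs inside the shared aggregation buffer, with a stateless helper applying the HLL++ math in place. A sketch reusing the hypothetical `HllppHelperSketch` shown earlier in this thread (simplified signatures; Spark's real `ImperativeAggregate` updates `InternalRow` buffers, not arrays):

```scala
// Hypothetical aggregate side of the split: it owns a fixed-width slice of
// longs inside the shared buffer and delegates all register math to the
// stateless helper, so it never puts an object into the buffer.
class IntervalNdvAggregateSketch(relativeSD: Double, numIntervals: Int) {
  private val helper = new HllppHelperSketch(relativeSD)

  // Total flat-buffer width needed: one register block per interval.
  val bufferWords: Int = numIntervals * helper.numWords

  // Route one hashed input value to its interval's register block.
  def update(buffer: Array[Long], bufferOffset: Int,
             intervalIndex: Int, hash: Long): Unit =
    helper.update(buffer, bufferOffset + intervalIndex * helper.numWords, hash)
}
```

The payoff is that the state remains a fixed number of longs, which is what keeps the operator on the hash-based aggregation path this comment is defending.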
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
GitHub user wzhfy opened a pull request: https://github.com/apache/spark/pull/15544

[SPARK-17997] [SQL] Add an aggregation function for counting distinct values for multiple intervals

## What changes were proposed in this pull request?

This work is part of [SPARK-17074](https://issues.apache.org/jira/browse/SPARK-17074) to generate histogram statistics, specifically to compute the ndv's (numbers of distinct values) for the bins of equi-height histograms. A bin consists of two endpoints that form an interval of values, plus the ndv in that interval. For computing histogram statistics, once we have the endpoints we need an aggregate function that counts the distinct values in each interval.

This PR also refactors `HyperLogLogPlusPlus` by extracting a helper class, `HyperLogLogPlusPlusAlgo`, which holds the actual algorithm.

## How was this patch tested?

Added test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wzhfy/spark countIntervals

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15544.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15544

commit 0255f6cbf8f32bba223c479b27b35a9310e52658
Author: wangzhenhua
Date: 2016-10-14T06:23:39Z
refactor hllpp

commit ebeb0349e1786b6d74706bbf33a335c32a6eda7d
Author: wangzhenhua
Date: 2016-10-17T13:18:36Z
add IntervalDistinctApprox

commit e274ef22b96ca878df326675e28566cfff6b5088
Author: wangzhenhua
Date: 2016-10-19T01:58:32Z
add test cases
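For orientation, this is how the per-interval ndv's line up with histogram bins; `HistogramBinSketch` and `toBins` below are hypothetical illustrations, since the histogram representation itself belongs to the broader SPARK-17074 work rather than this PR:

```scala
// Hypothetical bin representation: endpoints come from the histogram
// planner, and the ndv per interval comes from this aggregate function.
case class HistogramBinSketch(lo: Double, hi: Double, ndv: Long)

// Pair each consecutive endpoint window with its interval's ndv.
def toBins(endpoints: Array[Double], ndvs: Array[Long]): Seq[HistogramBinSketch] =
  endpoints.sliding(2).toSeq.zip(ndvs).map {
    case (Array(lo, hi), ndv) => HistogramBinSketch(lo, hi, ndv)
  }

// e.g. endpoints [1.0, 5.0, 10.0] with ndvs [3, 4] give bins
//   [1.0, 5.0] with ndv 3 and (5.0, 10.0] with ndv 4.
```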