[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21061
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r201904049

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3261,3 +3261,322 @@ case class ArrayDistinct(child: Expression)
   override def prettyName: String = "array_distinct"
 }
+
+/**
+ * Will become common base class for [[ArrayUnion]], ArrayIntersect, and ArrayExcept.
+ */
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  override def dataType: DataType = {
+    val dataTypes = children.map(_.dataType.asInstanceOf[ArrayType])
+    ArrayType(elementType, dataTypes.exists(_.containsNull))
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val typeCheckResult = super.checkInputDataTypes()
+    if (typeCheckResult.isSuccess) {
+      TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+        s"function $prettyName")
+    } else {
+      typeCheckResult
+    }
+  }
+
+  @transient protected lazy val ordering: Ordering[Any] =
+    TypeUtils.getInterpretedOrdering(elementType)
+
+  @transient protected lazy val elementTypeSupportEquals = elementType match {
+    case BinaryType => false
+    case _: AtomicType => true
+    case _ => false
+  }
+}
+
+object ArraySetLike {
+  def throwUnionLengthOverflowException(length: Int): Unit = {
+    throw new RuntimeException(s"Unsuccessful try to union arrays with $length " +
+      s"elements due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+}
+
+
+/**
+ * Returns an array of the elements in the union of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2,
+      without duplicates.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+       array(1, 2, 3, 5)
+  """,
+  since = "2.4.0")
+case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike {
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+    val elem = array.getInt(idx)
+    if (!hsInt.contains(elem)) {
+      if (resultArray != null) {
+        resultArray.setInt(pos, elem)
+      }
+      hsInt.add(elem)
+      true
+    } else {
+      false
+    }
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+    val elem = array.getLong(idx)
+    if (!hsLong.contains(elem)) {
+      if (resultArray != null) {
+        resultArray.setLong(pos, elem)
+      }
+      hsLong.add(elem)
+      true
+    } else {
+      false
+    }
+  }
+
+  def evalIntLongPrimitiveType(
+      array1: ArrayData,
+      array2: ArrayData,
+      resultArray: ArrayData,
+      isLongType: Boolean): Int = {
+    // store elements into resultArray
+    var nullElementSize = 0
+    var pos = 0
+    Seq(array1, array2).foreach { array =>
+      var i = 0
+      while (i < array.numElements()) {
+        val size = if (!isLongType) hsInt.size else hsLong.size
+        if (size + nullElementSize > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+          ArraySetLike.throwUnionLengthOverflowException(size)
+        }
+        if (array.isNullAt(i)) {
+          if (nullElementSize == 0) {
+            if (resultArray != null) {
+              resultArray.setNullAt(pos)
+            }
+            pos += 1
+            nullElementSize = 1
+          }
+        } else {
+          val assigned = if (!isLongType) {
+            assignInt(array, i, resultArray, pos)
+          } else {
+            assignLong(array, i, resultArray, pos)
+          }
+          if (assigned) {
+            pos += 1
+          }
+        }
+        i += 1
+      }
+    }
+    pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+    val array1 = input1.asInstanceOf[ArrayData]
+    val array2 = input2.asInstanceOf[ArrayData]
+
+    if (elementTypeSupportEquals) {
+      elementType match {
+        case IntegerType =>
+          // avoid boxing of primitive int array elements
+          // calculate result array size
+          hsInt = new OpenHashSet[Int]
+          val elements = evalIntLongPrimitiveType(array1, array2, null,
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r201692534

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -463,14 +463,27 @@ private static UnsafeArrayData fromPrimitiveArray(
     final long[] data = new long[(int)totalSizeInLongs];
 
     Platform.putLong(data, Platform.LONG_ARRAY_OFFSET, length);
-    Platform.copyMemory(arr, offset, data,
-      Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes);
+    if (arr != null) {
+      Platform.copyMemory(arr, offset, data,
+        Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes);
+    }
 
     UnsafeArrayData result = new UnsafeArrayData();
     result.pointTo(data, Platform.LONG_ARRAY_OFFSET, (int)totalSizeInLongs * 8);
     return result;
   }
 
+  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
+    return fromPrimitiveArray(null, offset, length, elementSize);
+  }
+
+  public static boolean canUseGenericArrayData(int elementSize, int length) {
--- End diff --
I think both make sense. I will follow your suggestion.
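For context, a minimal sketch of the size check being renamed in this thread, assuming the arithmetic visible in the quoted diff: an `UnsafeArrayData` is backed by a `long[]`, so the header plus the value region must fit in `Integer.MAX_VALUE * 8` bytes. The name `shouldUseGenericArrayData` is the reviewer-suggested one, not necessarily what was merged.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

// Decide whether the result is too big for UnsafeArrayData's long[] backing
// store, in which case the caller should fall back to GenericArrayData.
def shouldUseGenericArrayData(elementSize: Int, length: Int): Boolean = {
  val headerInBytes: Long = UnsafeArrayData.calculateHeaderPortionInBytes(length)
  val valueRegionInBytes: Long = elementSize.toLong * length
  // Round the total up to whole 8-byte words, as the long[] allocation does.
  val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8
  totalSizeInLongs > Integer.MAX_VALUE / 8
}
```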
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r201618980

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -463,14 +463,27 @@ private static UnsafeArrayData fromPrimitiveArray(
     final long[] data = new long[(int)totalSizeInLongs];
 
     Platform.putLong(data, Platform.LONG_ARRAY_OFFSET, length);
-    Platform.copyMemory(arr, offset, data,
-      Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes);
+    if (arr != null) {
+      Platform.copyMemory(arr, offset, data,
+        Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes);
+    }
 
     UnsafeArrayData result = new UnsafeArrayData();
     result.pointTo(data, Platform.LONG_ARRAY_OFFSET, (int)totalSizeInLongs * 8);
     return result;
   }
 
+  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
+    return fromPrimitiveArray(null, offset, length, elementSize);
+  }
+
+  public static boolean canUseGenericArrayData(int elementSize, int length) {
--- End diff --
Ah, sorry, I suggested this naming, but it seems it should be something like `shouldUseGenericArrayData`.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200925450

--- Diff: python/pyspark/sql/functions.py ---
@@ -2013,6 +2013,25 @@ def array_distinct(col):
     return Column(sc._jvm.functions.array_distinct(_to_java_column(col)))
 
 
+@ignore_unicode_prefix
+@since(2.4)
+def array_union(col1, col2):
+    """
+    Collection function: returns an array of the elements in the union of col1 and col2,
--- End diff --
I will add tests for duplication. Yes, this will de-duplicate. I think it is the same behavior as Presto.
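A small sketch of the behavior kiszk confirms here: duplicates inside a single input are collapsed as well, since the union is a set. The expected value is inferred from this comment, not quoted from the PR's tests.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// 2 appears twice in the first array and once in the second; it survives once.
spark.sql("SELECT array_union(array(1, 2, 2, 3), array(2, 4))").show(truncate = false)
// expected: [1, 2, 3, 4]
```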
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200889598

--- Diff: python/pyspark/sql/functions.py ---
@@ -2013,6 +2013,25 @@ def array_distinct(col):
     return Column(sc._jvm.functions.array_distinct(_to_java_column(col)))
 
 
+@ignore_unicode_prefix
+@since(2.4)
+def array_union(col1, col2):
+    """
+    Collection function: returns an array of the elements in the union of col1 and col2,
--- End diff --
After reading the code, it seems to de-duplicate all elements from both arrays. Is this behavior the same as Presto's?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200886228

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -450,7 +450,7 @@ public UnsafeArrayData copy() {
     return values;
   }
 
-  private static UnsafeArrayData fromPrimitiveArray(
+  public static UnsafeArrayData fromPrimitiveArray(
       Object arr, int offset, int length, int elementSize) {
     final long headerInBytes = calculateHeaderPortionInBytes(length);
--- End diff --
Ok.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200885976

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -450,7 +450,7 @@ public UnsafeArrayData copy() {
     return values;
   }
 
-  private static UnsafeArrayData fromPrimitiveArray(
+  public static UnsafeArrayData fromPrimitiveArray(
       Object arr, int offset, int length, int elementSize) {
     final long headerInBytes = calculateHeaderPortionInBytes(length);
--- End diff --
Is [this thread](https://github.com/apache/spark/pull/21061#discussion_r192520463) an answer to this question?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200884043

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -450,7 +450,7 @@ public UnsafeArrayData copy() {
     return values;
   }
 
-  private static UnsafeArrayData fromPrimitiveArray(
+  public static UnsafeArrayData fromPrimitiveArray(
       Object arr, int offset, int length, int elementSize) {
     final long headerInBytes = calculateHeaderPortionInBytes(length);
--- End diff --
IBM Box? :-)
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200883268

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -450,7 +450,7 @@ public UnsafeArrayData copy() {
     return values;
   }
 
-  private static UnsafeArrayData fromPrimitiveArray(
+  public static UnsafeArrayData fromPrimitiveArray(
       Object arr, int offset, int length, int elementSize) {
     final long headerInBytes = calculateHeaderPortionInBytes(length);
--- End diff --
Is [this thread](https://ibm.ent.box.com/notes/303238366863) an answer to this question?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200875757

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -450,7 +450,7 @@ public UnsafeArrayData copy() {
     return values;
   }
 
-  private static UnsafeArrayData fromPrimitiveArray(
+  public static UnsafeArrayData fromPrimitiveArray(
       Object arr, int offset, int length, int elementSize) {
     final long headerInBytes = calculateHeaderPortionInBytes(length);
--- End diff --
Is this logic extracted to `useGenericArrayData`? If so, can we re-use it by calling the method here?
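To make the reuse question concrete, here is a sketch of the two-pass pattern the quoted helpers support: count distinct elements first, run the extracted size check, then allocate and fill. The method names `forPrimitiveArray` and `shouldUseGenericArrayData` come from the quoted diff and the naming thread above, so treat them as assumptions about the merged code; nulls are ignored here for brevity.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.Platform

// Stand-ins for the two decoded inputs of ArrayUnion.nullSafeEval.
val array1 = UnsafeArrayData.fromPrimitiveArray(Array(1, 2, 3))
val array2 = UnsafeArrayData.fromPrimitiveArray(Array(1, 3, 5))

// Counting pass (in the real code: evalIntLongPrimitiveType with
// resultArray = null, which skips writes and only grows the hash set).
val seen = scala.collection.mutable.HashSet.empty[Int]
Seq(array1, array2).foreach { arr =>
  (0 until arr.numElements()).foreach(i => seen += arr.getInt(i))
}
val elements = seen.size

// Allocation: fall back to GenericArrayData when the result cannot fit in
// UnsafeArrayData's long[] backing store; otherwise allocate an unsafe
// array up front and let a second, writing pass fill it.
val result: ArrayData =
  if (UnsafeArrayData.shouldUseGenericArrayData(IntegerType.defaultSize, elements)) {
    new GenericArrayData(new Array[Any](elements))
  } else {
    UnsafeArrayData.forPrimitiveArray(Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
  }
```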
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r189432089

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression)
   }
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+    val array = new Array[Int](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 4L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    } else {
+      new GenericArrayData(array)
+    }
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+    val array = new Array[Long](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 8L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    } else {
+      new GenericArrayData(array)
+    }
+  }
+
+  def arrayUnion(
+      array1: ArrayData,
+      array2: ArrayData,
+      et: DataType,
+      ordering: Ordering[Any]): ArrayData = {
+    if (ordering == null) {
+      new GenericArrayData(array1.toObjectArray(et).union(array2.toObjectArray(et))
+        .distinct.asInstanceOf[Array[Any]])
+    } else {
+      val length = math.min(array1.numElements().toLong + array2.numElements().toLong,
+        ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+      val array = new Array[Any](length.toInt)
+      var pos = 0
+      var hasNull = false
+      Seq(array1, array2).foreach(_.foreach(et, (_, v) => {
+        var found = false
+        if (v == null) {
+          if (hasNull) {
+            found = true
+          } else {
+            hasNull = true
+          }
+        } else {
+          var j = 0
+          while (!found && j < pos) {
+            val va = array(j)
+            if (va != null && ordering.equiv(va, v)) {
+              found = true
+            }
+            j = j + 1
+          }
+        }
+        if (!found) {
+          if (pos > MAX_ARRAY_LENGTH) {
+            throw new RuntimeException(s"Unsuccessful try to union arrays with $pos" +
+              s" elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.")
+          }
+          array(pos) = v
+          pos = pos + 1
+        }
+      }))
+      new GenericArrayData(array.slice(0, pos))
+    }
+  }
+}
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  def typeId: Int
+
+  override def dataType: DataType = left.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val typeCheckResult = super.checkInputDataTypes()
+    if (typeCheckResult.isSuccess) {
+      TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+        s"function $prettyName")
+    } else {
+      typeCheckResult
+    }
+  }
+
+  private def cn = left.dataType.asInstanceOf[ArrayType].containsNull ||
--- End diff --
`containsNull` instead of `cn`?
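A short illustration of the null handling in the quoted `arrayUnion` helper: a null from either input survives exactly once, and element order follows first occurrence. The expected row is inferred from the quoted logic, not from a test in the thread.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// array1 is scanned first (1, null, 2), then array2 contributes only 3.
spark.sql("SELECT array_union(array(1, NULL, 2), array(2, NULL, 3))").show(truncate = false)
// expected: [1, null, 2, 3]
```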
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200875456

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -463,14 +463,27 @@ private static UnsafeArrayData fromPrimitiveArray(
     final long[] data = new long[(int)totalSizeInLongs];
 
     Platform.putLong(data, Platform.LONG_ARRAY_OFFSET, length);
-    Platform.copyMemory(arr, offset, data,
-      Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes);
+    if (arr != null) {
+      Platform.copyMemory(arr, offset, data,
+        Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes);
+    }
 
     UnsafeArrayData result = new UnsafeArrayData();
     result.pointTo(data, Platform.LONG_ARRAY_OFFSET, (int)totalSizeInLongs * 8);
     return result;
   }
 
+  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
+    return fromPrimitiveArray(null, offset, length, elementSize);
+  }
+
+  public static boolean useGenericArrayData(int elementSize, int length) {
--- End diff --
nit: canUseGenericArrayData
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200875538

--- Diff: python/pyspark/sql/functions.py ---
@@ -2013,6 +2013,25 @@ def array_distinct(col):
     return Column(sc._jvm.functions.array_distinct(_to_java_column(col)))
 
 
+@ignore_unicode_prefix
+@since(2.4)
+def array_union(col1, col2):
+    """
+    Collection function: returns an array of the elements in the union of col1 and col2,
--- End diff --
If the array of col1 itself contains duplicate elements, what does it do? De-duplicate them too?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200876039

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3261,3 +3261,323 @@ case class ArrayDistinct(child: Expression)
   override def prettyName: String = "array_distinct"
 }
+
+object ArraySetLike {
+  def throwUnionLengthOverflowException(length: Int): Unit = {
+    throw new RuntimeException(s"Unsuccessful try to union arrays with $length " +
+      s"elements due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+}
+
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
--- End diff --
Describe what `ArraySetLike` is intended for by adding a comment?
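For reference, the later revision of this diff (quoted at the top of the thread) does add such a comment; shown here so the resolution is visible next to the request:

```scala
/**
 * Will become common base class for [[ArrayUnion]], ArrayIntersect, and ArrayExcept.
 */
abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
  // ...
}
```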
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200874571

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala ---
@@ -1166,4 +1166,88 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(ArrayDistinct(c1), Seq[Seq[Int]](Seq[Int](5, 6), Seq[Int](2, 1)))
     checkEvaluation(ArrayDistinct(c2), Seq[Seq[Int]](null, Seq[Int](2, 1)))
   }
+
+  test("Array Union") {
+    val a00 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType, containsNull = false))
+    val a01 = Literal.create(Seq(4, 2), ArrayType(IntegerType, containsNull = false))
+    val a02 = Literal.create(Seq(1, 2, null, 4, 5), ArrayType(IntegerType, containsNull = true))
+    val a03 = Literal.create(Seq(-5, 4, -3, 2, -1), ArrayType(IntegerType, containsNull = false))
+    val a04 = Literal.create(Seq.empty[Int], ArrayType(IntegerType, containsNull = false))
+    val a05 = Literal.create(Seq[Byte](1, 2, 3), ArrayType(ByteType, containsNull = false))
+    val a06 = Literal.create(Seq[Byte](4, 2), ArrayType(ByteType, containsNull = false))
+    val a07 = Literal.create(Seq[Short](1, 2, 3), ArrayType(ShortType, containsNull = false))
+    val a08 = Literal.create(Seq[Short](4, 2), ArrayType(ShortType, containsNull = false))
+
+    val a10 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType, containsNull = false))
+    val a11 = Literal.create(Seq(4L, 2L), ArrayType(LongType, containsNull = false))
+    val a12 = Literal.create(Seq(1L, 2L, null, 4L, 5L), ArrayType(LongType, containsNull = true))
+    val a13 = Literal.create(Seq(-5L, 4L, -3L, 2L, -1L), ArrayType(LongType, containsNull = false))
+    val a14 = Literal.create(Seq.empty[Long], ArrayType(LongType, containsNull = false))
+
+    val a20 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType, containsNull = false))
+    val a21 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType, containsNull = false))
+    val a22 = Literal.create(Seq("b", null, "a", "g"), ArrayType(StringType, containsNull = true))
+
+    val a30 = Literal.create(Seq(null, null), ArrayType(IntegerType))
+    val a31 = Literal.create(null, ArrayType(StringType))
+
+    checkEvaluation(ArrayUnion(a00, a01), UnsafeArrayData.fromPrimitiveArray(Array(1, 2, 3, 4)))
--- End diff --
nit: we don't need to use `UnsafeArrayData` here. `Seq(1, 2, 3, 4)` should work.
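Applied to the quoted assertion, the suggestion would read something like this (the merged test may differ slightly):

```scala
// checkEvaluation compares Catalyst values structurally, so a plain Seq
// works as the expected result; no explicit UnsafeArrayData is needed.
checkEvaluation(ArrayUnion(a00, a01), Seq(1, 2, 3, 4))
```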
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200874190

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3261,3 +3261,323 @@ case class ArrayDistinct(child: Expression)
   override def prettyName: String = "array_distinct"
 }
+
+object ArraySetLike {
+  def throwUnionLengthOverflowException(length: Int): Unit = {
+    throw new RuntimeException(s"Unsuccessful try to union arrays with $length " +
+      s"elements due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+}
+
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  override def dataType: DataType = {
+    val dataTypes = children.map(_.dataType)
+    dataTypes.headOption.map {
+      case ArrayType(et, _) =>
+        ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
+      case dt => dt
+    }.getOrElse(StringType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val typeCheckResult = super.checkInputDataTypes()
+    if (typeCheckResult.isSuccess) {
+      TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+        s"function $prettyName")
+    } else {
+      typeCheckResult
+    }
+  }
+
+  @transient protected lazy val ordering: Ordering[Any] =
+    TypeUtils.getInterpretedOrdering(elementType)
+
+  @transient protected lazy val elementTypeSupportEquals = elementType match {
+    case BinaryType => false
+    case _: AtomicType => true
+    case _ => false
+  }
+}
+
+/**
+ * Returns an array of the elements in the union of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2,
+      without duplicates.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+       array(1, 2, 3, 5)
+  """,
+  since = "2.4.0")
+case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike {
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+    val elem = array.getInt(idx)
+    if (!hsInt.contains(elem)) {
+      if (resultArray != null) {
+        resultArray.setInt(pos, elem)
+      }
+      hsInt.add(elem)
+      true
+    } else {
+      false
+    }
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+    val elem = array.getLong(idx)
+    if (!hsLong.contains(elem)) {
+      if (resultArray != null) {
+        resultArray.setLong(pos, elem)
+      }
+      hsLong.add(elem)
+      true
+    } else {
+      false
+    }
+  }
+
+  def evalIntLongPrimitiveType(
+      array1: ArrayData,
+      array2: ArrayData,
+      resultArray: ArrayData,
+      isLongType: Boolean): Int = {
+    // store elements into resultArray
+    var nullElementSize = 0
+    var pos = 0
+    Seq(array1, array2).foreach(array => {
--- End diff --
nit: `foreach { array =>`?
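The two spellings are equivalent at runtime; braces with a bound parameter are the idiomatic Scala form for a multi-line closure, which is all this nit is about:

```scala
val arrays = Seq(Seq(1, 2), Seq(3))

// Form in the diff:
arrays.foreach(array => {
  println(array.sum)
})

// Suggested idiomatic form:
arrays.foreach { array =>
  println(array.sum)
}
```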
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200874014

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3261,3 +3261,323 @@ case class ArrayDistinct(child: Expression)
   override def prettyName: String = "array_distinct"
 }
+
+object ArraySetLike {
+  def throwUnionLengthOverflowException(length: Int): Unit = {
+    throw new RuntimeException(s"Unsuccessful try to union arrays with $length " +
+      s"elements due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+}
+
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  override def dataType: DataType = {
+    val dataTypes = children.map(_.dataType)
+    dataTypes.headOption.map {
+      case ArrayType(et, _) =>
+        ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
+      case dt => dt
+    }.getOrElse(StringType)
+  }
--- End diff --
```scala
override def dataType: DataType = {
  val dataTypes = children.map(_.dataType.asInstanceOf[ArrayType])
  ArrayType(elementType, dataTypes.exists(_.containsNull))
}
```
should work?
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200572386 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType --- End diff -- Oh, you are right. I will follow #21704 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200541043 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + var nullElementSize = 0 + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size + nullElementSize > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { 
+                ArraySetLike.throwUnionLengthOverflowException(hsSize.size)
+              }
+              if (array.isNullAt(i)) {
+                if (nullElementSize == 0) {
+                  nullElementSize = 1
+                }
+              } else {
+                hsSize.add(array.getInt(i))
+              }
+              i += 1
+            }
+
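In this revision the counting pass reserves at most one slot for a null element (`nullElementSize`) and checks the running total against the array length limit on every step. A dependency-free sketch of the same counting logic, with a boxed `mutable.HashSet` standing in for the specialized `OpenHashSet` and nullable boxed arrays standing in for `ArrayData` (simplifications; the real code avoids boxing entirely):

```scala
import scala.collection.mutable

object UnionSizeDemo {
  // Stand-in for ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, the JVM array
  // length ceiling Spark uses (Integer.MAX_VALUE - 15).
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  def unionSize(arrays: Seq[Array[java.lang.Integer]]): Int = {
    val seen = mutable.HashSet.empty[Int]
    var nullElementSize = 0
    arrays.foreach { array =>
      var i = 0
      while (i < array.length) {
        if (seen.size + nullElementSize > MaxRoundedArrayLength) {
          throw new RuntimeException(s"array size limit exceeded: ${seen.size}")
        }
        if (array(i) == null) {
          // A null element occupies exactly one slot in the result.
          if (nullElementSize == 0) nullElementSize = 1
        } else {
          seen += array(i).intValue()
        }
        i += 1
      }
    }
    seen.size + nullElementSize
  }

  def main(args: Array[String]): Unit = {
    val a1 = Array[java.lang.Integer](1, 2, null)
    val a2 = Array[java.lang.Integer](2, null, 3)
    println(unionSize(Seq(a1, a2))) // 4: distinct values 1, 2, 3 plus one null slot
  }
}
```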
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200543679 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType --- End diff -- We have to consider `containsNull` for both children? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200541138 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + var nullElementSize = 0 + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size + nullElementSize > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { 
+                ArraySetLike.throwUnionLengthOverflowException(hsSize.size)
+              }
+              if (array.isNullAt(i)) {
+                if (nullElementSize == 0) {
+                  nullElementSize = 1
+                }
+              } else {
+                hsSize.add(array.getInt(i))
+              }
+              i += 1
+            }
+
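Stepping back, the quoted `nullSafeEval` is a two-pass scheme: one traversal sizes the result (distinct non-null values plus an optional null slot), and a second traversal re-deduplicates while writing. A condensed sketch of that pattern over plain arrays (hypothetical helper, boxed for brevity; the real code writes primitives into `ArrayData`):

```scala
import scala.collection.mutable

object TwoPassUnionDemo {
  def unionInts(a1: Array[java.lang.Integer],
                a2: Array[java.lang.Integer]): Array[java.lang.Integer] = {
    // Pass 1: count distinct non-null values plus at most one null slot.
    val seen = mutable.HashSet.empty[Int]
    var hasNull = false
    Seq(a1, a2).foreach(_.foreach { e =>
      if (e == null) hasNull = true else seen += e.intValue()
    })
    val result = new Array[java.lang.Integer](seen.size + (if (hasNull) 1 else 0))

    // Pass 2: replay the traversal, writing each element the first time it is seen.
    seen.clear()
    var wroteNull = false
    var pos = 0
    Seq(a1, a2).foreach(_.foreach { e =>
      if (e == null) {
        if (!wroteNull) { result(pos) = null; pos += 1; wroteNull = true }
      } else if (seen.add(e.intValue())) {
        result(pos) = e; pos += 1
      }
    })
    result
  }

  def main(args: Array[String]): Unit = {
    val out = unionInts(Array[java.lang.Integer](1, 2, 3), Array[java.lang.Integer](1, null, 5))
    println(out.mkString(", ")) // 1, 2, 3, null, 5
  }
}
```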
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200540949 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + var nullElementSize = 0 + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size + nullElementSize > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { 
+                ArraySetLike.throwUnionLengthOverflowException(hsSize.size)
+              }
+              if (array.isNullAt(i)) {
+                if (nullElementSize == 0) {
+                  nullElementSize = 1
+                }
+              } else {
+                hsSize.add(array.getInt(i))
+              }
+              i += 1
+            }
+
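One detail worth unpacking in the quoted base class: `elementTypeSupportEquals` is `false` for `BinaryType` because binary values are `Array[Byte]`, and JVM arrays compare by reference, so a hash-set fast path keyed on `equals`/`hashCode` would treat equal byte arrays as distinct elements. A standalone illustration (not from the PR):

```scala
val a = Array[Byte](1, 2, 3)
val b = Array[Byte](1, 2, 3)
println(a == b)                         // false: JVM arrays use reference equality
println(java.util.Arrays.equals(a, b))  // true: element-wise comparison
```

This is why binary elements fall back to the ordering-based path instead of the set-based one.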
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200540913 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + var nullElementSize = 0 + Seq(array1, array2).foreach(array => { --- End diff -- nit: `foreach { array =>`? 
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200541020 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + var nullElementSize = 0 + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size + nullElementSize > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { 
+                ArraySetLike.throwUnionLengthOverflowException(hsSize.size)
+              }
+              if (array.isNullAt(i)) {
+                if (nullElementSize == 0) {
+                  nullElementSize = 1
+                }
+              } else {
+                hsSize.add(array.getInt(i))
+              }
+              i += 1
+            }
+
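The `assignInt`/`assignLong` helpers quoted above fold the membership test, the write, and the set insertion into one call and return whether the write happened, so the caller advances the cursor exactly once per distinct value. A boxed stand-in for the contract (hypothetical; the real versions write primitives into `ArrayData`):

```scala
import scala.collection.mutable

// Returns true (and writes elem at pos) only the first time elem is seen,
// so the caller increments pos exactly once per distinct value.
def assign(seen: mutable.HashSet[Int], elem: Int,
           result: Array[Int], pos: Int): Boolean = {
  if (seen.add(elem)) {
    result(pos) = elem
    true
  } else {
    false
  }
}
```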
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197319579 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197235630 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197229189 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197229074 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197228219 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197004678 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197001921 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { --- End diff -- We can remove this now? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
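For context on the helper this comment asks about: it predicts when a result would overflow `UnsafeArrayData`, whose backing buffer is a `long` array capped near `Integer.MAX_VALUE / 8` words, in which case a `GenericArrayData` must be used instead. A dependency-free sketch of the same arithmetic; the header formula here assumes `UnsafeArrayData`'s layout of an 8-byte element count plus a 64-bit-word-aligned null bitmap:

```scala
object SizeHeuristicDemo {
  // Assumed layout: 8-byte numElements word + null bitmap rounded up to
  // whole 64-bit words (mirrors UnsafeArrayData.calculateHeaderPortionInBytes).
  def headerInBytes(numElements: Int): Long =
    8L + ((numElements + 63L) / 64L) * 8L

  def useGenericArrayData(elementSize: Int, length: Int): Boolean = {
    val valueRegionInBytes = elementSize.toLong * length
    val totalSizeInLongs = (headerInBytes(length) + valueRegionInBytes + 7) / 8
    totalSizeInLongs > Integer.MAX_VALUE / 8
  }

  def main(args: Array[String]): Unit = {
    println(useGenericArrayData(4, 1000))         // false: a small int array fits
    println(useGenericArrayData(8, Int.MaxValue)) // true: exceeds the UnsafeArrayData budget
  }
}
```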
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197005444 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197003678 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { --- End diff -- We need to handle null values while counting the elements? --- - To unsubscribe, e-mail:
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197004881 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197005083 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197004822 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197004944 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r197004467 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,347 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalPrimitiveType( --- End diff -- This is only for Int/Long types? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
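The helper under discussion branches only between `assignInt` and `assignLong` on the `isLongType` flag, so it can indeed serve only those two element types. A minimal standalone sketch of that dispatch pattern, with plain Scala arrays and `HashSet` standing in for Spark's `ArrayData` and `OpenHashSet` (the name `countDistinct` is illustrative, not from the patch):

```scala
import scala.collection.mutable

// The single isLongType flag picks between an Int path and a Long path;
// any other primitive width would need its own branch.
def countDistinct(values: AnyRef, isLongType: Boolean): Int = {
  if (isLongType) {
    val hs = mutable.HashSet.empty[Long]
    values.asInstanceOf[Array[Long]].foreach(hs += _)
    hs.size
  } else {
    val hs = mutable.HashSet.empty[Int]
    values.asInstanceOf[Array[Int]].foreach(hs += _)
    hs.size
  }
}

// countDistinct(Array(1, 2, 2, 3), isLongType = false) == 3
```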
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r196674440 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,297 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + protected def cn = left.dataType.asInstanceOf[ArrayType].containsNull || +right.dataType.asInstanceOf[ArrayType].containsNull + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals && !cn) { --- End diff -- Ah, I see. I will try it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r196671471 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,297 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + protected def cn = left.dataType.asInstanceOf[ArrayType].containsNull || +right.dataType.asInstanceOf[ArrayType].containsNull + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals && !cn) { --- End diff -- Ah, are you suggesting whether all of the element does not have `null` in the first loop (e.g. counting the result array size) even when `containsNull = true`? If we can ensure no `null` element in the array with `containsNull = true`, we can avoid boxing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
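What the comment sketches is presumably a two-pass scheme: the first pass that counts the result size can also record whether any `null` actually occurs, and if none does, the second pass may take the unboxed path even though the schema says `containsNull = true`. A minimal sketch of that first pass, assuming plain Scala types in place of Spark's internals:

```scala
import scala.collection.mutable

// Pass 1: count distinct non-null ints and note whether a null occurred.
// A caller could then choose the primitive (unboxed) fill path when
// nullSeen is false, regardless of the declared containsNull flag.
def countPass(arrays: Seq[Array[java.lang.Integer]]): (Int, Boolean) = {
  val distinct = mutable.HashSet.empty[Int]
  var nullSeen = false
  for (arr <- arrays; e <- arr) {
    if (e == null) nullSeen = true else distinct += e.intValue
  }
  (distinct.size, nullSeen)
}
```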
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r196662595 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2355,3 +2355,297 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + protected def cn = left.dataType.asInstanceOf[ArrayType].containsNull || +right.dataType.asInstanceOf[ArrayType].containsNull + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals && !cn) { --- End diff -- Why do we need to check `!cn`? Can't we avoid boxing if the arrays contain null? How about using `foundNullElement` as `array_distinct` is doing at #21050? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
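The `foundNullElement` idea referenced here can be sketched standalone: a single boolean absorbs the one `null` the result may contain, so the dedup set only ever holds primitive values and a nullable input no longer forces the generic boxing path. A minimal sketch, assuming plain Scala collections in place of `OpenHashSet`/`ArrayData` (Spark's `OpenHashSet[Int]` would additionally keep the set itself unboxed):

```scala
import scala.collection.mutable

def unionWithNullFlag(
    a: Array[java.lang.Integer],
    b: Array[java.lang.Integer]): Array[java.lang.Integer] = {
  val seen = mutable.HashSet.empty[Int] // dedup set for non-null elements
  var foundNullElement = false          // the result keeps at most one null
  val out = mutable.ArrayBuffer.empty[java.lang.Integer]
  for (arr <- Seq(a, b); e <- arr) {
    if (e == null) {
      if (!foundNullElement) { out += null; foundNullElement = true }
    } else if (seen.add(e.intValue)) {
      out += e
    }
  }
  out.toArray
}

// unionWithNullFlag(Array[java.lang.Integer](1, null, 2),
//                   Array[java.lang.Integer](2, null, 3))
//   produces Array(1, null, 2, 3)
```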
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r196621059 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2189,3 +2189,293 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with ${length}" + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } + + def evalUnionContainsNull( + array1: ArrayData, + array2: ArrayData, + elementType: DataType, + ordering: Ordering[Any]): ArrayData = { +if (ordering == null) { + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + val hs = new mutable.HashSet[Any] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + val elem = array.get(i, elementType) + if (hs.add(elem)) { +if (arrayBuffer.length > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throwUnionLengthOverflowException(arrayBuffer.length) +} +arrayBuffer += elem + } + i += 1 +} + }) + new GenericArrayData(arrayBuffer) +} else { + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + var alreadyIncludeNull = false + Seq(array1, array2).foreach(_.foreach(elementType, (_, elem) => { +var found = false +if (elem == null) { + if (alreadyIncludeNull) { +found = true + } else { +alreadyIncludeNull = true + } +} else { + // check elem is already stored in arrayBuffer or not? + var j = 0 + while (!found && j < arrayBuffer.size) { +val va = arrayBuffer(j) +if (va != null && ordering.equiv(va, elem)) { + found = true +} +j = j + 1 + } +} +if (!found) { + if (arrayBuffer.length > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +throwUnionLengthOverflowException(arrayBuffer.length) + } + arrayBuffer += elem +} + })) + new GenericArrayData(arrayBuffer) +} + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + protected def cn = left.dataType.asInstanceOf[ArrayType].containsNull || +right.dataType.asInstanceOf[ArrayType].containsNull + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (!cn) { + elementType match { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r196620839 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r196614601 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r194527119 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r194516045 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2189,3 +2189,293 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with ${length}" + --- End diff -- nit: `$length`, and we need an extra space after it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
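For reference, the shape the nit asks for is just the plain interpolator plus a trailing space before the concatenation boundary; a sketch with the limit constant inlined only for illustration (the real value lives in Spark's `ByteArrayMethods`):

```scala
object OverflowMessageSketch {
  // `$length` instead of `${length}`, and a trailing space so the message
  // doesn't render as "...with 5elements...".
  val MAX_ROUNDED_ARRAY_LENGTH: Int = Integer.MAX_VALUE - 15 // assumed stand-in

  def throwUnionLengthOverflowException(length: Int): Unit = {
    throw new RuntimeException(s"Unsuccessful try to union arrays with $length " +
      s"elements due to exceeding the array size limit $MAX_ROUNDED_ARRAY_LENGTH.")
  }
}
```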
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r194523480 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2189,3 +2189,293 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with ${length}" + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } + + def evalUnionContainsNull( --- End diff -- Should we move this to the `ArrayUnion` companion object? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
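Structurally, the suggestion amounts to moving the union-only helper out of the shared base object, so `ArraySetLike` keeps just what union/intersect/except have in common. A sketch of that shape (Spark catalyst types assumed on the classpath; the body is elided, unchanged from the diff above):

```scala
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.DataType

object ArrayUnion {
  // Union-specific evaluation lives next to the expression it serves.
  def evalUnionContainsNull(
      array1: ArrayData,
      array2: ArrayData,
      elementType: DataType,
      ordering: Ordering[Any]): ArrayData = {
    ??? // same body as in the quoted diff
  }
}
```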
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r194520924 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r194530787 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r194520589 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- [verbatim duplicate of the diff hunk quoted above; comment body truncated in this digest]
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r194520120 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2189,3 +2189,293 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { +// Use the same calculation in UnsafeArrayData.fromPrimitiveArray() +val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length) +val valueRegionInBytes = elementSize.toLong * length +val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8 +totalSizeInLongs > Integer.MAX_VALUE / 8 + } + + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with ${length}" + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } + + def evalUnionContainsNull( + array1: ArrayData, + array2: ArrayData, + elementType: DataType, + ordering: Ordering[Any]): ArrayData = { +if (ordering == null) { + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + val hs = new mutable.HashSet[Any] + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + val elem = array.get(i, elementType) + if (hs.add(elem)) { +if (arrayBuffer.length > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throwUnionLengthOverflowException(arrayBuffer.length) +} +arrayBuffer += elem + } + i += 1 +} + }) + new GenericArrayData(arrayBuffer) +} else { + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + var alreadyIncludeNull = false + Seq(array1, array2).foreach(_.foreach(elementType, (_, elem) => { +var found = false +if (elem == null) { + if (alreadyIncludeNull) { +found = true + } else { +alreadyIncludeNull = true + } +} else { + // check elem is already stored in arrayBuffer or not? + var j = 0 + while (!found && j < arrayBuffer.size) { +val va = arrayBuffer(j) +if (va != null && ordering.equiv(va, elem)) { + found = true +} +j = j + 1 + } +} +if (!found) { + if (arrayBuffer.length > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { +throwUnionLengthOverflowException(arrayBuffer.length) + } + arrayBuffer += elem +} + })) + new GenericArrayData(arrayBuffer) +} + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + protected def cn = left.dataType.asInstanceOf[ArrayType].containsNull || +right.dataType.asInstanceOf[ArrayType].containsNull + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (!cn) { --- End diff -- We need to switch
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r194517152

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+/**
+ * Returns an array of the elements in the union of x and y, without duplicates.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2,
+      without duplicates.
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (!cn) { + elementType match { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192919335

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -2189,3 +2189,302 @@ case class ArrayRemove(left: Expression, right: Expression)
   override def prettyName: String = "array_remove"
 }
+
+object ArraySetLike {
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+    val array = new Array[Int](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    if (useGenericArrayData(LongType.defaultSize, array.length)) {
+      new GenericArrayData(array)
+    } else {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    }
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+    val array = new Array[Long](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    if (useGenericArrayData(LongType.defaultSize, array.length)) {
+      new GenericArrayData(array)
+    } else {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    }
+  }
+
+  def useGenericArrayData(elementSize: Int, length: Int): Boolean = {
--- End diff --

Ah, I see.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192546226

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+  def useGenericArrayData(elementSize: Int, length: Int): Boolean = {
--- End diff --

Although I tried it, I stopped reusing it. This is because `UnsafeArrayData.fromPrimitiveArray()` also uses variables (e.g. `headerInBytes` and `valueRegionInBytes`) calculated in this method, and I think there is no typical way to return multiple values from a function. Thus, we can move this to `UnsafeArrayData`, but it is not easy to reuse it. WDYT?

```
private static UnsafeArrayData fromPrimitiveArray(
    Object arr, int offset, int length, int elementSize) {
  final long headerInBytes = calculateHeaderPortionInBytes(length);
  final long valueRegionInBytes = elementSize * length;
  final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
  if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
    throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +
      "it's too big.");
  }

  final long[] data = new long[(int)totalSizeInLongs];

  Platform.putLong(data, Platform.LONG_ARRAY_OFFSET, length);
  Platform.copyMemory(arr, offset, data,
    Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes);

  UnsafeArrayData result = new UnsafeArrayData();
  result.pointTo(data, Platform.LONG_ARRAY_OFFSET, (int)totalSizeInLongs * 8);
  return result;
}
```
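For what it's worth, a sketch of one way around the multiple-return-value concern: in Scala the shared calculation can hand back all three values as a tuple. The name `calculateSizes` is made up for illustration, and the header arithmetic below only mirrors what `UnsafeArrayData.calculateHeaderPortionInBytes` computed at the time; this is a sketch, not the PR's code.

```
object SizeCheckSketch {
  // mirrors UnsafeArrayData.calculateHeaderPortionInBytes: 8 bytes for the
  // element count plus a word-aligned null bitmap
  def calculateHeaderPortionInBytes(numFields: Int): Long =
    8 + ((numFields + 63) / 64) * 8L

  // returns (fitsInUnsafeFormat, headerInBytes, valueRegionInBytes) in one shot
  def calculateSizes(elementSize: Int, length: Int): (Boolean, Long, Long) = {
    val headerInBytes = calculateHeaderPortionInBytes(length)
    val valueRegionInBytes = elementSize.toLong * length
    val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8
    (totalSizeInLongs <= Integer.MAX_VALUE / 8, headerInBytes, valueRegionInBytes)
  }

  def main(args: Array[String]): Unit = {
    println(calculateSizes(4, 1024))        // (true,136,4096): a small int array fits
    println(calculateSizes(8, 268435456))   // (false,...): 2^28 longs exceed the limit
  }
}
```

Whether a tuple-returning helper is cleaner than duplicating the arithmetic is exactly the trade-off being discussed here.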
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192520463

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+  def useGenericArrayData(elementSize: Int, length: Int): Boolean = {
--- End diff --

Shall we move this to `UnsafeArrayData` and reuse it? Maybe the name should be modified to fit the case.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192490355

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression)
   }
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+    val array = new Array[Int](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 4L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    } else {
+      new GenericArrayData(array)
+    }
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+    val array = new Array[Long](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 8L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

Ah, I misunderstood. Accepting `Integer.MAX_VALUE * 8` looks like a future plan. Anyway, I will use the same calculation as in `UnsafeArrayData.fromPrimitiveArray()`.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192479966

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

I'm just not sure the calculation satisfies the limit in `UnsafeArrayData.fromPrimitiveArray()`. I'd prefer to do the same calculation as in it here.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192340073

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+  def arrayUnion(
+      array1: ArrayData,
+      array2: ArrayData,
+      et: DataType,
+      ordering: Ordering[Any]): ArrayData = {
+    if (ordering == null) {
+      new GenericArrayData(array1.toObjectArray(et).union(array2.toObjectArray(et))
+        .distinct.asInstanceOf[Array[Any]])
+    } else {
+      val length = math.min(array1.numElements().toLong + array2.numElements().toLong,
+        ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+      val array = new Array[Any](length.toInt)
+      var pos = 0
+      var hasNull = false
+      Seq(array1, array2).foreach(_.foreach(et, (_, v) => {
+        var found = false
+        if (v == null) {
+          if (hasNull) {
+            found = true
+          } else {
+            hasNull = true
+          }
+        } else {
+          var j = 0
+          while (!found && j < pos) {
+            val va = array(j)
+            if (va != null && ordering.equiv(va, v)) {
+              found = true
+            }
+            j = j + 1
+          }
+        }
+        if (!found) {
+          if (pos > MAX_ARRAY_LENGTH) {
+            throw new RuntimeException(s"Unsuccessful try to union arrays with $pos" +
+              s" elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.")
+          }
+          array(pos) = v
+          pos = pos + 1
+        }
+      }))
+      new GenericArrayData(array.slice(0, pos))
+    }
+  }
+}
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  def typeId: Int
+
+  override def dataType: DataType = left.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val typeCheckResult = super.checkInputDataTypes()
+    if (typeCheckResult.isSuccess) {
+      TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+        s"function $prettyName")
+    } else {
+      typeCheckResult
+    }
+  }
+
+  private def cn = left.dataType.asInstanceOf[ArrayType].containsNull ||
+    right.dataType.asInstanceOf[ArrayType].containsNull
+
+  @transient private lazy val ordering: Ordering[Any] =
+    TypeUtils.getInterpretedOrdering(elementType)
+
+  @transient private lazy val elementTypeSupportEquals = elementType match {
+    case BinaryType => false
+    case _: AtomicType => true
+    case _ => false
+  }
+
+  def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): OpenHashSet[Int]
+  def longEval(ary: ArrayData, hs2: OpenHashSet[Long]):
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192336364

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+  def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): OpenHashSet[Int]
+  def longEval(ary: ArrayData, hs2: OpenHashSet[Long]):
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192331296

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

As I wrote in the comment, since `UnsafeArrayData.fromPrimitiveArray()` uses `long[]`, this method can accept up to `Integer.MAX_VALUE * 8` (8 means `sizeof(long)`) as the total byte size. Of course, conservatively, we could limit the length to at most `Integer.MAX_VALUE`.
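A quick, self-contained check of the Int-arithmetic pitfall lurking in that guard (this is the point behind the `Integer.MAX_VALUE`? questions below): the product `Integer.MAX_VALUE * 8` wraps around in 32-bit arithmetic, so comparing against it does not express the intended limit, whereas the `useGenericArrayData` revision quoted earlier in this thread compares the divided quantity and never overflows.

```
object OverflowSketch {
  def main(args: Array[String]): Unit = {
    println(Integer.MAX_VALUE * 8)     // -8: the Int product wraps around
    println(Integer.MAX_VALUE * 8L)    // 17179869176: correct only as a Long
    // comparing on the divided side avoids the overflow entirely:
    val totalSizeInLongs = 300000000L  // hypothetical size in 8-byte words
    println(totalSizeInLongs > Integer.MAX_VALUE / 8)  // true
  }
}
```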
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192330635

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

`8` means `sizeof(long)` in Java primitives.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192254481

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+  def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): OpenHashSet[Int]
+  def longEval(ary: ArrayData, hs2: OpenHashSet[Long]):
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192252991

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

Maybe we should do the same calculation as in `UnsafeArrayData.fromPrimitiveArray()` to make it clear what you are doing here.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192250043

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
--- End diff --

We don't need `org.apache.spark.unsafe.array.` because `ByteArrayMethods` is already imported?
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192251150

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    val numBytes = 8L * array.length
--- End diff --

We should use `LongType.defaultSize` instead of `8L` here?
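A tiny sketch of what this suggestion buys (assumes `spark-catalyst` on the classpath; `DefaultSizeSketch` is a made-up name for illustration): the byte count then tracks the element type instead of a hard-coded literal.

```
import org.apache.spark.sql.types.{IntegerType, LongType}

object DefaultSizeSketch {
  def main(args: Array[String]): Unit = {
    val length = 1024
    // equivalent to 8L * length and 4L * length, but the constants stay in
    // sync with the data types if the element type ever changes
    println(LongType.defaultSize.toLong * length)     // 8192
    println(IntegerType.defaultSize.toLong * length)  // 4096
  }
}
```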
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192253086

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

ditto.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192249752

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    val numBytes = 4L * array.length
--- End diff --

We should use `IntegerType.defaultSize` instead of `4L` here?
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192254166

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+  def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): OpenHashSet[Int]
+  def longEval(ary: ArrayData, hs2: OpenHashSet[Long]):
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192250954

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

`Integer.MAX_VALUE`?
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r192251238

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

`Integer.MAX_VALUE`?
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r186283042

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -19,13 +19,42 @@ package org.apache.spark.sql.catalyst.expressions
 import java.util.Comparator
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, MapData, TypeUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
+import org.apache.spark.util.collection.OpenHashSet
+
+/**
--- End diff --

I intentionally copied the implementation from #21028.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r183011714

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala ---
@@ -169,4 +169,45 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(Reverse(as7), null)
     checkEvaluation(Reverse(aa), Seq(Seq("e"), Seq("c", "d"), Seq("a", "b")))
   }
+
+  test("Array Union") {
+    val a00 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType, false))
+    val a01 = Literal.create(Seq(4, 2), ArrayType(IntegerType, false))
+    val a02 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType))
+    val a03 = Literal.create(Seq(1, 2, null, 4, 5), ArrayType(IntegerType))
+    val a04 = Literal.create(Seq(-5, 4, -3, 2, -1), ArrayType(IntegerType))
+    val a05 = Literal.create(Seq.empty[Int], ArrayType(IntegerType))
+
+    val a10 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType, false))
+    val a11 = Literal.create(Seq(4L, 2L), ArrayType(LongType, false))
+    val a12 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType))
+    val a13 = Literal.create(Seq(1L, 2L, null, 4L, 5L), ArrayType(LongType))
+    val a14 = Literal.create(Seq(-5L, 4L, -3L, 2L, -1L), ArrayType(LongType))
+    val a15 = Literal.create(Seq.empty[Long], ArrayType(LongType))
+
+    val a20 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType))
+    val a21 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType))
+    val a22 = Literal.create(Seq("b", null, "a", "g"), ArrayType(StringType))
+    val a23 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType, false))
+    val a24 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType, false))
+
+    val a30 = Literal.create(Seq(null, null), ArrayType(NullType))
+
+    checkEvaluation(ArrayUnion(a00, a01), UnsafeArrayData.fromPrimitiveArray(Array(4, 1, 3, 2)))
+    checkEvaluation(ArrayUnion(a01, a02), Seq(4, 2, 1, 3))
+    checkEvaluation(ArrayUnion(a03, a04), Seq(1, 2, null, 4, 5, -5, -3, -1))
+    checkEvaluation(ArrayUnion(a03, a05), Seq(1, 2, null, 4, 5))
+
+    checkEvaluation(
+      ArrayUnion(a10, a11), UnsafeArrayData.fromPrimitiveArray(Array(4L, 1L, 3L, 2L)))
+    checkEvaluation(ArrayUnion(a11, a12), Seq(4L, 2L, 1L, 3L))
+    checkEvaluation(ArrayUnion(a13, a14), Seq(1L, 2L, null, 4L, 5L, -5L, -3L, -1L))
+    checkEvaluation(ArrayUnion(a13, a15), Seq(1L, 2L, null, 4L, 5L))
+
+    checkEvaluation(ArrayUnion(a20, a21), Seq("b", "a", "c", "d", "f"))
+    checkEvaluation(ArrayUnion(a20, a22), Seq("b", "a", "c", null, "g"))
+    checkEvaluation(ArrayUnion(a23, a24), Seq("b", "c", "d", "a", "f"))
+
+    checkEvaluation(ArrayUnion(a30, a30), Seq(null))
--- End diff --

Ah, I see. Maybe the purpose of the test is not what I thought. Seems like what I wanted is included in the latest updates.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r183005260

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala ---
+    checkEvaluation(ArrayUnion(a30, a30), Seq(null))
--- End diff --

The following error occurs. Looking at other tests, this behavior does not look strange; it is because `null` has no type information.

```
cannot resolve 'array_union(NULL, `b`)' due to data type mismatch: Element type in both arrays must be the same;;
'Project [array_union(null, b#118) AS array_union(a, b)#121]
+- AnalysisBarrier
   +- Project [_1#114 AS a#117, _2#115 AS b#118]
      +- LocalRelation [_1#114, _2#115]

org.apache.spark.sql.AnalysisException: cannot resolve 'array_union(NULL, `b`)' due to data type mismatch: Element type in both arrays must be the same;;
'Project [array_union(null, b#118) AS array_union(a, b)#121]
+- AnalysisBarrier
   +- Project [_1#114 AS a#117, _2#115 AS b#118]
      +- LocalRelation [_1#114, _2#115]
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
  ...
```
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r182990028

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala ---
+    checkEvaluation(ArrayUnion(a30, a30), Seq(null))
--- End diff --

Returning `null` sounds good, but what do you mean by "Since the following test throws an exception"? What exception is the test throwing?
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21061#discussion_r182988201

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala ---
+    checkEvaluation(ArrayUnion(a30, a30), Seq(null))
--- End diff --

Ah, I see. Thanks. Your example returns `null`. Since the following test throws an exception, I think that it makes sense that your example returns `null`. WDYT?

```
val df8 = Seq((null, Array("a"))).toDF("a", "b")
intercept[AnalysisException] {
  df8.select(array_union($"a", $"b"))
}
```
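A hedged DataFrame-level variant of the same idea (`df9` is a hypothetical name; assumes `array_union` from `org.apache.spark.sql.functions` and `spark.implicits._` are in scope in a spark-shell session): typing the null column up front lets the expression resolve, so only the row-level result is null rather than the whole query failing analysis.

```
import org.apache.spark.sql.functions.array_union
import spark.implicits._

// Option.empty[Seq[String]] gives column "a" the type array<string> with a
// null value, instead of the untyped NullType column in df8 above.
val df9 = Seq((Option.empty[Seq[String]], Seq("a"))).toDF("a", "b")
df9.select(array_union($"a", $"b")).show()
// expected: one row whose array_union(a, b) value is null
```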
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182973891 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala --- @@ -169,4 +169,45 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(Reverse(as7), null) checkEvaluation(Reverse(aa), Seq(Seq("e"), Seq("c", "d"), Seq("a", "b"))) } + + test("Array Union") { +val a00 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType, false)) +val a01 = Literal.create(Seq(4, 2), ArrayType(IntegerType, false)) +val a02 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType)) +val a03 = Literal.create(Seq(1, 2, null, 4, 5), ArrayType(IntegerType)) +val a04 = Literal.create(Seq(-5, 4, -3, 2, -1), ArrayType(IntegerType)) +val a05 = Literal.create(Seq.empty[Int], ArrayType(IntegerType)) + +val a10 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType, false)) +val a11 = Literal.create(Seq(4L, 2L), ArrayType(LongType, false)) +val a12 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType)) +val a13 = Literal.create(Seq(1L, 2L, null, 4L, 5L), ArrayType(LongType)) +val a14 = Literal.create(Seq(-5L, 4L, -3L, 2L, -1L), ArrayType(LongType)) +val a15 = Literal.create(Seq.empty[Long], ArrayType(LongType)) + +val a20 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType)) +val a21 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType)) +val a22 = Literal.create(Seq("b", null, "a", "g"), ArrayType(StringType)) +val a23 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType, false)) +val a24 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType, false)) + +val a30 = Literal.create(Seq(null, null), ArrayType(NullType)) + +checkEvaluation(ArrayUnion(a00, a01), UnsafeArrayData.fromPrimitiveArray(Array(4, 1, 3, 2))) +checkEvaluation(ArrayUnion(a01, a02), Seq(4, 2, 1, 3)) +checkEvaluation(ArrayUnion(a03, a04), Seq(1, 2, null, 4, 5, -5, -3, -1)) +checkEvaluation(ArrayUnion(a03, a05), Seq(1, 2, null, 4, 5)) + +checkEvaluation( + ArrayUnion(a10, a11), UnsafeArrayData.fromPrimitiveArray(Array(4L, 1L, 3L, 2L))) +checkEvaluation(ArrayUnion(a11, a12), Seq(4L, 2L, 1L, 3L)) +checkEvaluation(ArrayUnion(a13, a14), Seq(1L, 2L, null, 4L, 5L, -5L, -3L, -1L)) +checkEvaluation(ArrayUnion(a13, a15), Seq(1L, 2L, null, 4L, 5L)) + +checkEvaluation(ArrayUnion(a20, a21), Seq("b", "a", "c", "d", "f")) +checkEvaluation(ArrayUnion(a20, a22), Seq("b", "a", "c", null, "g")) +checkEvaluation(ArrayUnion(a23, a24), Seq("b", "c", "d", "a", "f")) + +checkEvaluation(ArrayUnion(a30, a30), Seq(null)) --- End diff -- I'm not sure its reason (maybe because the element types are different?), but I meant something like: ``` checkEvaluation(ArrayUnion(a20, Literal.create(null, ArrayType(StringType))), ...?) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182971562 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala --- @@ -169,4 +169,45 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(Reverse(as7), null) checkEvaluation(Reverse(aa), Seq(Seq("e"), Seq("c", "d"), Seq("a", "b"))) } + + test("Array Union") { +val a00 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType, false)) +val a01 = Literal.create(Seq(4, 2), ArrayType(IntegerType, false)) +val a02 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType)) +val a03 = Literal.create(Seq(1, 2, null, 4, 5), ArrayType(IntegerType)) +val a04 = Literal.create(Seq(-5, 4, -3, 2, -1), ArrayType(IntegerType)) +val a05 = Literal.create(Seq.empty[Int], ArrayType(IntegerType)) + +val a10 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType, false)) +val a11 = Literal.create(Seq(4L, 2L), ArrayType(LongType, false)) +val a12 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType)) +val a13 = Literal.create(Seq(1L, 2L, null, 4L, 5L), ArrayType(LongType)) +val a14 = Literal.create(Seq(-5L, 4L, -3L, 2L, -1L), ArrayType(LongType)) +val a15 = Literal.create(Seq.empty[Long], ArrayType(LongType)) + +val a20 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType)) +val a21 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType)) +val a22 = Literal.create(Seq("b", null, "a", "g"), ArrayType(StringType)) +val a23 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType, false)) +val a24 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType, false)) + +val a30 = Literal.create(Seq(null, null), ArrayType(NullType)) + +checkEvaluation(ArrayUnion(a00, a01), UnsafeArrayData.fromPrimitiveArray(Array(4, 1, 3, 2))) +checkEvaluation(ArrayUnion(a01, a02), Seq(4, 2, 1, 3)) +checkEvaluation(ArrayUnion(a03, a04), Seq(1, 2, null, 4, 5, -5, -3, -1)) +checkEvaluation(ArrayUnion(a03, a05), Seq(1, 2, null, 4, 5)) + +checkEvaluation( + ArrayUnion(a10, a11), UnsafeArrayData.fromPrimitiveArray(Array(4L, 1L, 3L, 2L))) +checkEvaluation(ArrayUnion(a11, a12), Seq(4L, 2L, 1L, 3L)) +checkEvaluation(ArrayUnion(a13, a14), Seq(1L, 2L, null, 4L, 5L, -5L, -3L, -1L)) +checkEvaluation(ArrayUnion(a13, a15), Seq(1L, 2L, null, 4L, 5L)) + +checkEvaluation(ArrayUnion(a20, a21), Seq("b", "a", "c", "d", "f")) +checkEvaluation(ArrayUnion(a20, a22), Seq("b", "a", "c", null, "g")) +checkEvaluation(ArrayUnion(a23, a24), Seq("b", "c", "d", "a", "f")) + +checkEvaluation(ArrayUnion(a30, a30), Seq(null)) --- End diff -- Umm, when only one of the arguments is `null`, unexpected `TreeNodeException` occurs. 
```
checkEvaluation(ArrayUnion(a20, a30), Seq("b", "a", "c", null))
```
```
After applying rule org.apache.spark.sql.catalyst.optimizer.EliminateDistinct in batch Eliminate Distinct, the structural integrity of the plan is broken., tree:
'Project [array_union([b,a,c], [null,null]) AS Optimized(array_union([b,a,c], [null,null]))#71]
+- OneRowRelation

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: After applying rule org.apache.spark.sql.catalyst.optimizer.EliminateDistinct in batch Eliminate Distinct, the structural integrity of the plan is broken., tree:
'Project [array_union([b,a,c], [null,null]) AS Optimized(array_union([b,a,c], [null,null]))#71]
+- OneRowRelation
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:106)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
  at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper$class.checkEvaluationWithOptimization(ExpressionEvalHelper.scala:252)
  ...
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182971159 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -505,3 +506,150 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 --- End diff -- Sure. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182690674 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -417,3 +418,156 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 + def typeId: Int + + def array1: Expression + def array2: Expression + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +val r = super.checkInputDataTypes() +if ((r == TypeCheckResult.TypeCheckSuccess) && + (array1.dataType.asInstanceOf[ArrayType].elementType != +array2.dataType.asInstanceOf[ArrayType].elementType)) { + TypeCheckResult.TypeCheckFailure("Element type in both arrays must be the same") +} else { + r +} + } + + override def dataType: DataType = array1.dataType + + private def elementType = dataType.asInstanceOf[ArrayType].elementType + private def cn1 = array1.dataType.asInstanceOf[ArrayType].containsNull + private def cn2 = array2.dataType.asInstanceOf[ArrayType].containsNull + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val ary1 = input1.asInstanceOf[ArrayData] +val ary2 = input2.asInstanceOf[ArrayData] + +if (!cn1 && !cn2) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + val hs = new OpenHashSet[Int] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getInt(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { --- End diff -- Thank you for your good suggestion. I will create a new abstract method for this part which will be overridden by each of three subclasses --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
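A minimal sketch of the refactoring kiszk describes: the shared scan lives in the base class and only a per-operation hook is overridden by each of the three subclasses. The class and method names below are hypothetical, not the ones that landed in the final patch.
```
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types.{AbstractDataType, ArrayType, DataType}
import org.apache.spark.util.collection.OpenHashSet

abstract class ArraySetLikeSketch extends BinaryExpression with ExpectsInputTypes {
  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType)
  override def dataType: DataType = left.dataType
  protected def elementType: DataType = dataType.asInstanceOf[ArrayType].elementType

  // The shared scan: hash the first array, then delegate to the hook.
  override def nullSafeEval(input1: Any, input2: Any): Any = {
    val ary1 = input1.asInstanceOf[ArrayData]
    val hs = new OpenHashSet[Any]
    var i = 0
    while (i < ary1.numElements()) {
      hs.add(ary1.get(i, elementType))
      i += 1
    }
    new GenericArrayData(processArray2(hs, input2.asInstanceOf[ArrayData]).iterator.toArray)
  }

  // Overridden by the union / intersect / except subclasses.
  protected def processArray2(hs: OpenHashSet[Any], ary2: ArrayData): OpenHashSet[Any]
}

// CodegenFallback is mixed in only to keep the sketch compilable without
// a doGenCode; its codegen trade-off is discussed later in the thread.
case class ArrayUnionSketch(left: Expression, right: Expression)
  extends ArraySetLikeSketch with CodegenFallback {
  override protected def processArray2(
      hs: OpenHashSet[Any], ary2: ArrayData): OpenHashSet[Any] = {
    var i = 0
    while (i < ary2.numElements()) {
      hs.add(ary2.get(i, elementType))  // union: fold everything from array2 in
      i += 1
    }
    hs
  }
}
```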
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182690065 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala --- @@ -169,4 +169,45 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(Reverse(as7), null) checkEvaluation(Reverse(aa), Seq(Seq("e"), Seq("c", "d"), Seq("a", "b"))) } + + test("Array Union") { +val a00 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType, false)) +val a01 = Literal.create(Seq(4, 2), ArrayType(IntegerType, false)) +val a02 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType)) +val a03 = Literal.create(Seq(1, 2, null, 4, 5), ArrayType(IntegerType)) +val a04 = Literal.create(Seq(-5, 4, -3, 2, -1), ArrayType(IntegerType)) +val a05 = Literal.create(Seq.empty[Int], ArrayType(IntegerType)) + +val a10 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType, false)) +val a11 = Literal.create(Seq(4L, 2L), ArrayType(LongType, false)) +val a12 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType)) +val a13 = Literal.create(Seq(1L, 2L, null, 4L, 5L), ArrayType(LongType)) +val a14 = Literal.create(Seq(-5L, 4L, -3L, 2L, -1L), ArrayType(LongType)) +val a15 = Literal.create(Seq.empty[Long], ArrayType(LongType)) + +val a20 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType)) +val a21 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType)) +val a22 = Literal.create(Seq("b", null, "a", "g"), ArrayType(StringType)) +val a23 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType, false)) +val a24 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType, false)) + +val a30 = Literal.create(Seq(null, null), ArrayType(NullType)) + +checkEvaluation(ArrayUnion(a00, a01), UnsafeArrayData.fromPrimitiveArray(Array(4, 1, 3, 2))) +checkEvaluation(ArrayUnion(a01, a02), Seq(4, 2, 1, 3)) +checkEvaluation(ArrayUnion(a03, a04), Seq(1, 2, null, 4, 5, -5, -3, -1)) +checkEvaluation(ArrayUnion(a03, a05), Seq(1, 2, null, 4, 5)) + +checkEvaluation( + ArrayUnion(a10, a11), UnsafeArrayData.fromPrimitiveArray(Array(4L, 1L, 3L, 2L))) +checkEvaluation(ArrayUnion(a11, a12), Seq(4L, 2L, 1L, 3L)) +checkEvaluation(ArrayUnion(a13, a14), Seq(1L, 2L, null, 4L, 5L, -5L, -3L, -1L)) +checkEvaluation(ArrayUnion(a13, a15), Seq(1L, 2L, null, 4L, 5L)) + +checkEvaluation(ArrayUnion(a20, a21), Seq("b", "a", "c", "d", "f")) +checkEvaluation(ArrayUnion(a20, a22), Seq("b", "a", "c", null, "g")) +checkEvaluation(ArrayUnion(a23, a24), Seq("b", "c", "d", "a", "f")) + +checkEvaluation(ArrayUnion(a30, a30), Seq(null)) --- End diff -- Good question. I cannot see such a test case in [Presto](https://github.com/prestodb/presto/blob/master/presto-main/src/test/java/com/facebook/presto/type/TestArrayOperators.java#L1078). Let me think. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182689118 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -505,3 +506,150 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 + def typeId: Int + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +val r = super.checkInputDataTypes() +if ((r == TypeCheckResult.TypeCheckSuccess) && + (left.dataType.asInstanceOf[ArrayType].elementType != +right.dataType.asInstanceOf[ArrayType].elementType)) { + TypeCheckResult.TypeCheckFailure("Element type in both arrays must be the same") +} else { + r +} + } + + override def dataType: DataType = left.dataType + + private def elementType = dataType.asInstanceOf[ArrayType].elementType + private def cn1 = left.dataType.asInstanceOf[ArrayType].containsNull + private def cn2 = right.dataType.asInstanceOf[ArrayType].containsNull + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val ary1 = input1.asInstanceOf[ArrayData] +val ary2 = input2.asInstanceOf[ArrayData] + +if (!cn1 && !cn2) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + val hs = new OpenHashSet[Int] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getInt(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { +hs.add(ary2.getInt(i)) +i += 1 + } + UnsafeArrayData.fromPrimitiveArray(hs.iterator.toArray) +case LongType => + // avoid boxing of primitive long array elements + val hs = new OpenHashSet[Long] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getLong(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { +hs.add(ary2.getLong(i)) +i += 1 + } + UnsafeArrayData.fromPrimitiveArray(hs.iterator.toArray) +case _ => + val hs = new OpenHashSet[Any] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.get(i, elementType)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { +hs.add(ary2.get(i, elementType)) +i += 1 + } + new GenericArrayData(hs.iterator.toArray) + } +} else { + ArraySetUtils.arrayUnion(ary1, ary2, elementType) +} + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val hs = ctx.freshName("hs") +val i = ctx.freshName("i") +val ArraySetUtils = "org.apache.spark.sql.catalyst.expressions.ArraySetUtils" +val genericArrayData = classOf[GenericArrayData].getName +val unsafeArrayData = classOf[UnsafeArrayData].getName +val openHashSet = classOf[OpenHashSet[_]].getName +val et = s"org.apache.spark.sql.types.DataTypes.$elementType" +val (postFix, classTag, getter, arrayBuilder, castType) = if (!cn1 && !cn2) { + val ptName = CodeGenerator.primitiveTypeName(elementType) + elementType match { +case ByteType | ShortType | IntegerType => + (s"$$mcI$$sp", s"scala.reflect.ClassTag$$.MODULE$$.$ptName()", s"get$ptName($i)", +s"$unsafeArrayData.fromPrimitiveArray", CodeGenerator.javaType(elementType)) +case LongType => + (s"$$mcJ$$sp", s"scala.reflect.ClassTag$$.MODULE$$.$ptName()", s"get$ptName($i)", +s"$unsafeArrayData.fromPrimitiveArray", "long") +case _ => + ("", s"scala.reflect.ClassTag$$.MODULE$$.Object()", s"get($i, $et)", +s"new $genericArrayData", "Object") + } +} else { + ("", "", "", "", "") +} + +nullSafeCodeGen(ctx, ev, (ary1, 
ary2) => { + if (classTag != "") { +s""" + |$openHashSet $hs = new $openHashSet$postFix($classTag); + |for (int $i = 0; $i < $ary1.numElements(); $i++) { + | $hs.add$postFix($ary1.$getter); + |} + |for (int $i = 0; $i < $ary2.numElements(); $i++) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182672237 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -505,3 +506,150 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 --- End diff -- This should be in `ArraySetUtils` object? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
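The suggestion, sketched minimally: a constant like `kindUnion` carries no per-instance state, so it can live once in the companion object rather than in every expression instance. The two sibling constants below are assumptions, mirroring the three planned set operations.
```
object ArraySetUtils {
  val kindUnion = 1
  val kindIntersect = 2  // assumed, for the sibling operation
  val kindExcept = 3     // assumed, for the sibling operation
}
```
Subclasses would then reference `ArraySetUtils.kindUnion` instead of inheriting an instance field from the abstract class.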
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182671380 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala --- @@ -169,4 +169,45 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(Reverse(as7), null) checkEvaluation(Reverse(aa), Seq(Seq("e"), Seq("c", "d"), Seq("a", "b"))) } + + test("Array Union") { +val a00 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType, false)) +val a01 = Literal.create(Seq(4, 2), ArrayType(IntegerType, false)) +val a02 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType)) +val a03 = Literal.create(Seq(1, 2, null, 4, 5), ArrayType(IntegerType)) +val a04 = Literal.create(Seq(-5, 4, -3, 2, -1), ArrayType(IntegerType)) +val a05 = Literal.create(Seq.empty[Int], ArrayType(IntegerType)) + +val a10 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType, false)) +val a11 = Literal.create(Seq(4L, 2L), ArrayType(LongType, false)) +val a12 = Literal.create(Seq(1L, 2L, 3L), ArrayType(LongType)) +val a13 = Literal.create(Seq(1L, 2L, null, 4L, 5L), ArrayType(LongType)) +val a14 = Literal.create(Seq(-5L, 4L, -3L, 2L, -1L), ArrayType(LongType)) +val a15 = Literal.create(Seq.empty[Long], ArrayType(LongType)) + +val a20 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType)) +val a21 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType)) +val a22 = Literal.create(Seq("b", null, "a", "g"), ArrayType(StringType)) +val a23 = Literal.create(Seq("b", "a", "c"), ArrayType(StringType, false)) +val a24 = Literal.create(Seq("c", "d", "a", "f"), ArrayType(StringType, false)) + +val a30 = Literal.create(Seq(null, null), ArrayType(NullType)) + +checkEvaluation(ArrayUnion(a00, a01), UnsafeArrayData.fromPrimitiveArray(Array(4, 1, 3, 2))) +checkEvaluation(ArrayUnion(a01, a02), Seq(4, 2, 1, 3)) +checkEvaluation(ArrayUnion(a03, a04), Seq(1, 2, null, 4, 5, -5, -3, -1)) +checkEvaluation(ArrayUnion(a03, a05), Seq(1, 2, null, 4, 5)) + +checkEvaluation( + ArrayUnion(a10, a11), UnsafeArrayData.fromPrimitiveArray(Array(4L, 1L, 3L, 2L))) +checkEvaluation(ArrayUnion(a11, a12), Seq(4L, 2L, 1L, 3L)) +checkEvaluation(ArrayUnion(a13, a14), Seq(1L, 2L, null, 4L, 5L, -5L, -3L, -1L)) +checkEvaluation(ArrayUnion(a13, a15), Seq(1L, 2L, null, 4L, 5L)) + +checkEvaluation(ArrayUnion(a20, a21), Seq("b", "a", "c", "d", "f")) +checkEvaluation(ArrayUnion(a20, a22), Seq("b", "a", "c", null, "g")) +checkEvaluation(ArrayUnion(a23, a24), Seq("b", "c", "d", "a", "f")) + +checkEvaluation(ArrayUnion(a30, a30), Seq(null)) --- End diff -- What if one of the two arguments is `null`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182663898 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -505,3 +506,150 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 + def typeId: Int + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +val r = super.checkInputDataTypes() +if ((r == TypeCheckResult.TypeCheckSuccess) && + (left.dataType.asInstanceOf[ArrayType].elementType != +right.dataType.asInstanceOf[ArrayType].elementType)) { + TypeCheckResult.TypeCheckFailure("Element type in both arrays must be the same") +} else { + r +} + } + + override def dataType: DataType = left.dataType + + private def elementType = dataType.asInstanceOf[ArrayType].elementType + private def cn1 = left.dataType.asInstanceOf[ArrayType].containsNull + private def cn2 = right.dataType.asInstanceOf[ArrayType].containsNull + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val ary1 = input1.asInstanceOf[ArrayData] +val ary2 = input2.asInstanceOf[ArrayData] + +if (!cn1 && !cn2) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + val hs = new OpenHashSet[Int] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getInt(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { +hs.add(ary2.getInt(i)) +i += 1 + } + UnsafeArrayData.fromPrimitiveArray(hs.iterator.toArray) +case LongType => + // avoid boxing of primitive long array elements + val hs = new OpenHashSet[Long] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getLong(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { +hs.add(ary2.getLong(i)) +i += 1 + } + UnsafeArrayData.fromPrimitiveArray(hs.iterator.toArray) +case _ => + val hs = new OpenHashSet[Any] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.get(i, elementType)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { +hs.add(ary2.get(i, elementType)) +i += 1 + } + new GenericArrayData(hs.iterator.toArray) + } +} else { + ArraySetUtils.arrayUnion(ary1, ary2, elementType) +} + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val hs = ctx.freshName("hs") +val i = ctx.freshName("i") +val ArraySetUtils = "org.apache.spark.sql.catalyst.expressions.ArraySetUtils" +val genericArrayData = classOf[GenericArrayData].getName +val unsafeArrayData = classOf[UnsafeArrayData].getName +val openHashSet = classOf[OpenHashSet[_]].getName +val et = s"org.apache.spark.sql.types.DataTypes.$elementType" +val (postFix, classTag, getter, arrayBuilder, castType) = if (!cn1 && !cn2) { + val ptName = CodeGenerator.primitiveTypeName(elementType) + elementType match { +case ByteType | ShortType | IntegerType => + (s"$$mcI$$sp", s"scala.reflect.ClassTag$$.MODULE$$.$ptName()", s"get$ptName($i)", +s"$unsafeArrayData.fromPrimitiveArray", CodeGenerator.javaType(elementType)) +case LongType => + (s"$$mcJ$$sp", s"scala.reflect.ClassTag$$.MODULE$$.$ptName()", s"get$ptName($i)", +s"$unsafeArrayData.fromPrimitiveArray", "long") +case _ => + ("", s"scala.reflect.ClassTag$$.MODULE$$.Object()", s"get($i, $et)", +s"new $genericArrayData", "Object") + } +} else { + ("", "", "", "", "") +} + +nullSafeCodeGen(ctx, ev, (ary1, 
ary2) => { + if (classTag != "") { +s""" + |$openHashSet $hs = new $openHashSet$postFix($classTag); + |for (int $i = 0; $i < $ary1.numElements(); $i++) { + | $hs.add$postFix($ary1.$getter); + |} + |for (int $i = 0; $i < $ary2.numElements(); $i++) { +
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182674039 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -417,3 +418,156 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 + def typeId: Int + + def array1: Expression + def array2: Expression + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +val r = super.checkInputDataTypes() +if ((r == TypeCheckResult.TypeCheckSuccess) && + (array1.dataType.asInstanceOf[ArrayType].elementType != +array2.dataType.asInstanceOf[ArrayType].elementType)) { + TypeCheckResult.TypeCheckFailure("Element type in both arrays must be the same") +} else { + r +} + } + + override def dataType: DataType = array1.dataType + + private def elementType = dataType.asInstanceOf[ArrayType].elementType + private def cn1 = array1.dataType.asInstanceOf[ArrayType].containsNull + private def cn2 = array2.dataType.asInstanceOf[ArrayType].containsNull + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val ary1 = input1.asInstanceOf[ArrayData] +val ary2 = input2.asInstanceOf[ArrayData] + +if (!cn1 && !cn2) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + val hs = new OpenHashSet[Int] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getInt(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { --- End diff -- I'm not sure this abstraction is good or not. The final version seems complex because of a bunch of if-else. I'd rather introduce abstract methods for the difference and override them in the subclasses. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182532583 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -417,3 +418,156 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 + def typeId: Int + + def array1: Expression + def array2: Expression + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +val r = super.checkInputDataTypes() +if ((r == TypeCheckResult.TypeCheckSuccess) && + (array1.dataType.asInstanceOf[ArrayType].elementType != +array2.dataType.asInstanceOf[ArrayType].elementType)) { + TypeCheckResult.TypeCheckFailure("Element type in both arrays must be the same") +} else { + r +} + } + + override def dataType: DataType = array1.dataType + + private def elementType = dataType.asInstanceOf[ArrayType].elementType + private def cn1 = array1.dataType.asInstanceOf[ArrayType].containsNull + private def cn2 = array2.dataType.asInstanceOf[ArrayType].containsNull + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val ary1 = input1.asInstanceOf[ArrayData] +val ary2 = input2.asInstanceOf[ArrayData] + +if (!cn1 && !cn2) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + val hs = new OpenHashSet[Int] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getInt(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { --- End diff -- [Here](https://github.com/apache/spark/pull/21103/files#diff-9853dcf5ce3d2ac1e94d473197ff5768R510) is the final version of `ArraySetUtils` that supports three functions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182484698 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -417,3 +418,156 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 + def typeId: Int + + def array1: Expression + def array2: Expression + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +val r = super.checkInputDataTypes() +if ((r == TypeCheckResult.TypeCheckSuccess) && + (array1.dataType.asInstanceOf[ArrayType].elementType != +array2.dataType.asInstanceOf[ArrayType].elementType)) { + TypeCheckResult.TypeCheckFailure("Element type in both arrays must be the same") +} else { + r +} + } + + override def dataType: DataType = array1.dataType + + private def elementType = dataType.asInstanceOf[ArrayType].elementType + private def cn1 = array1.dataType.asInstanceOf[ArrayType].containsNull + private def cn2 = array2.dataType.asInstanceOf[ArrayType].containsNull + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val ary1 = input1.asInstanceOf[ArrayData] +val ary2 = input2.asInstanceOf[ArrayData] + +if (!cn1 && !cn2) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + val hs = new OpenHashSet[Int] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getInt(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { --- End diff -- [Here](https://github.com/apache/spark/pull/21102/files#diff-9853dcf5ce3d2ac1e94d473197ff5768R510) is an instance of the usage of `ArraySetUtils`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r182200622 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -417,3 +418,156 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + +abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes { + val kindUnion = 1 + def typeId: Int + + def array1: Expression + def array2: Expression + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +val r = super.checkInputDataTypes() +if ((r == TypeCheckResult.TypeCheckSuccess) && + (array1.dataType.asInstanceOf[ArrayType].elementType != +array2.dataType.asInstanceOf[ArrayType].elementType)) { + TypeCheckResult.TypeCheckFailure("Element type in both arrays must be the same") +} else { + r +} + } + + override def dataType: DataType = array1.dataType + + private def elementType = dataType.asInstanceOf[ArrayType].elementType + private def cn1 = array1.dataType.asInstanceOf[ArrayType].containsNull + private def cn2 = array2.dataType.asInstanceOf[ArrayType].containsNull + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val ary1 = input1.asInstanceOf[ArrayData] +val ary2 = input2.asInstanceOf[ArrayData] + +if (!cn1 && !cn2) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + val hs = new OpenHashSet[Int] + var i = 0 + while (i < ary1.numElements()) { +hs.add(ary1.getInt(i)) +i += 1 + } + i = 0 + while (i < ary2.numElements()) { --- End diff -- We can also support `array_intersect` and `array_except` by changing this 2nd loop, with a few other small changes. This is why we introduced `ArraySetUtils` in this PR. Other PRs will update `ArraySetUtils` appropriately. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
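To make the "2nd loop" point concrete, here is a hedged sketch of how the second pass would differ per operation once `hs` holds the elements of the first array. The helper names are hypothetical and none of this code is from the patch; note that `OpenHashSet` has no remove, so except would hash the second array instead and filter the first.
```
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.collection.OpenHashSet

// hs already holds every element of ary1 after the 1st loop.
def unionSecondLoop(hs: OpenHashSet[Any], ary2: ArrayData, et: DataType): OpenHashSet[Any] = {
  var i = 0
  while (i < ary2.numElements()) {
    hs.add(ary2.get(i, et))  // union: fold ary2 into the same set
    i += 1
  }
  hs
}

def intersectSecondLoop(hs: OpenHashSet[Any], ary2: ArrayData, et: DataType): OpenHashSet[Any] = {
  val result = new OpenHashSet[Any]
  var i = 0
  while (i < ary2.numElements()) {
    val elem = ary2.get(i, et)
    if (hs.contains(elem)) result.add(elem)  // keep only common elements
    i += 1
  }
  result
}

// except (ary1 minus ary2) would build the set from ary2 first and then
// keep the ary1 elements absent from it, since OpenHashSet cannot remove.
```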
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r181590066 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -287,3 +288,80 @@ case class ArrayContains(left: Expression, right: Expression) override def prettyName: String = "array_contains" } + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) + extends BinaryExpression with ExpectsInputTypes with CodegenFallback { --- End diff -- I see. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r181524647 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -287,3 +288,80 @@ case class ArrayContains(left: Expression, right: Expression) override def prettyName: String = "array_contains" } + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) + extends BinaryExpression with ExpectsInputTypes with CodegenFallback { --- End diff -- Wholestage codegen doesn't support `CodegenFallback`. So even this expression codegen has no performance advantage itself, it still can makes a difference because it breaks a query to non wholestage codegen and wholestage codegen parts. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
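To make the trade-off concrete: `CodegenFallback` is a real Catalyst trait whose inherited `doGenCode` simply emits a call back into the interpreted `eval`, so whole-stage codegen cannot fuse across such an expression. A minimal illustration, with a made-up placeholder expression:
```
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.DataType

// Hypothetical placeholder expression. Mixing in CodegenFallback means no
// doGenCode has to be written, but the expression becomes a barrier that
// splits the query into separate whole-stage-codegen regions.
case class MyArraySetOp(left: Expression, right: Expression)
  extends BinaryExpression with CodegenFallback {
  override def dataType: DataType = left.dataType
  override def nullSafeEval(a: Any, b: Any): Any = a  // stand-in logic only
}
```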
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r181478560 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -287,3 +288,80 @@ case class ArrayContains(left: Expression, right: Expression) override def prettyName: String = "array_contains" } + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) + extends BinaryExpression with ExpectsInputTypes with CodegenFallback { --- End diff -- Or, do we want to keep the chain length of the whole-stage codegen as possible? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org