Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20938#discussion_r182024380
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
---
@@ -417,3 +419,179 @@ case class ArrayMax(child: Expression) extends
UnaryExpression with ImplicitCast
override def prettyName: String = "array_max"
}
+
+/**
+ * Transforms an array of arrays into a single array.
+ *
+ * The result is null when the input is null or when any of the inner arrays is null.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(arrayOfArrays) - Transforms an array of arrays into a single array.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(array(1, 2), array(3, 4)));
+       [1,2,3,4]
+  """,
+  since = "2.4.0")
+case class Flatten(child: Expression) extends UnaryExpression {
+
+ // Upper bound applied to the flattened result by the size checks in this class.
+ private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+ // The child's type viewed as an ArrayType (the outer array); the cast relies on
+ // checkInputDataTypes having accepted the input.
+ private lazy val childDataType: ArrayType =
child.dataType.asInstanceOf[ArrayType]
+
+ // Null result is possible when the child itself is nullable or when the outer
+ // array may hold null elements (a null inner array makes the whole result null).
+ override def nullable: Boolean = child.nullable ||
childDataType.containsNull
+
+ // The result type is the element type of the outer array, i.e. the inner array type.
+ override def dataType: DataType = childDataType.elementType
+
+ // Element type of the inner arrays, which is also the element type of the result.
+ lazy val elementType: DataType =
dataType.asInstanceOf[ArrayType].elementType
+
+  /**
+   * Accepts the input only when it is an array whose elements are themselves arrays;
+   * any other type yields a failure that names the offending expression and its type.
+   */
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val inputType = child.dataType
+    inputType match {
+      case ArrayType(_: ArrayType, _) => TypeCheckResult.TypeCheckSuccess
+      case _ =>
+        val message = "The argument should be an array of arrays, " +
+          s"but '${child.sql}' is of ${inputType.simpleString} type."
+        TypeCheckResult.TypeCheckFailure(message)
+    }
+  }
+
+  /**
+   * Interpreted evaluation: concatenates the inner arrays of `child` into one array.
+   *
+   * @param child the non-null outer array value; it is cast to ArrayData
+   * @return null when any inner array is null, otherwise a GenericArrayData holding the
+   *         elements of all inner arrays in order
+   * @throws RuntimeException when the total number of elements exceeds MAX_ARRAY_LENGTH
+   */
+  override def nullSafeEval(child: Any): Any = {
+    val elements = child.asInstanceOf[ArrayData].toObjectArray(dataType)
+
+    if (elements.contains(null)) {
+      // A single null inner array makes the whole result null.
+      null
+    } else {
+      val arrayData = elements.map(_.asInstanceOf[ArrayData])
+      // Accumulate as Long so the total cannot wrap around before the limit check.
+      val numberOfElements = arrayData.foldLeft(0L)((sum, e) => sum + e.numElements())
+      if (numberOfElements > MAX_ARRAY_LENGTH) {
+        throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
+          s"$numberOfElements elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.")
+      }
+      // Explicit element type: an untyped `new Array(n)` would be inferred as Array[Nothing].
+      val flattenedData = new Array[Any](numberOfElements.toInt)
+      var position = 0
+      for (ad <- arrayData) {
+        val arr = ad.toObjectArray(elementType)
+        Array.copy(arr, 0, flattenedData, position, arr.length)
+        position += arr.length
+      }
+      new GenericArrayData(flattenedData)
+    }
+  }
+
+  /**
+   * Produces the generated code for this expression: picks the primitive or non-primitive
+   * flatten routine based on the element type, then wraps it in the null-element guard.
+   */
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    // Builds the core flatten code for the given child variable name.
+    def flattenCode(input: String): String = {
+      if (CodeGenerator.isPrimitiveType(elementType)) {
+        genCodeForFlattenOfPrimitiveElements(ctx, input, ev.value)
+      } else {
+        genCodeForFlattenOfNonPrimitiveElements(ctx, input, ev.value)
+      }
+    }
+
+    nullSafeCodeGen(ctx, ev, input => nullElementsProtection(ev, input, flattenCode(input)))
+  }
+
+ /**
+  * Wraps `coreLogic` in generated code that first scans the child array for null
+  * elements. The scan ORs `isNullAt(z)` into `ev.isNull` and stops as soon as it is set;
+  * `coreLogic` is emitted inside a block that only runs when `ev.isNull` is still false.
+  *
+  * @param ev the expression code whose `isNull` flag is updated by the scan
+  * @param childVariableName name of the generated variable holding the child array
+  * @param coreLogic generated code to run when no null element was found
+  * @return the combined generated code
+  */
+ private def nullElementsProtection(
+ ev: ExprCode,
+ childVariableName: String,
+ coreLogic: String): String = {
+ s"""
+ |for (int z=0; !${ev.isNull} && z < $childVariableName.numElements();
z++) {
+ | ${ev.isNull} |= $childVariableName.isNullAt(z);
+ |}
+ |if (!${ev.isNull}) {
+ | $coreLogic
+ |}
+ """.stripMargin
+ }
+
+  /**
+   * Generates code that sums the lengths of all inner arrays of the child array into a
+   * fresh `long` variable and throws when the total exceeds MAX_ARRAY_LENGTH.
+   *
+   * @param ctx codegen context used to obtain a fresh variable name
+   * @param childVariableName name of the generated variable holding the child array
+   * @return a pair of (generated code, name of the variable holding the total)
+   */
+  private def genCodeForNumberOfElements(
+      ctx: CodegenContext,
+      childVariableName: String): (String, String) = {
+    val variableName = ctx.freshName("numElements")
+    // The count is concatenated at runtime in the generated Java code. Interpolating
+    // $variableName inside the Java string literal would print the variable's name
+    // instead of its value.
+    val code = s"""
+      |long $variableName = 0;
+      |for (int z=0; z < $childVariableName.numElements(); z++) {
+      |  $variableName += $childVariableName.getArray(z).numElements();
+      |}
+      |if ($variableName > $MAX_ARRAY_LENGTH) {
+      |  throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
+      |    $variableName + " elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.");
+      |}
+      """.stripMargin
+    (code, variableName)
+  }
+
+ private def genCodeForFlattenOfPrimitiveElements(
+ ctx: CodegenContext,
+ childVariableName: String,
+ arrayDataName: String): String = {
+ val arrayName = ctx.freshName("array")
+ val arraySizeName = ctx.freshName("size")
+ val counter = ctx.freshName("counter")
+ val tempArrayDataName = ctx.freshName("tempArrayData")
+
+ val (numElemCode, numElemName) = genCodeForNumberOfElements(ctx,
childVariableName)
+
+ val unsafeArraySizeInBytes = s"""
+ |long $arraySizeName =
UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
+ | $numElemName,
+ | ${elementType.defaultSize});
+ |if ($arraySizeName > $MAX_ARRAY_LENGTH) {
+ | throw new RuntimeException("Unsuccessful try to flatten an array
of arrays with" +
+ | " $arraySizeName bytes of data due to exceeding the limit
$MAX_ARRAY_LENGTH" +
--- End diff --
ditto.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]