[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_index
Github user mn-mikke closed the pull request at: https://github.com/apache/spark/pull/21121
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183991875 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,157 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Transforms an array by assigning an order number to each element. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst, startFromZero]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",1),("a",2),(null,3),("b",4)] + > SELECT _FUNC_(array("d", "a", null, "b"), true, false); + [(1,"d"),(2,"a"),(3,null),(4,"b")] + > SELECT _FUNC_(array("d", "a", null, "b"), true, true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ZipWithIndex(child: Expression, indexFirst: Expression, startFromZero: Expression) + extends UnaryExpression with ExpectsInputTypes { + + def this(e: Expression) = this(e, Literal.FalseLiteral, Literal.FalseLiteral) + + def exprToFlag(e: Expression, order: String): Boolean = e match { +case Literal(v: Boolean, BooleanType) => v +case _ => throw new AnalysisException(s"The $order argument has to be a boolean constant.") + } + + private val idxFirst: Boolean = exprToFlag(indexFirst, "second") + + private val (idxShift, idxGen): (Int, String) = if (exprToFlag(startFromZero, "third")) { +(0, "z") + } else { +(1, "z + 1") + } + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType] + + override def dataType: DataType = { +val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull) +val indexField = StructField("index", IntegerType, false) + +val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField) + +ArrayType(StructType(fields), false) + } + + override protected def nullSafeEval(input: Any): Any = { +val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType) + +val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i) +val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i + idxShift)} + +new GenericArrayData(resultData) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => { + val numElements = ctx.freshName("numElements") + val code = if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) { +genCodeForPrimitiveElements(ctx, c, ev.value, numElements) + } else { +genCodeForAnyElements(ctx, c, ev.value, numElements) + } + s""" + |final int $numElements = $c.numElements(); + |$code + """.stripMargin +}) + } + + private def genCodeForPrimitiveElements( + ctx: CodegenContext, + childVariableName: String, + arrayData: String, + numElements: String): String = { +val byteArraySize = ctx.freshName("byteArraySize") +val data = ctx.freshName("byteArray") +val unsafeRow = ctx.freshName("unsafeRow") +val structSize = ctx.freshName("structSize") +val unsafeArrayData = ctx.freshName("unsafeArrayData") +val structsOffset = ctx.freshName("structsOffset") +val 
calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" + +val baseOffset = Platform.BYTE_ARRAY_OFFSET +val longSize = LongType.defaultSize +val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType) +val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1") + +s""" + |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2}; + |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize); + |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize; + |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
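
For orientation, the quoted codegen lays the result out as a single flat byte array: a header, one 8-byte offset-and-size slot per element, then a 24-byte two-field UnsafeRow per element. Below is a quick sanity check of that arithmetic for a 4-element array; the header formula is an assumption mirroring what `UnsafeArrayData.calculateHeaderPortionInBytes` is understood to compute, not code from the PR:

```scala
// Worked sizing example (assumed layout constants, not code from the PR):
// header = 8-byte length word + one 8-byte null-bitset word per 64 elements.
val numElements   = 4
val longSize      = 8                                   // one offset-and-size slot per struct
val structSize    = 8 + 2 * longSize                    // 2-field bitset word + two value words = 24
val header        = 8 + ((numElements + 63) / 64) * 8   // assumed calculateHeaderPortionInBytes(4) = 16
val byteArraySize = header + numElements * (longSize + structSize) // 16 + 4 * 32 = 144 bytes
val structsOffset = header + numElements * longSize     // structs start right after the slots: 48
```
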
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183948093 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",0),("a",1),(null,2),("b",3)] + > SELECT _FUNC_(array("d", "a", null, "b"), true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ZipWithIndex(child: Expression, indexFirst: Expression) + extends UnaryExpression with ExpectsInputTypes { + + def this(e: Expression) = this(e, Literal.FalseLiteral) + + private val idxFirst: Boolean = indexFirst match { +case Literal(v: Boolean, BooleanType) => v +case _ => throw new AnalysisException("The second argument has to be a boolean constant.") + } + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType] + + override def dataType: DataType = { +val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull) +val indexField = StructField("index", IntegerType, false) + +val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField) + +ArrayType(StructType(fields), false) + } + + override protected def nullSafeEval(input: Any): Any = { +val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType) + +val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i) +val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)} + +new GenericArrayData(resultData) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => { + if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) { +genCodeForPrimitiveElements(ctx, c, ev.value) + } else { +genCodeForNonPrimitiveElements(ctx, c, ev.value) + } +}) + } + + private def genCodeForPrimitiveElements( + ctx: CodegenContext, + childVariableName: String, + arrayData: String): String = { +val numElements = ctx.freshName("numElements") +val byteArraySize = ctx.freshName("byteArraySize") +val data = ctx.freshName("byteArray") +val unsafeRow = ctx.freshName("unsafeRow") +val structSize = ctx.freshName("structSize") +val unsafeArrayData = ctx.freshName("unsafeArrayData") +val structsOffset = ctx.freshName("structsOffset") +val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" + +val baseOffset = Platform.BYTE_ARRAY_OFFSET +val longSize = LongType.defaultSize +val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType) +val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1") + +s""" + |final int $numElements = $childVariableName.numElements(); + |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2}; + 
|final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize); + |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize; + |if ($byteArraySize > $MAX_ARRAY_LENGTH) { --- End diff -- nvm, this is `zip` that does not involve concat of multiple arrays.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183947619 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,157 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Transforms an array by assigning an order number to each element. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst, startFromZero]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",1),("a",2),(null,3),("b",4)] + > SELECT _FUNC_(array("d", "a", null, "b"), true, false); + [(1,"d"),(2,"a"),(3,null),(4,"b")] + > SELECT _FUNC_(array("d", "a", null, "b"), true, true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ZipWithIndex(child: Expression, indexFirst: Expression, startFromZero: Expression) + extends UnaryExpression with ExpectsInputTypes { + + def this(e: Expression) = this(e, Literal.FalseLiteral, Literal.FalseLiteral) + + def exprToFlag(e: Expression, order: String): Boolean = e match { +case Literal(v: Boolean, BooleanType) => v +case _ => throw new AnalysisException(s"The $order argument has to be a boolean constant.") + } + + private val idxFirst: Boolean = exprToFlag(indexFirst, "second") + + private val (idxShift, idxGen): (Int, String) = if (exprToFlag(startFromZero, "third")) { +(0, "z") + } else { +(1, "z + 1") + } + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType] + + override def dataType: DataType = { +val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull) +val indexField = StructField("index", IntegerType, false) + +val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField) + +ArrayType(StructType(fields), false) + } + + override protected def nullSafeEval(input: Any): Any = { +val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType) + +val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i) +val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i + idxShift)} + +new GenericArrayData(resultData) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => { + val numElements = ctx.freshName("numElements") + val code = if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) { +genCodeForPrimitiveElements(ctx, c, ev.value, numElements) + } else { +genCodeForAnyElements(ctx, c, ev.value, numElements) + } + s""" + |final int $numElements = $c.numElements(); + |$code + """.stripMargin +}) + } + + private def genCodeForPrimitiveElements( + ctx: CodegenContext, + childVariableName: String, + arrayData: String, + numElements: String): String = { +val byteArraySize = ctx.freshName("byteArraySize") +val data = ctx.freshName("byteArray") +val unsafeRow = ctx.freshName("unsafeRow") +val structSize = ctx.freshName("structSize") +val unsafeArrayData = ctx.freshName("unsafeArrayData") +val structsOffset = ctx.freshName("structsOffset") +val 
calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" + +val baseOffset = Platform.BYTE_ARRAY_OFFSET +val longSize = LongType.defaultSize +val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType) +val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1") + +s""" + |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2}; + |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize); + |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize; + |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183808810 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",0),("a",1),(null,2),("b",3)] + > SELECT _FUNC_(array("d", "a", null, "b"), true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ZipWithIndex(child: Expression, indexFirst: Expression) + extends UnaryExpression with ExpectsInputTypes { + + def this(e: Expression) = this(e, Literal.FalseLiteral) + + private val idxFirst: Boolean = indexFirst match { +case Literal(v: Boolean, BooleanType) => v +case _ => throw new AnalysisException("The second argument has to be a boolean constant.") + } + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType] + + override def dataType: DataType = { +val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull) +val indexField = StructField("index", IntegerType, false) + +val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField) + +ArrayType(StructType(fields), false) + } + + override protected def nullSafeEval(input: Any): Any = { +val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType) + +val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i) +val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)} + +new GenericArrayData(resultData) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => { + if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) { +genCodeForPrimitiveElements(ctx, c, ev.value) + } else { +genCodeForNonPrimitiveElements(ctx, c, ev.value) + } +}) + } + + private def genCodeForPrimitiveElements( + ctx: CodegenContext, + childVariableName: String, + arrayData: String): String = { +val numElements = ctx.freshName("numElements") +val byteArraySize = ctx.freshName("byteArraySize") +val data = ctx.freshName("byteArray") +val unsafeRow = ctx.freshName("unsafeRow") +val structSize = ctx.freshName("structSize") +val unsafeArrayData = ctx.freshName("unsafeArrayData") +val structsOffset = ctx.freshName("structsOffset") +val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" + +val baseOffset = Platform.BYTE_ARRAY_OFFSET +val longSize = LongType.defaultSize +val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType) +val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1") + +s""" + |final int $numElements = $childVariableName.numElements(); + |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2}; + 
|final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize); + |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize; + |if ($byteArraySize > $MAX_ARRAY_LENGTH) { --- End diff -- Ah, we can alleviate this limitation (up to `MAX_ARRAY_LENGTH` elements) if we use `GenericArrayData`. BTW, we have to do the same check in `genCodeForNonPrimitiveElements`, too.
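
On the interpreted side, the gate for such a fallback could look roughly like this — a minimal sketch under the assumption that the same sizing formulas as the generated code apply; `fitsInUnsafeArray` is a hypothetical helper, not code from the PR:

```scala
import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow}
import org.apache.spark.sql.types.LongType
import org.apache.spark.unsafe.array.ByteArrayMethods

// Hypothetical helper mirroring the generated size check. When it returns false,
// the expression could fall back to a GenericArrayData result (object references,
// so not bound by the contiguous byte-array budget) instead of throwing.
def fitsInUnsafeArray(numElements: Int): Boolean = {
  val longSize = LongType.defaultSize
  val structSize = UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2
  UnsafeArrayData.calculateSizeOfUnderlyingByteArray(numElements, longSize + structSize) <=
    ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
}
```
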
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183253723 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",0),("a",1),(null,2),("b",3)] + > SELECT _FUNC_(array("d", "a", null, "b"), true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ZipWithIndex(child: Expression, indexFirst: Expression) + extends UnaryExpression with ExpectsInputTypes { + + def this(e: Expression) = this(e, Literal.FalseLiteral) + + private val idxFirst: Boolean = indexFirst match { +case Literal(v: Boolean, BooleanType) => v +case _ => throw new AnalysisException("The second argument has to be a boolean constant.") + } + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType] + + override def dataType: DataType = { +val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull) +val indexField = StructField("index", IntegerType, false) + +val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField) + +ArrayType(StructType(fields), false) + } + + override protected def nullSafeEval(input: Any): Any = { +val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType) + +val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i) +val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)} + +new GenericArrayData(resultData) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => { + if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) { +genCodeForPrimitiveElements(ctx, c, ev.value) + } else { +genCodeForNonPrimitiveElements(ctx, c, ev.value) + } +}) + } + + private def genCodeForPrimitiveElements( + ctx: CodegenContext, + childVariableName: String, + arrayData: String): String = { +val numElements = ctx.freshName("numElements") +val byteArraySize = ctx.freshName("byteArraySize") +val data = ctx.freshName("byteArray") +val unsafeRow = ctx.freshName("unsafeRow") +val structSize = ctx.freshName("structSize") +val unsafeArrayData = ctx.freshName("unsafeArrayData") +val structsOffset = ctx.freshName("structsOffset") +val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" + +val baseOffset = Platform.BYTE_ARRAY_OFFSET +val longSize = LongType.defaultSize +val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType) +val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1") + +s""" + |final int $numElements = $childVariableName.numElements(); + |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2}; + 
|final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize); + |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize; + |if ($byteArraySize > $MAX_ARRAY_LENGTH) { --- End diff -- I like your suggestion. So instead of throwing the exception, the function will execute a similar piece of code as in `genCodeForNonPrimitiveElements`...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183253226 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", --- End diff -- That's a really good question! The newly added functions `element_at` and `array_position` are 1-based. But on the other hand, the `getItem` method of the `Column` class is 0-based. What about adding one extra parameter and letting users decide whether the array will be indexed from 0 or 1?
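
For reference, the three-argument form this suggestion led to (visible in the updated diff earlier in this digest) would behave as shown below — illustrative only, since zip_with_index was never merged into Spark:

```scala
// Assumes a SparkSession named `spark`; the function itself is the PR's proposal.
spark.sql("""SELECT zip_with_index(array("d", "a", null, "b"))""").show(false)
// [("d",1), ("a",2), (null,3), ("b",4)]   -- 1-based by default, like element_at
spark.sql("""SELECT zip_with_index(array("d", "a", null, "b"), true, true)""").show(false)
// [(0,"d"), (1,"a"), (2,null), (3,"b")]   -- index first, 0-based like Column.getItem
```
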
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183252854 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. --- End diff -- Good spot. Thanks!
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183237327 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",0),("a",1),(null,2),("b",3)] + > SELECT _FUNC_(array("d", "a", null, "b"), true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ZipWithIndex(child: Expression, indexFirst: Expression) + extends UnaryExpression with ExpectsInputTypes { + + def this(e: Expression) = this(e, Literal.FalseLiteral) + + private val idxFirst: Boolean = indexFirst match { +case Literal(v: Boolean, BooleanType) => v +case _ => throw new AnalysisException("The second argument has to be a boolean constant.") + } + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType] + + override def dataType: DataType = { +val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull) +val indexField = StructField("index", IntegerType, false) + +val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField) + +ArrayType(StructType(fields), false) + } + + override protected def nullSafeEval(input: Any): Any = { +val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType) + +val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i) +val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)} + +new GenericArrayData(resultData) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => { + if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) { +genCodeForPrimitiveElements(ctx, c, ev.value) + } else { +genCodeForNonPrimitiveElements(ctx, c, ev.value) + } +}) + } + + private def genCodeForPrimitiveElements( + ctx: CodegenContext, + childVariableName: String, + arrayData: String): String = { +val numElements = ctx.freshName("numElements") +val byteArraySize = ctx.freshName("byteArraySize") +val data = ctx.freshName("byteArray") +val unsafeRow = ctx.freshName("unsafeRow") +val structSize = ctx.freshName("structSize") +val unsafeArrayData = ctx.freshName("unsafeArrayData") +val structsOffset = ctx.freshName("structsOffset") +val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" + +val baseOffset = Platform.BYTE_ARRAY_OFFSET +val longSize = LongType.defaultSize +val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType) +val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1") + +s""" + |final int $numElements = $childVariableName.numElements(); + |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2}; + 
|final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize); + |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize; + |if ($byteArraySize > $MAX_ARRAY_LENGTH) { --- End diff -- Btw, if we use `GenericArrayData` as output array, can't we avoid this limit?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183237160 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ [snip: same hunk as quoted above] +val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" --- End diff -- Ah, I see. You just use an unsafe-backed array as output.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183237073 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ [snip: same hunk as quoted above] +val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" --- End diff -- Are we sure the input is always an unsafe-backed array?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183236405 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", --- End diff -- Should the index be 0-based or 1-based? Other array functions seem to be 1-based.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183236358 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. --- End diff -- Wrong doc?
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183220685 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,139 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",0),("a",1),(null,2),("b",3)] + > SELECT _FUNC_(array("d", "a", null, "b"), true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") --- End diff -- Done.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183214860 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,139 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",0),("a",1),(null,2),("b",3)] + > SELECT _FUNC_(array("d", "a", null, "b"), true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") +case class ZipWithIndex(child: Expression, indexFirst: Expression) + extends UnaryExpression with ExpectsInputTypes { + + def this(e: Expression) = this(e, Literal.FalseLiteral) + + val indexFirstValue: Boolean = indexFirst match { +case Literal(v: Boolean, BooleanType) => v +case _ => throw new AnalysisException("The second argument has to be a boolean constant.") + } + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType] + + override def dataType: DataType = { +val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull) +val indexField = StructField("index", IntegerType, false) + +val fields = if (indexFirstValue) Seq(indexField, elementField) else Seq(elementField, indexField) + +ArrayType(StructType(fields), false) + } + + override protected def nullSafeEval(input: Any): Any = { +val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType) + +val makeStruct = (v: Any, i: Int) => if (indexFirstValue) InternalRow(i, v) else InternalRow(v, i) +val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)} + +new GenericArrayData(resultData) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => { + if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) { +genCodeForPrimitiveElements(ctx, c, ev.value) + } else { +genCodeForNonPrimitiveElements(ctx, c, ev.value) + } +}) + } + + private def genCodeForPrimitiveElements( + ctx: CodegenContext, + childVariableName: String, + arrayData: String): String = { +val numElements = ctx.freshName("numElements") +val byteArraySize = ctx.freshName("byteArraySize") +val data = ctx.freshName("byteArray") +val unsafeRow = ctx.freshName("unsafeRow") +val structSize = ctx.freshName("structSize") +val unsafeArrayData = ctx.freshName("unsafeArrayData") +val structsOffset = ctx.freshName("structsOffset") +val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" + +val baseOffset = Platform.BYTE_ARRAY_OFFSET +val longSize = LongType.defaultSize +val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType) +val valuePosition = if (indexFirstValue) "1" else "0" +val indexPosition = if (indexFirstValue) "0" else "1" --- End diff -- nit: How about `val (valuePosition, indexPosition) = if (indexFirstValue) ("1", "0") else ("0", "1")`? 
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183214185 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,139 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",0),("a",1),(null,2),("b",3)] + > SELECT _FUNC_(array("d", "a", null, "b"), true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") --- End diff -- nit: `// scalastyle:on line.size.limit`
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183214315 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3340,6 +3340,17 @@ object functions { */ def reverse(e: Column): Column = withExpr { Reverse(e.expr) } + /** + * Transforms the input array by encapsulating elements into pairs + * with indexes indicating the order. + * + * @group collection_funcs + * @since 2.4.0 + */ + def zip_with_index(e: Column, indexFirst: Boolean = false): Column = withExpr { --- End diff -- Let's avoid using a default value in APIs. It doesn't work in Java.
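
The conventional fix in functions.scala is a pair of explicit overloads. A sketch of what that could look like — it assumes the file's private withExpr helper and the PR's ZipWithIndex expression, and is not the code that was eventually committed:

```scala
// Java-friendly overloads instead of a Scala default argument. Scala callers can
// still write zip_with_index(col), and Java callers get a resolvable signature.
def zip_with_index(e: Column): Column = zip_with_index(e, indexFirst = false)

def zip_with_index(e: Column, indexFirst: Boolean): Column = withExpr {
  ZipWithIndex(e.expr, Literal(indexFirst))
}
```
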
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183214167 --- Diff: python/pyspark/sql/functions.py --- @@ -2191,6 +2191,24 @@ def reverse(col): return Column(sc._jvm.functions.reverse(_to_java_column(col))) +@since(2.4) +def zip_with_index(col, indexFirst=False): +""" +Collection function: transforms the input array by encapsulating elements into pairs +with indexes indicating the order. + +:param col: name of column or expression + +>>> df = spark.createDataFrame([([2, 5, 3],), ([],)], ['data']) +>>> df.select(zip_with_index(df.data).alias('r')).collect() +[Row(r=[[value=2, index=0], [value=5, index=1], [value=3, index=2]]), Row(r=[])] +>>> df.select(zip_with_index(df.data, indexFirst=True).alias('r')).collect() +[Row(r=[[index=0, value=2], [index=1, value=5], [index=2, value=3]]), Row(r=[])] + """ --- End diff -- nit: there's one more leading space here.
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/21121

[SPARK-24042][SQL] Collection function: zip_with_index

## What changes were proposed in this pull request?

Implement function zip_with_index(array[, indexFirst]) that transforms the input array by encapsulating elements into pairs with indexes indicating the order.

```
zip_with_index(array("d", "a", null, "b")) => [("d",0),("a",1),(null,2),("b",3)]
zip_with_index(array("d", "a", null, "b"), true) => [(0,"d"),(1,"a"),(2,null),(3,"b")]
```

## How was this patch tested?

New tests added into:
- CollectionExpressionSuite
- DataFrameFunctionsSuite

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/AbsaOSS/spark feature/array-api-zip_with_index-to-master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21121.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21121

commit 9f090309b8d13e37efaf7824b6d960a6f61ca79f
Author: mn-mikke
Date: 2018-04-18T08:00:27Z

    [SPARK-24042][SQL] Collection function: zip_with_index
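
A hypothetical DataFrame-side session with the proposed API (the PR was ultimately closed unmerged, so none of this exists in Spark):

```scala
// Assumes a SparkSession named `spark` and the PR's zip_with_index in scope.
import spark.implicits._

val df = Seq(Seq("d", "a", null, "b")).toDF("data")
df.select(zip_with_index($"data").alias("r")).show(false)
// [[d, 0], [a, 1], [null, 2], [b, 3]]     -- value first, 0-based in this revision
df.select(zip_with_index($"data", indexFirst = true).alias("r")).show(false)
// [[0, d], [1, a], [2, null], [3, b]]
```
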