[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-08-09 Thread mn-mikke
Github user mn-mikke closed the pull request at:

https://github.com/apache/spark/pull/21121


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-25 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183991875
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,157 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Transforms an array by assigning an order number to each element.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst, startFromZero]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",1),("a",2),(null,3),("b",4)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true, false);
+       [(1,"d"),(2,"a"),(3,null),(4,"b")]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true, true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression, startFromZero: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral, Literal.FalseLiteral)
+
+  def exprToFlag(e: Expression, order: String): Boolean = e match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException(s"The $order argument has to be a boolean constant.")
+  }
+
+  private val idxFirst: Boolean = exprToFlag(indexFirst, "second")
+
+  private val (idxShift, idxGen): (Int, String) = if (exprToFlag(startFromZero, "third")) {
+    (0, "z")
+  } else {
+    (1, "z + 1")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i + idxShift)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      val numElements = ctx.freshName("numElements")
+      val code = if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value, numElements)
+      } else {
+        genCodeForAnyElements(ctx, c, ev.value, numElements)
+      }
+      s"""
+         |final int $numElements = $c.numElements();
+         |$code
+       """.stripMargin
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String,
+      numElements: String): String = {
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType)
+    val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1")
+
+    s"""
+       |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
+       |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize);
+       |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize;
+       |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
 

[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-24 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183948093
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  private val idxFirst: Boolean = indexFirst match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException("The second argument has to be a boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value)
+      } else {
+        genCodeForNonPrimitiveElements(ctx, c, ev.value)
+      }
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String): String = {
+    val numElements = ctx.freshName("numElements")
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType)
+    val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1")
+
+    s"""
+       |final int $numElements = $childVariableName.numElements();
+       |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
+       |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize);
+       |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize;
+       |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
--- End diff --

nvm, this is `zip` that does not involve concat of multiple arrays.


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-24 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183947619
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,157 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Transforms an array by assigning an order number to each element.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst, startFromZero]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",1),("a",2),(null,3),("b",4)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true, false);
+       [(1,"d"),(2,"a"),(3,null),(4,"b")]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true, true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression, startFromZero: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral, Literal.FalseLiteral)
+
+  def exprToFlag(e: Expression, order: String): Boolean = e match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException(s"The $order argument has to be a boolean constant.")
+  }
+
+  private val idxFirst: Boolean = exprToFlag(indexFirst, "second")
+
+  private val (idxShift, idxGen): (Int, String) = if (exprToFlag(startFromZero, "third")) {
+    (0, "z")
+  } else {
+    (1, "z + 1")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i + idxShift)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      val numElements = ctx.freshName("numElements")
+      val code = if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value, numElements)
+      } else {
+        genCodeForAnyElements(ctx, c, ev.value, numElements)
+      }
+      s"""
+         |final int $numElements = $c.numElements();
+         |$code
+       """.stripMargin
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String,
+      numElements: String): String = {
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType)
+    val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1")
+
+    s"""
+       |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
+       |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize);
+       |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize;
+       |if ($byteArraySize > $MAX_ARRAY_LENGTH) {

[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-24 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183808810
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  private val idxFirst: Boolean = indexFirst match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException("The second argument has to be a boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value)
+      } else {
+        genCodeForNonPrimitiveElements(ctx, c, ev.value)
+      }
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String): String = {
+    val numElements = ctx.freshName("numElements")
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType)
+    val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1")
+
+    s"""
+       |final int $numElements = $childVariableName.numElements();
+       |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
+       |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize);
+       |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize;
+       |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
--- End diff --

Ah, we can alleviate this limitation (up to `MAX_ARRAY_LENGTH` elements) if we use `GenericArrayData`. BTW, we have to do the same check in `genCodeForNonPrimitiveElements`, too.
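
A minimal sketch of that fallback (the helper name and shape are illustrative only, not the PR's generated code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.unsafe.array.ByteArrayMethods

// Hypothetical helper: pick the result representation from the size of the
// flat byte buffer that the unsafe layout would need.
def zipWithIndexResult(values: Array[Any], unsafeBytesNeeded: Long): Any = {
  if (unsafeBytesNeeded > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    // Object-backed path: no single contiguous byte array is allocated,
    // so the MAX_ROUNDED_ARRAY_LENGTH limit on the buffer does not apply.
    new GenericArrayData(values.zipWithIndex.map { case (v, i) => InternalRow(v, i) })
  } else {
    // Fast path (elided here): write the structs into an UnsafeArrayData
    // buffer, as genCodeForPrimitiveElements already does.
    ???
  }
}
```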


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183253723
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  private val idxFirst: Boolean = indexFirst match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException("The second argument has to be a boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value)
+      } else {
+        genCodeForNonPrimitiveElements(ctx, c, ev.value)
+      }
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String): String = {
+    val numElements = ctx.freshName("numElements")
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType)
+    val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1")
+
+    s"""
+       |final int $numElements = $childVariableName.numElements();
+       |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
+       |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize);
+       |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize;
+       |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
--- End diff --

I like your suggestion. So instead of throwing the exception, the function will execute a similar piece of code to that in `genCodeForNonPrimitiveElements`...


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183253226
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
--- End diff --

That's a really good question! The newly added functions `element_at` and `array_position` are 1-based. But on the other hand, `getItem` from the `Column` class is 0-based. What about adding one extra parameter and letting users decide whether the array will be indexed from 0 or 1?
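
A quick illustration of the inconsistency being discussed (assuming an active `SparkSession` named `spark`; both calls below return "d"):

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq(Seq("d", "a", "b")).toDF("data")

// element_at is 1-based: index 1 selects the first element.
df.selectExpr("element_at(data, 1)").show()

// Column.getItem is 0-based: index 0 selects the first element.
df.select(col("data").getItem(0)).show()
```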


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183252854
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
--- End diff --

Good spot. Thanks!


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183237327
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  private val idxFirst: Boolean = indexFirst match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException("The second argument has to be a boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value)
+      } else {
+        genCodeForNonPrimitiveElements(ctx, c, ev.value)
+      }
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String): String = {
+    val numElements = ctx.freshName("numElements")
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType)
+    val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1")
+
+    s"""
+       |final int $numElements = $childVariableName.numElements();
+       |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
+       |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize);
+       |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize;
+       |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
--- End diff --

Btw, if we use `GenericArrayData` as output array, can't we avoid this 
limit?


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183237160
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  private val idxFirst: Boolean = indexFirst match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException("The second argument has to be a boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value)
+      } else {
+        genCodeForNonPrimitiveElements(ctx, c, ev.value)
+      }
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String): String = {
+    val numElements = ctx.freshName("numElements")
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
--- End diff --

Ah, I see. You just use unsafe-backed array as output.


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183237073
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  private val idxFirst: Boolean = indexFirst match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException("The second argument has to be a boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value)
+      } else {
+        genCodeForNonPrimitiveElements(ctx, c, ev.value)
+      }
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String): String = {
+    val numElements = ctx.freshName("numElements")
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
--- End diff --

Are we sure the input is always an unsafe-backed array?
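
For context: `ArrayData` has more than one concrete implementation, so an input array can arrive in either representation; only the output layout is under this expression's control. A small illustrative (hypothetical, debugging-only) helper:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

// Hypothetical helper: report which ArrayData representation was received.
def describeRepresentation(input: ArrayData): String = input match {
  case _: UnsafeArrayData  => "unsafe-backed (flat byte buffer)"
  case _: GenericArrayData => "object-backed (array of JVM objects)"
  case other               => s"other ArrayData implementation: ${other.getClass.getName}"
}
```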


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183236405
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
--- End diff --

Should the index be 0-based or 1-based? Other array functions seem to be 1-based.


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183236358
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
--- End diff --

Wrong doc?


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-21 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183220685
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,139 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
--- End diff --

Done.


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-21 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183214860
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,139 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  val indexFirstValue: Boolean = indexFirst match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException("The second argument has to be a boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (indexFirstValue) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (indexFirstValue) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)}
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value)
+      } else {
+        genCodeForNonPrimitiveElements(ctx, c, ev.value)
+      }
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String): String = {
+    val numElements = ctx.freshName("numElements")
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType)
+    val valuePosition = if (indexFirstValue) "1" else "0"
+    val indexPosition = if (indexFirstValue) "0" else "1"
--- End diff --

nit: How about `val (valuePosition, indexPosition) = if (indexFirstValue) ("1", "0") else ("0", "1")`?


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-21 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183214185
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,139 @@ case class Concat(children: Seq[Expression]) extends Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
--- End diff --

nit: `// scalastyle:on line.size.limit`


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-21 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183214315
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3340,6 +3340,17 @@ object functions {
    */
   def reverse(e: Column): Column = withExpr { Reverse(e.expr) }
 
+  /**
+   * Transforms the input array by encapsulating elements into pairs
+   * with indexes indicating the order.
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def zip_with_index(e: Column, indexFirst: Boolean = false): Column = withExpr {
--- End diff --

Let's avoid using a default value in APIs. It doesn't work in Java.
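
A sketch of the Java-friendly shape: two explicit overloads instead of a Scala default argument, since default arguments are invisible to Java callers (`withExpr`, `ZipWithIndex`, and `Literal` as used elsewhere in this PR):

```scala
def zip_with_index(e: Column): Column = withExpr {
  new ZipWithIndex(e.expr)
}

def zip_with_index(e: Column, indexFirst: Boolean): Column = withExpr {
  ZipWithIndex(e.expr, Literal(indexFirst))
}
```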


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-21 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183214167
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2191,6 +2191,24 @@ def reverse(col):
     return Column(sc._jvm.functions.reverse(_to_java_column(col)))
 
 
+@since(2.4)
+def zip_with_index(col, indexFirst=False):
+    """
+    Collection function: transforms the input array by encapsulating elements into pairs
+    with indexes indicating the order.
+
+    :param col: name of column or expression
+
+    >>> df = spark.createDataFrame([([2, 5, 3],), ([],)], ['data'])
+    >>> df.select(zip_with_index(df.data).alias('r')).collect()
+    [Row(r=[[value=2, index=0], [value=5, index=1], [value=3, index=2]]), Row(r=[])]
+    >>> df.select(zip_with_index(df.data, indexFirst=True).alias('r')).collect()
+    [Row(r=[[index=0, value=2], [index=1, value=5], [index=2, value=3]]), Row(r=[])]
+     """
--- End diff --

nit: there's one more leading space here.


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-21 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/21121

[SPARK-24042][SQL] Collection function: zip_with_index

## What changes were proposed in this pull request?

Implement function zip_with_index(array[, indexFirst]) that transforms the 
input array by encapsulating elements into pairs with indexes indicating the 
order.

```
zip_with_index(array("d", "a", null, "b")) => [("d",0),("a",1),(null,2),("b",3)]
zip_with_index(array("d", "a", null, "b"), true) => [(0,"d"),(1,"a"),(2,null),(3,"b")]
```

## How was this patch tested?

New tests added into:
- CollectionExpressionSuite
- DataFrameFunctionsSuite

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AbsaOSS/spark feature/array-api-zip_with_index-to-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21121.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21121


commit 9f090309b8d13e37efaf7824b6d960a6f61ca79f
Author: mn-mikke 
Date:   2018-04-18T08:00:27Z

[SPARK-24042][SQL] Collection function: zip_with_index




---
