[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r177084478
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
+else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
+  }
+  }
+
+  private def checkInputDataTypesAreArrays(): TypeCheckResult =
+  {
+val mismatches = children.zipWithIndex.collect {
+  case (child, idx) if !ArrayType.acceptsType(child.dataType) =>
+s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " +
+  s"however, '${child.sql}' is of ${child.dataType.simpleString} type."
+}
+
+if (mismatches.isEmpty) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(mismatches.mkString(" "))
+}
+  }
+
+  override def dataType: ArrayType =
+children
+  .headOption.map(_.dataType.asInstanceOf[ArrayType])
+  .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType])
+
+
+  override protected def nullSafeEval(inputs: Seq[Any]): Any = {
+val elements = inputs.flatMap(_.asInstanceOf[ArrayData].toObjectArray(dataType.elementType))
+new GenericArrayData(elements)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, arrays => {
+  val elementType = dataType.elementType
+  if (CodeGenerator.isPrimitiveType(elementType)) {
+genCodeForConcatOfPrimitiveElements(ctx, elementType, arrays, ev.value)
+  } else {
+genCodeForConcatOfComplexElements(ctx, arrays, ev.value)
+  }
+})
+  }
+
+  private def genCodeForNumberOfElements(
+ctx: CodegenContext,
+elements: Seq[String]
+  ) : (String, String) = {
+val variableName = ctx.freshName("numElements")
+val code = elements
+  .map(el => s"$variableName += $el.numElements();")
+  .foldLeft( s"int $variableName = 0;")((acc, s) => acc + "\n" + s)
+(code, variableName)
+  }
+
+  private def genCodeForConcatOfPrimitiveElements(
+ctx: CodegenContext,
+elementType: DataType,
+elements: Seq[String],
+arrayDataName: String
+  ): String = {
+val arrayName = ctx.freshName("array")
+val arraySizeName = ctx.freshName("size")
+val counter = ctx.freshName("counter")
+val tempArrayDataName = ctx.freshName("tempArrayData")
+
+val (numElemCode, numElemName) = genCodeForNumberOfElements(ctx, elements)
+
+val unsafeArraySizeInBytes = s"""
+  |int $arraySizeName = UnsafeArrayData.calculateHeaderPortionInBytes($numElemName) +
+  |${classOf[ByteArrayMethods].getName}.roundNumberOfBytesToNearestWord(
+  |${elementType.defaultSize} * $numElemName
+  |);
+  """.stripMargin
+val baseOffset = Platform.BYTE_ARRAY_OFFSET
+
+val primitiveValueTypeName = CodeGenerator.primitiveTypeName(elementType)
+val assignments = elements.map { el =>
+  s"""
+|for(int z = 0; z < $el.numElements(); z++) {
+| if($el.isNullAt(z)) {
+|   $tempArrayDataName.setNullAt($counter);
+| } else {
+|   $tempArrayDataName.set$primitiveValueTypeName(
+| $counter,
+| $el.get$primitiveValueTypeName(z)
+|   );
+| }
+| $counter++;
+|}
+""".stripMargin
+}.mkString("\n")
+
+s"""
+  |$numElemCode
+  |$unsafeArraySizeInBytes
+  |byte[] $arrayName = new byte[$arraySizeName];
+  |UnsafeArrayData $tempArrayDataName = new UnsafeArrayData();
+  |Platform.putLong($arrayName, $baseOffset, $numElemName);
+  |$tempArrayDataName.pointTo($arrayName, $baseOffset, $arraySizeName);
+  |int $counter = 0;
+  |$assignments
+  |$arrayDataName = $tempArrayDataName;
+""".stripMargin

[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r177059083
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
+else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
+  }
+  }
+
+  private def checkInputDataTypesAreArrays(): TypeCheckResult =
+  {
+val mismatches = children.zipWithIndex.collect {
+  case (child, idx) if !ArrayType.acceptsType(child.dataType) =>
+s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " +
+  s"however, '${child.sql}' is of ${child.dataType.simpleString} type."
+}
+
+if (mismatches.isEmpty) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(mismatches.mkString(" "))
+}
+  }
+
+  override def dataType: ArrayType =
+children
+  .headOption.map(_.dataType.asInstanceOf[ArrayType])
+  .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType])
--- End diff --

I definitely share your opinion, but I think we should be consistent across the whole Spark SQL API. Functions like `concat` and `concat_ws` accept empty children as well.
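
(A quick illustration of that precedent; the return values are my assumption of current Spark behavior, not verified here:)

```scala
// Both existing functions tolerate an empty "real" argument list (assumed behavior):
spark.sql("SELECT concat()").collect()        // Row(""), the empty string, not an error
spark.sql("SELECT concat_ws(',')").collect()  // Row(""), separator only, no inputs
```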


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r177053835
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
--- End diff --

Think I fixed all style differences.


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r177008481
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -1834,6 +1834,25 @@ def array_contains(col, value):
 return Column(sc._jvm.functions.array_contains(_to_java_column(col), value))
 
 
+@since(2.4)
+def concat_arrays(*cols):
+"""
+Collection function: Concatenates multiple arrays into one.
+
+:param cols: list of column names (string) or list of :class:`Column` expressions that have
+the same data type.
--- End diff --

Shall we note `cols` are expected to be array type?


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r177004054
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
+else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
+  }
+  }
+
+  private def checkInputDataTypesAreArrays(): TypeCheckResult =
--- End diff --

Can we just put this in `checkInputDataTypes`?
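
Something like this, perhaps (a sketch that simply inlines the helper; messages unchanged):

```scala
override def checkInputDataTypes(): TypeCheckResult = {
  val mismatches = children.zipWithIndex.collect {
    case (child, idx) if !ArrayType.acceptsType(child.dataType) =>
      s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " +
        s"however, '${child.sql}' is of ${child.dataType.simpleString} type."
  }
  if (mismatches.nonEmpty) {
    TypeCheckResult.TypeCheckFailure(mismatches.mkString(" "))
  } else {
    // only compare element types once every input is known to be an array
    TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
  }
}
```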


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r177007456
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
+else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
+  }
+  }
+
+  private def checkInputDataTypesAreArrays(): TypeCheckResult =
+  {
+val mismatches = children.zipWithIndex.collect {
+  case (child, idx) if !ArrayType.acceptsType(child.dataType) =>
+s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " +
+  s"however, '${child.sql}' is of ${child.dataType.simpleString} type."
+}
+
+if (mismatches.isEmpty) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(mismatches.mkString(" "))
+}
+  }
+
+  override def dataType: ArrayType =
+children
+  .headOption.map(_.dataType.asInstanceOf[ArrayType])
+  .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType])
--- End diff --

Should we allow empty children? I can't think of a use case for now, so we had better disallow it first.
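
For context, my reading of what the current fallback does with zero children (an assumption based on the diff above, not tested):

```scala
// With no children, dataType falls back to the default concrete ArrayType:
ConcatArrays(Nil).dataType
// == ArrayType.defaultConcreteType, i.e. ArrayType(NullType, containsNull = true)
```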


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r177003161
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
--- End diff --

Shall we add `since` too?

```
...
   [1,2,3,4,5,6]
  """,
  since = "2.4.0")
```


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-25 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176981046
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
+else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
+  }
+  }
+
+  private def checkInputDataTypesAreArrays(): TypeCheckResult =
+  {
+val mismatches = children.zipWithIndex.collect {
+  case (child, idx) if !ArrayType.acceptsType(child.dataType) =>
+s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " +
+  s"however, '${child.sql}' is of ${child.dataType.simpleString} type."
+}
+
+if (mismatches.isEmpty) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(mismatches.mkString(" "))
+}
+  }
+
+  override def dataType: ArrayType =
+children
+  .headOption.map(_.dataType.asInstanceOf[ArrayType])
+  .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType])
+
+
+  override protected def nullSafeEval(inputs: Seq[Any]): Any = {
+val elements = inputs.flatMap(_.asInstanceOf[ArrayData].toObjectArray(dataType.elementType))
+new GenericArrayData(elements)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, arrays => {
+  val elementType = dataType.elementType
+  if (CodeGenerator.isPrimitiveType(elementType)) {
+genCodeForConcatOfPrimitiveElements(ctx, elementType, arrays, ev.value)
+  } else {
+genCodeForConcatOfComplexElements(ctx, arrays, ev.value)
+  }
+})
+  }
+
+  private def genCodeForNumberOfElements(
+ctx: CodegenContext,
+elements: Seq[String]
+  ) : (String, String) = {
+val variableName = ctx.freshName("numElements")
+val code = elements
+  .map(el => s"$variableName += $el.numElements();")
+  .foldLeft( s"int $variableName = 0;")((acc, s) => acc + "\n" + s)
+(code, variableName)
+  }
+
+  private def genCodeForConcatOfPrimitiveElements(
+ctx: CodegenContext,
+elementType: DataType,
+elements: Seq[String],
+arrayDataName: String
+  ): String = {
+val arrayName = ctx.freshName("array")
+val arraySizeName = ctx.freshName("size")
+val counter = ctx.freshName("counter")
+val tempArrayDataName = ctx.freshName("tempArrayData")
+
+val (numElemCode, numElemName) = genCodeForNumberOfElements(ctx, elements)
+
+val unsafeArraySizeInBytes = s"""
+  |int $arraySizeName = UnsafeArrayData.calculateHeaderPortionInBytes($numElemName) +
+  |${classOf[ByteArrayMethods].getName}.roundNumberOfBytesToNearestWord(
+  |${elementType.defaultSize} * $numElemName
+  |);
+  """.stripMargin
+val baseOffset = Platform.BYTE_ARRAY_OFFSET
+
+val primitiveValueTypeName = CodeGenerator.primitiveTypeName(elementType)
+val assignments = elements.map { el =>
+  s"""
+|for(int z = 0; z < $el.numElements(); z++) {
+| if($el.isNullAt(z)) {
+|   $tempArrayDataName.setNullAt($counter);
+| } else {
+|   $tempArrayDataName.set$primitiveValueTypeName(
+| $counter,
+| $el.get$primitiveValueTypeName(z)
+|   );
+| }
+| $counter++;
+|}
+""".stripMargin
+}.mkString("\n")
+
+s"""
+  |$numElemCode
+  |$unsafeArraySizeInBytes
+  |byte[] $arrayName = new byte[$arraySizeName];
+  |UnsafeArrayData $tempArrayDataName = new UnsafeArrayData();
+  |Platform.putLong($arrayName, $baseOffset, $numElemName);
+  |$tempArrayDataName.pointTo($arrayName, $baseOffset, $arraySizeName);
+  |int $counter = 0;
+  |$assignments
+  |$arrayDataName = $tempArrayDataName;
+""".stripMargin
+

[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-25 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176971799
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
--- End diff --

Can we add a common base class (e.g., `ConcatLike`) for handling nested `ConcatArrays` in the optimizer (`CombineConcat`)?


https://github.com/apache/spark/blob/e4bec7cb88b9ee63f8497e3f9e0ab0bfa5d5a77c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala#L649
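
A rough sketch of the shape this could take (the names and the rule body are my assumption, not code from this PR):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical marker trait shared by Concat and ConcatArrays:
trait ConcatLike extends Expression

// CombineConcat-style rule flattening nested array concatenations,
// e.g. concat_arrays(concat_arrays(a, b), c) ==> concat_arrays(a, b, c):
object CombineConcatArrays extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case ConcatArrays(children) if children.exists(_.isInstanceOf[ConcatArrays]) =>
      ConcatArrays(children.flatMap {
        case ConcatArrays(grandchildren) => grandchildren
        case other => Seq(other)
      })
  }
}
```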


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-25 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176966017
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3046,6 +3046,14 @@ object functions {
 ArrayContains(column.expr, Literal(value))
   }
 
+  /**
+   * Merges multiple arrays into one by putting elements from the specific array after elements
+   * from the previous array. If any of the arrays is null, null is returned.
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def concat_arrays(columns: Column*): Column = withExpr { ConcatArrays(columns.map(_.expr)) }
--- End diff --

Do we need to add this function in `sql/functions` here? It seems we might recommend users to use these kinds of functions via `selectExpr`, so is it okay to add this only in `FunctionRegistry`? Thoughts? @viirya @gatorsmile
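
For comparison, the two call styles in question (a sketch; assumes the function is registered as `concat_arrays` and spark implicits are in scope):

```scala
// via FunctionRegistry only (no Scala wrapper needed):
df.selectExpr("concat_arrays(a, b)")

// via the typed sql/functions API added in this diff:
df.select(concat_arrays($"a", $"b"))
```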


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901542
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
--- End diff --

Style issue:
```scala
if (...) {
  ...
} else {
  ...
}
```


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901684
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
--- End diff --

Do we need to bring in `NullSafeEvaluation`? If only `ConcatArrays` uses it, we may not need to add this.


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901843
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
+val values = children.map(_.eval(input))
+if (values.contains(null)) null
+else nullSafeEval(values)
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If a class utilizing NullSaveEvaluation keep
+   * the default nullability, they can override this method to save null-check code.  If we need
+   * full control of evaluation process, we should override [[eval]].
+   */
+  protected def nullSafeEval(inputs: Seq[Any]): Any =
+sys.error(s"The class utilizing NullSaveEvaluation must override either eval or nullSafeEval")
+
+  /**
+   * Short hand for generating of null save evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f accepts a sequence of variable names and returns Java code to compute the output.
+   */
+  protected def defineCodeGen(
+ctx: CodegenContext,
+ev: ExprCode,
+f: Seq[String] => String): ExprCode = {
+nullSafeCodeGen(ctx, ev, values => {
+  s"${ev.value} = ${f(values)};"
+})
+  }
+
+  /**
+   * Called by expressions to generate null safe evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f a function that accepts a sequence of non-null evaluation result names of children
+   *  and returns Java code to compute the output.
+   */
+  protected def nullSafeCodeGen(
+   ctx: CodegenContext,
+   ev: ExprCode,
+   f: Seq[String] => String): ExprCode = {
+val gens = children.map(_.genCode(ctx))
+val resultCode = f(gens.map(_.value))
+
+if (nullable) {
+  val nullSafeEval =
+(s"""
+  ${ev.isNull} = false; // resultCode could change nullability.
+  $resultCode
+""" /: children.zip(gens)) {
--- End diff --

Use `foldLeft` for readability.
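
i.e., the same fold spelled with `foldLeft` (a direct rewriting of the code above, same semantics):

```scala
val nullSafeEval = children.zip(gens).foldLeft(
  s"""
    ${ev.isNull} = false; // resultCode could change nullability.
    $resultCode
  """) {
  case (acc, (child, gen)) =>
    gen.code + ctx.nullSafeExec(child.nullable, gen.isNull)(acc)
}
```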


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176902162
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
+else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
+  }
+  }
+
+  private def checkInputDataTypesAreArrays(): TypeCheckResult =
+  {
+val mismatches = children.zipWithIndex.collect {
+  case (child, idx) if !ArrayType.acceptsType(child.dataType) =>
+s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " +
+  s"however, '${child.sql}' is of ${child.dataType.simpleString} type."
+}
+
+if (mismatches.isEmpty) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(mismatches.mkString(" "))
+}
+  }
+
+  override def dataType: ArrayType =
+children
+  .headOption.map(_.dataType.asInstanceOf[ArrayType])
+  .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType])
+
+
+  override protected def nullSafeEval(inputs: Seq[Any]): Any = {
+val elements = inputs.flatMap(_.asInstanceOf[ArrayData].toObjectArray(dataType.elementType))
+new GenericArrayData(elements)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, arrays => {
+  val elementType = dataType.elementType
+  if (CodeGenerator.isPrimitiveType(elementType)) {
+genCodeForConcatOfPrimitiveElements(ctx, elementType, arrays, ev.value)
+  } else {
+genCodeForConcatOfComplexElements(ctx, arrays, ev.value)
+  }
+})
+  }
+
+  private def genCodeForNumberOfElements(
+ctx: CodegenContext,
+elements: Seq[String]
+  ) : (String, String) = {
+val variableName = ctx.freshName("numElements")
+val code = elements
+  .map(el => s"$variableName += $el.numElements();")
+  .foldLeft( s"int $variableName = 0;")((acc, s) => acc + "\n" + s)
+(code, variableName)
+  }
+
+  private def genCodeForConcatOfPrimitiveElements(
+ctx: CodegenContext,
+elementType: DataType,
+elements: Seq[String],
+arrayDataName: String
+  ): String = {
+val arrayName = ctx.freshName("array")
+val arraySizeName = ctx.freshName("size")
+val counter = ctx.freshName("counter")
+val tempArrayDataName = ctx.freshName("tempArrayData")
+
+val (numElemCode, numElemName) = genCodeForNumberOfElements(ctx, elements)
+
+val unsafeArraySizeInBytes = s"""
+  |int $arraySizeName = UnsafeArrayData.calculateHeaderPortionInBytes($numElemName) +
+  |${classOf[ByteArrayMethods].getName}.roundNumberOfBytesToNearestWord(
+  |${elementType.defaultSize} * $numElemName
+  |);
+  """.stripMargin
+val baseOffset = Platform.BYTE_ARRAY_OFFSET
+
+val primitiveValueTypeName = CodeGenerator.primitiveTypeName(elementType)
+val assignments = elements.map { el =>
+  s"""
+|for(int z = 0; z < $el.numElements(); z++) {
+| if($el.isNullAt(z)) {
--- End diff --

Style: `if ()`.


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901957
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
+val values = children.map(_.eval(input))
+if (values.contains(null)) null
+else nullSafeEval(values)
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If a class utilizing NullSaveEvaluation keep
+   * the default nullability, they can override this method to save null-check code.  If we need
+   * full control of evaluation process, we should override [[eval]].
+   */
+  protected def nullSafeEval(inputs: Seq[Any]): Any =
+sys.error(s"The class utilizing NullSaveEvaluation must override either eval or nullSafeEval")
+
+  /**
+   * Short hand for generating of null save evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f accepts a sequence of variable names and returns Java code to compute the output.
+   */
+  protected def defineCodeGen(
+ctx: CodegenContext,
+ev: ExprCode,
+f: Seq[String] => String): ExprCode = {
+nullSafeCodeGen(ctx, ev, values => {
+  s"${ev.value} = ${f(values)};"
+})
+  }
+
+  /**
+   * Called by expressions to generate null safe evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f a function that accepts a sequence of non-null evaluation result names of children
+   *  and returns Java code to compute the output.
+   */
+  protected def nullSafeCodeGen(
+   ctx: CodegenContext,
+   ev: ExprCode,
+   f: Seq[String] => String): ExprCode = {
+val gens = children.map(_.genCode(ctx))
+val resultCode = f(gens.map(_.value))
+
+if (nullable) {
+  val nullSafeEval =
+(s"""
+  ${ev.isNull} = false; // resultCode could change nullability.
+  $resultCode
+""" /: children.zip(gens)) {
+  case (acc, (child, gen)) =>
+gen.code + ctx.nullSafeExec(child.nullable, gen.isNull)(acc)
--- End diff --

For example, for a binary expression, doesn't this generate code like:
```scala
rightGen.code + ctx.nullSafeExec(right.nullable, rightGen.isNull) {
  leftGen.code + ctx.nullSafeExec(left.nullable, leftGen.isNull) {
${ev.isNull} = false; // resultCode could change nullability.
$resultCode
  }
}
```

Although the evaluation order doesn't matter for deterministic expressions, for non-deterministic ones I'm a little concerned that it may cause unexpected changes.
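
If preserving left-to-right child evaluation matters, a `foldRight` would wrap from the first child outwards (a sketch; `innerCode` is just a name I'm using for the isNull/resultCode snippet above):

```scala
val innerCode =
  s"""
    ${ev.isNull} = false; // resultCode could change nullability.
    $resultCode
  """
// foldRight makes gen1.code outermost, so child 1 is evaluated first:
val nullSafeEval = children.zip(gens).foldRight(innerCode) {
  case ((child, gen), acc) =>
    gen.code + ctx.nullSafeExec(child.nullable, gen.isNull)(acc)
}
```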


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901989
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
--- End diff --

We should document that the element types of the arrays must be the same.
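
For example, the `usage` string could read (wording is only a suggestion):

```scala
usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one. All input arrays must have the same element type."
```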


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901348
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
--- End diff --

Spark usually uses a style like:

```scala
override def eval(input: InternalRow): Any = {
  val values = children.map(_.eval(input))
  if (values.contains(null)) {
null
  } else {
nullSafeEval(values)
  }
}
```

You could follow the style of the other code.


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901429
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
--- End diff --

There are other places where the braces `{}` style doesn't follow the Spark code style. We should keep the code style consistent.


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901467
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
+val values = children.map(_.eval(input))
--- End diff --

We probably don't need to evaluate all children. Once any child expression 
is null, we can just return null.
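
A short-circuiting variant could look like this (sketch only):

```scala
override def eval(input: InternalRow): Any = {
  val values = new Array[Any](children.length)
  var i = 0
  while (i < children.length) {
    values(i) = children(i).eval(input)
    if (values(i) == null) return null  // skip evaluating the remaining children
    i += 1
  }
  nullSafeEval(values)
}
```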


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176901317
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
--- End diff --

typo: null safe.


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176902327
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
+else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
+  }
+  }
+
+  private def checkInputDataTypesAreArrays(): TypeCheckResult =
+  {
+val mismatches = children.zipWithIndex.collect {
+  case (child, idx) if !ArrayType.acceptsType(child.dataType) =>
+s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " +
+  s"however, '${child.sql}' is of ${child.dataType.simpleString} type."
+}
+
+if (mismatches.isEmpty) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(mismatches.mkString(" "))
+}
+  }
+
+  override def dataType: ArrayType =
+children
+  .headOption.map(_.dataType.asInstanceOf[ArrayType])
+  .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType])
+
+
+  override protected def nullSafeEval(inputs: Seq[Any]): Any = {
+val elements = inputs.flatMap(_.asInstanceOf[ArrayData].toObjectArray(dataType.elementType))
+new GenericArrayData(elements)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, arrays => {
+  val elementType = dataType.elementType
+  if (CodeGenerator.isPrimitiveType(elementType)) {
+genCodeForConcatOfPrimitiveElements(ctx, elementType, arrays, ev.value)
+  } else {
+genCodeForConcatOfComplexElements(ctx, arrays, ev.value)
+  }
+})
+  }
+
+  private def genCodeForNumberOfElements(
+ctx: CodegenContext,
+elements: Seq[String]
+  ) : (String, String) = {
+val variableName = ctx.freshName("numElements")
+val code = elements
+  .map(el => s"$variableName += $el.numElements();")
+  .foldLeft( s"int $variableName = 0;")((acc, s) => acc + "\n" + s)
+(code, variableName)
+  }
+
+  private def genCodeForConcatOfPrimitiveElements(
+ctx: CodegenContext,
+elementType: DataType,
+elements: Seq[String],
+arrayDataName: String
+  ): String = {
+val arrayName = ctx.freshName("array")
+val arraySizeName = ctx.freshName("size")
+val counter = ctx.freshName("counter")
+val tempArrayDataName = ctx.freshName("tempArrayData")
+
+val (numElemCode, numElemName) = genCodeForNumberOfElements(ctx, elements)
+
+val unsafeArraySizeInBytes = s"""
+  |int $arraySizeName = UnsafeArrayData.calculateHeaderPortionInBytes($numElemName) +
+  |${classOf[ByteArrayMethods].getName}.roundNumberOfBytesToNearestWord(
+  |${elementType.defaultSize} * $numElemName
+  |);
+  """.stripMargin
+val baseOffset = Platform.BYTE_ARRAY_OFFSET
+
+val primitiveValueTypeName = CodeGenerator.primitiveTypeName(elementType)
+val assignments = elements.map { el =>
+  s"""
+|for(int z = 0; z < $el.numElements(); z++) {
+| if($el.isNullAt(z)) {
+|   $tempArrayDataName.setNullAt($counter);
+| } else {
+|   $tempArrayDataName.set$primitiveValueTypeName(
+| $counter,
+| $el.get$primitiveValueTypeName(z)
+|   );
+| }
+| $counter++;
+|}
+""".stripMargin
+}.mkString("\n")
+
+s"""
+  |$numElemCode
+  |$unsafeArraySizeInBytes
+  |byte[] $arrayName = new byte[$arraySizeName];
+  |UnsafeArrayData $tempArrayDataName = new UnsafeArrayData();
+  |Platform.putLong($arrayName, $baseOffset, $numElemName);
+  |$tempArrayDataName.pointTo($arrayName, $baseOffset, $arraySizeName);
+  |int $counter = 0;
+  |$assignments
+  |$arrayDataName = $tempArrayDataName;
+""".stripMargin
+

[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176902161
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Concatenates multiple arrays into one.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+   [1,2,3,4,5,6]
+  """)
+case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val arrayCheck = checkInputDataTypesAreArrays
+if(arrayCheck.isFailure) arrayCheck
+else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
+  }
+  }
+
+  private def checkInputDataTypesAreArrays(): TypeCheckResult =
+  {
+val mismatches = children.zipWithIndex.collect {
+  case (child, idx) if !ArrayType.acceptsType(child.dataType) =>
+s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " +
+  s"however, '${child.sql}' is of ${child.dataType.simpleString} type."
+}
+
+if (mismatches.isEmpty) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(mismatches.mkString(" "))
+}
+  }
+
+  override def dataType: ArrayType =
+children
+  .headOption.map(_.dataType.asInstanceOf[ArrayType])
+  .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType])
+
+
+  override protected def nullSafeEval(inputs: Seq[Any]): Any = {
+val elements = inputs.flatMap(_.asInstanceOf[ArrayData].toObjectArray(dataType.elementType))
+new GenericArrayData(elements)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, arrays => {
+  val elementType = dataType.elementType
+  if (CodeGenerator.isPrimitiveType(elementType)) {
+genCodeForConcatOfPrimitiveElements(ctx, elementType, arrays, ev.value)
+  } else {
+genCodeForConcatOfComplexElements(ctx, arrays, ev.value)
+  }
+})
+  }
+
+  private def genCodeForNumberOfElements(
+ctx: CodegenContext,
+elements: Seq[String]
+  ) : (String, String) = {
+val variableName = ctx.freshName("numElements")
+val code = elements
+  .map(el => s"$variableName += $el.numElements();")
+  .foldLeft( s"int $variableName = 0;")((acc, s) => acc + "\n" + s)
+(code, variableName)
+  }
+
+  private def genCodeForConcatOfPrimitiveElements(
+ctx: CodegenContext,
+elementType: DataType,
+elements: Seq[String],
+arrayDataName: String
+  ): String = {
+val arrayName = ctx.freshName("array")
+val arraySizeName = ctx.freshName("size")
+val counter = ctx.freshName("counter")
+val tempArrayDataName = ctx.freshName("tempArrayData")
+
+val (numElemCode, numElemName) = genCodeForNumberOfElements(ctx, elements)
+
+val unsafeArraySizeInBytes = s"""
+  |int $arraySizeName = UnsafeArrayData.calculateHeaderPortionInBytes($numElemName) +
+  |${classOf[ByteArrayMethods].getName}.roundNumberOfBytesToNearestWord(
+  |${elementType.defaultSize} * $numElemName
+  |);
+  """.stripMargin
+val baseOffset = Platform.BYTE_ARRAY_OFFSET
+
+val primitiveValueTypeName = CodeGenerator.primitiveTypeName(elementType)
+val assignments = elements.map { el =>
+  s"""
+|for(int z = 0; z < $el.numElements(); z++) {
--- End diff --

Style: `for (`


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-23 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176847009
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
+val values = children.map(_.eval(input))
+if (values.contains(null)) null
+else nullSafeEval(values)
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If a class utilizing NullSaveEvaluation keep
+   * the default nullability, they can override this method to save null-check code.  If we need
+   * full control of evaluation process, we should override [[eval]].
+   */
+  protected def nullSafeEval(inputs: Seq[Any]): Any =
+sys.error(s"The class utilizing NullSaveEvaluation must override either eval or nullSafeEval")
+
+  /**
+   * Short hand for generating of null save evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f accepts a sequence of variable names and returns Java code to compute the output.
+   */
+  protected def defineCodeGen(
+ctx: CodegenContext,
+ev: ExprCode,
+f: Seq[String] => String): ExprCode = {
+nullSafeCodeGen(ctx, ev, values => {
+  s"${ev.value} = ${f(values)};"
+})
+  }
+
+  /**
+   * Called by expressions to generate null safe evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f a function that accepts a sequence of non-null evaluation result names of children
+   *  and returns Java code to compute the output.
+   */
+  protected def nullSafeCodeGen(
--- End diff --

@WeichenXu123 I do agree that there are strong similarities in the code.

If you take a look at `UnaryExpression`, `BinaryExpression`, and `TernaryExpression`, you will see that the methods responsible for null safe evaluation and code generation are the same except for the number of parameters. My intention has been to generalize the methods into the `NullSafeEvaluation` trait and remove the original methods in a different PR once the trait is in. I didn't want to create a big-bang PR because of one additional API function.
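
For illustration, the kind of follow-up I have in mind (a sketch under my assumptions, not part of this PR): the fixed-arity classes could keep their signatures and delegate to the generalized trait.

```scala
// Hypothetical bridge: BinaryExpression reusing the Seq-based hook
abstract class BinaryExpression extends Expression with NullSafeEvaluation {

  // the existing two-argument contract stays in place for subclasses
  protected def nullSafeEval(input1: Any, input2: Any): Any

  // delegate the generalized Seq-based hook to the two-argument one
  override protected def nullSafeEval(inputs: Seq[Any]): Any =
    nullSafeEval(inputs(0), inputs(1))
}
```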


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-23 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176841337
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---
@@ -408,6 +408,7 @@ object FunctionRegistry {
 expression[MapValues]("map_values"),
 expression[Size]("size"),
 expression[SortArray]("sort_array"),
+expression[ConcatArrays]("concat_arrays"),
--- End diff --

Ok, I will merge the functions into one. Do you find having one expression class per concatenation type ok?

I'm afraid that if I incorporate all the logic into one expression class, the code will become messy, since each codeGen and evaluation has a different nature.


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176631936
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
+val values = children.map(_.eval(input))
+if (values.contains(null)) null
+else nullSafeEval(values)
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If a class utilizing NullSaveEvaluation keep
+   * the default nullability, they can override this method to save null-check code.  If we need
+   * full control of evaluation process, we should override [[eval]].
+   */
+  protected def nullSafeEval(inputs: Seq[Any]): Any =
+sys.error(s"The class utilizing NullSaveEvaluation must override either eval or nullSafeEval")
+
+  /**
+   * Short hand for generating of null save evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f accepts a sequence of variable names and returns Java code to compute the output.
+   */
+  protected def defineCodeGen(
+ctx: CodegenContext,
+ev: ExprCode,
+f: Seq[String] => String): ExprCode = {
+nullSafeCodeGen(ctx, ev, values => {
+  s"${ev.value} = ${f(values)};"
+})
+  }
+
+  /**
+   * Called by expressions to generate null safe evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f a function that accepts a sequence of non-null evaluation result names of children
+   *  and returns Java code to compute the output.
+   */
+  protected def nullSafeCodeGen(
--- End diff --

We will combine it with `concat`


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176631836
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---
@@ -408,6 +408,7 @@ object FunctionRegistry {
 expression[MapValues]("map_values"),
 expression[Size]("size"),
 expression[SortArray]("sort_array"),
+expression[ConcatArrays]("concat_arrays"),
--- End diff --

How about moving it to the collection functions?


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-22 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r176631255
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression {
  * and Hive function wrappers.
  */
 trait UserDefinedExpression
+
+/**
+ * The trait covers logic for performing null save evaluation and code generation.
+ */
+trait NullSafeEvaluation extends Expression
+{
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of NullSafeEvaluation.
+   * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also
+   * override this.
+   */
+  override def eval(input: InternalRow): Any =
+  {
+val values = children.map(_.eval(input))
+if (values.contains(null)) null
+else nullSafeEval(values)
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If a class utilizing NullSaveEvaluation keep
+   * the default nullability, they can override this method to save null-check code.  If we need
+   * full control of evaluation process, we should override [[eval]].
+   */
+  protected def nullSafeEval(inputs: Seq[Any]): Any =
+sys.error(s"The class utilizing NullSaveEvaluation must override either eval or nullSafeEval")
+
+  /**
+   * Short hand for generating of null save evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f accepts a sequence of variable names and returns Java code to compute the output.
+   */
+  protected def defineCodeGen(
+ctx: CodegenContext,
+ev: ExprCode,
+f: Seq[String] => String): ExprCode = {
+nullSafeCodeGen(ctx, ev, values => {
+  s"${ev.value} = ${f(values)};"
+})
+  }
+
+  /**
+   * Called by expressions to generate null safe evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f a function that accepts a sequence of non-null evaluation result names of children
+   *  and returns Java code to compute the output.
+   */
+  protected def nullSafeCodeGen(
--- End diff --

This method looks almost the same as the one in `BinaryExpression`. Can you avoid the code duplication?


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-19 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r175527998
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---
@@ -408,6 +408,7 @@ object FunctionRegistry {
 expression[MapValues]("map_values"),
 expression[Size]("size"),
 expression[SortArray]("sort_array"),
+expression[ConcatArrays]("concat_arrays"),
--- End diff --

I've already played with this option in my mind, but I'm not sure how concat would be categorized.

Currently, concat is defined as a pure string operation:

  /**
   * @group string_funcs
   * @since 1.5.0
   */
  @scala.annotation.varargs
  def concat(exprs: Column*): Column

The functionality in this PR, on the other hand, belongs to the collection_funcs group.

Having just one function for both expressions would be elegant, but can you advise what group should be assigned to concat?


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r175514291
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---
@@ -408,6 +408,7 @@ object FunctionRegistry {
 expression[MapValues]("map_values"),
 expression[Size]("size"),
 expression[SortArray]("sort_array"),
+expression[ConcatArrays]("concat_arrays"),
--- End diff --

Why not reuse `concat`?

concat(array1, array2, ..., arrayN) -> array ?
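
(A sketch of how the overload would read; the string case is existing behavior, the array case is the proposal:)

```scala
spark.sql("SELECT concat('ab', 'cd')")            // "abcd", the existing string semantics
spark.sql("SELECT concat(array(1, 2), array(3))") // [1, 2, 3], the proposed array semantics
```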


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Implementation of the concat_a...

2018-03-19 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/20858

[SPARK-23736][SQL] Implementation of the concat_arrays function 
concatenating multiple array columns into one.

## What changes were proposed in this pull request?
The PR adds logic for easy concatenation of multiple array columns and covers:
- Generalization of null safe evaluation for expressions with multiple children
- An expression for array concatenation
- A Python wrapper
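
A minimal usage sketch of the proposed API (assuming a spark-shell session with implicits in scope):

```scala
import org.apache.spark.sql.functions.{array, concat_arrays, lit}

val df = Seq((Seq(1, 2, 3), Seq(4, 5))).toDF("a", "b")

// expected output per the ExpressionDescription example: [1, 2, 3, 4, 5, 6]
df.select(concat_arrays($"a", $"b", array(lit(6)))).show(false)
```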

## How was this patch tested?
New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AbsaOSS/spark feature/array-api-concat_arrays-to-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20858.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20858


commit 282e7249c2ed35add40f54087423ca62732b6046
Author: mn-mikke 
Date:   2018-03-13T20:35:45Z

[SPARK-23736][SQL] Implementation of the concat_arrays function 
concatenating multiple array columns into one.




---
