Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20858#discussion_r181359247
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
    @@ -287,3 +290,231 @@ case class ArrayContains(left: Expression, right: 
Expression)
     
       override def prettyName: String = "array_contains"
     }
    +
    +/**
    + * Concatenates multiple input columns together into a single column.
    + * The function works with strings, binary and compatible array columns.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(col1, col2, ..., colN) - Returns the concatenation of 
col1, col2, ..., colN.",
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_('Spark', 'SQL');
    +       SparkSQL
    +      > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
    + |     [1,2,3,4,5,6]
    +  """)
    +case class Concat(children: Seq[Expression]) extends Expression {
    +
    +  val allowedTypes = Seq(StringType, BinaryType, ArrayType)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    if (children.isEmpty) {
    +      TypeCheckResult.TypeCheckSuccess
    +    } else {
    +      val childTypes = children.map(_.dataType)
    +      if (childTypes.exists(tpe => 
!allowedTypes.exists(_.acceptsType(tpe)))) {
    +        return TypeCheckResult.TypeCheckFailure(
    +          s"input to function $prettyName should have been StringType, 
BinaryType or ArrayType," +
    +            s" but it's " + childTypes.map(_.simpleString).mkString("[", 
", ", "]"))
    +      }
    +      TypeUtils.checkForSameTypeInputExpr(childTypes, s"function 
$prettyName")
    +    }
    +  }
    +
    +  override def dataType: DataType = 
children.map(_.dataType).headOption.getOrElse(StringType)
    +
    +  lazy val javaType: String = CodeGenerator.javaType(dataType)
    +
    +  override def nullable: Boolean = children.exists(_.nullable)
    +  override def foldable: Boolean = children.forall(_.foldable)
    +
    +  override def eval(input: InternalRow): Any = dataType match {
    +    case BinaryType =>
    +      val inputs = children.map(_.eval(input).asInstanceOf[Array[Byte]])
    +      ByteArray.concat(inputs: _*)
    +    case StringType =>
    +      val inputs = children.map(_.eval(input).asInstanceOf[UTF8String])
    +      UTF8String.concat(inputs : _*)
    +    case ArrayType(elementType, _) =>
    +      val inputs = children.toStream.map(_.eval(input))
    +      if (inputs.contains(null)) {
    +        null
    +      } else {
    +        val elements = 
inputs.flatMap(_.asInstanceOf[ArrayData].toObjectArray(elementType))
    --- End diff --
    
    Can we always allocate an array? I think that the total array element size 
may overflow in some cases.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to