zhengruifeng commented on code in PR #38867:
URL: https://github.com/apache/spark/pull/38867#discussion_r1042161861


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -4600,3 +4600,153 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(x, pos, val) - Places val into index pos of array x (array 
indices start at 1)",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, 4), 5, 5);
+       [1,2,3,4,5]
+      > SELECT _FUNC_(array(5, 3, 2, 1), 2, 4);
+       [5,4,3,2,1]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayInsert(srcArrayExpr: Expression, posExpr: Expression, 
itemExpr: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes with 
ComplexTypeMergingExpression
+    with SupportQueryContext with QueryErrorsBase{
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
+      case (ArrayType(e1, hasNull), e2: IntegralType, e3) if (e2 != LongType) 
=>
+        TypeCoercion.findTightestCommonType(e1, e3) match {
+          case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
+          case _ => Seq.empty
+        }
+      case (e1, e2, e3) => Seq.empty
+    }
+    Seq.empty
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {

Review Comment:
   `posExpr` should be `IntegerType`; for `srcArrayExpr` and `itemExpr`, 
I think you can follow how `ArrayRemove` handles its input types.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -4600,3 +4600,153 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(x, pos, val) - Places val into index pos of array x (array 
indices start at 1)",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, 4), 5, 5);
+       [1,2,3,4,5]
+      > SELECT _FUNC_(array(5, 3, 2, 1), 2, 4);
+       [5,4,3,2,1]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayInsert(srcArrayExpr: Expression, posExpr: Expression, 
itemExpr: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes with 
ComplexTypeMergingExpression
+    with SupportQueryContext with QueryErrorsBase{
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
+      case (ArrayType(e1, hasNull), e2: IntegralType, e3) if (e2 != LongType) 
=>
+        TypeCoercion.findTightestCommonType(e1, e3) match {
+          case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
+          case _ => Seq.empty
+        }
+      case (e1, e2, e3) => Seq.empty
+    }
+    Seq.empty
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (first.dataType, second.dataType, third.dataType) match {
+      case (_: ArrayType, e2, e3) if e2 != IntegerType =>
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> "2",
+            "requiredType" -> toSQLType(IntegerType),
+            "inputSql" -> toSQLExpr(second),
+            "inputType" -> toSQLType(second.dataType))
+        )
+      case _ => TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val arr = first.eval(input)
+    val pos = second.eval(input)
+    val item = third.eval(input)
+
+    if (arr == null || pos == null) {
+      null
+    } else {
+      nullSafeEval(arr, pos, item)
+    }
+  }
+
+  override def nullSafeEval(arr: Any, pos: Any, item: Any): Any = {
+    val baseArr = arr.asInstanceOf[ArrayData]
+    val posInt = pos.asInstanceOf[Int]
+    val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+    val newArrayLength = baseArr.numElements() + 1
+
+    if (newArrayLength  > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+      throw 
QueryExecutionErrors.concatArraysWithElementsExceedLimitError(newArrayLength)
+    }
+    val validatedPosInt = this.getValidPosIndexOrThrow(posInt, 
baseArr.numElements())
+
+    if (validatedPosInt < 0 || validatedPosInt > baseArr.numElements()) {
+      null
+    } else {
+      val newArray = new Array[Any](newArrayLength)
+      baseArr.foreach(arrayElementType, (i, v) =>
+        if (i >= validatedPosInt) {
+          newArray(i + 1) = v
+        } else {
+          newArray(i) = v
+        }
+      )
+      newArray(validatedPosInt) = item
+
+      new GenericArrayData(newArray)
+    }
+  }
+
+  private def getValidPosIndexOrThrow(pos: Int, numArrayElements: Int): Int = {
+    val validIdx = if (pos == 0) {
+      throw QueryExecutionErrors.elementAtByIndexZeroError(getContextOrNull())

Review Comment:
   0 should be acceptable



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -4600,3 +4600,153 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(x, pos, val) - Places val into index pos of array x (array 
indices start at 1)",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, 4), 5, 5);
+       [1,2,3,4,5]
+      > SELECT _FUNC_(array(5, 3, 2, 1), 2, 4);
+       [5,4,3,2,1]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayInsert(srcArrayExpr: Expression, posExpr: Expression, 
itemExpr: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes with 
ComplexTypeMergingExpression
+    with SupportQueryContext with QueryErrorsBase{
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
+      case (ArrayType(e1, hasNull), e2: IntegralType, e3) if (e2 != LongType) 
=>
+        TypeCoercion.findTightestCommonType(e1, e3) match {
+          case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
+          case _ => Seq.empty
+        }
+      case (e1, e2, e3) => Seq.empty
+    }
+    Seq.empty
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (first.dataType, second.dataType, third.dataType) match {
+      case (_: ArrayType, e2, e3) if e2 != IntegerType =>
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> "2",
+            "requiredType" -> toSQLType(IntegerType),
+            "inputSql" -> toSQLExpr(second),
+            "inputType" -> toSQLType(second.dataType))
+        )
+      case _ => TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val arr = first.eval(input)
+    val pos = second.eval(input)
+    val item = third.eval(input)
+
+    if (arr == null || pos == null) {
+      null
+    } else {
+      nullSafeEval(arr, pos, item)
+    }
+  }
+
+  override def nullSafeEval(arr: Any, pos: Any, item: Any): Any = {
+    val baseArr = arr.asInstanceOf[ArrayData]
+    val posInt = pos.asInstanceOf[Int]
+    val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+    val newArrayLength = baseArr.numElements() + 1
+
+    if (newArrayLength  > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+      throw 
QueryExecutionErrors.concatArraysWithElementsExceedLimitError(newArrayLength)
+    }
+    val validatedPosInt = this.getValidPosIndexOrThrow(posInt, 
baseArr.numElements())
+
+    if (validatedPosInt < 0 || validatedPosInt > baseArr.numElements()) {
+      null
+    } else {
+      val newArray = new Array[Any](newArrayLength)
+      baseArr.foreach(arrayElementType, (i, v) =>
+        if (i >= validatedPosInt) {
+          newArray(i + 1) = v
+        } else {
+          newArray(i) = v
+        }
+      )
+      newArray(validatedPosInt) = item
+
+      new GenericArrayData(newArray)
+    }
+  }
+
+  private def getValidPosIndexOrThrow(pos: Int, numArrayElements: Int): Int = {

Review Comment:
   Since this method is only called once, I don't think we need it as a separate method.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -4600,3 +4600,153 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(x, pos, val) - Places val into index pos of array x (array 
indices start at 1)",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, 4), 5, 5);
+       [1,2,3,4,5]
+      > SELECT _FUNC_(array(5, 3, 2, 1), 2, 4);
+       [5,4,3,2,1]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayInsert(srcArrayExpr: Expression, posExpr: Expression, 
itemExpr: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes with 
ComplexTypeMergingExpression
+    with SupportQueryContext with QueryErrorsBase{
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
+      case (ArrayType(e1, hasNull), e2: IntegralType, e3) if (e2 != LongType) 
=>
+        TypeCoercion.findTightestCommonType(e1, e3) match {
+          case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
+          case _ => Seq.empty
+        }
+      case (e1, e2, e3) => Seq.empty
+    }
+    Seq.empty
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (first.dataType, second.dataType, third.dataType) match {
+      case (_: ArrayType, e2, e3) if e2 != IntegerType =>
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> "2",
+            "requiredType" -> toSQLType(IntegerType),
+            "inputSql" -> toSQLExpr(second),
+            "inputType" -> toSQLType(second.dataType))
+        )
+      case _ => TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val arr = first.eval(input)
+    val pos = second.eval(input)
+    val item = third.eval(input)
+
+    if (arr == null || pos == null) {
+      null
+    } else {
+      nullSafeEval(arr, pos, item)
+    }
+  }
+
+  override def nullSafeEval(arr: Any, pos: Any, item: Any): Any = {
+    val baseArr = arr.asInstanceOf[ArrayData]
+    val posInt = pos.asInstanceOf[Int]
+    val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+    val newArrayLength = baseArr.numElements() + 1

Review Comment:
   The new array length is not always `numElements() + 1`: when `pos` is 
past the end of the array, the result is longer. Refer to the second example in 
https://docs.snowflake.com/en/sql-reference/functions/array_insert.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to