Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22017#discussion_r208602347
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
override def prettyName: String = "aggregate"
}
+
+/**
+ * Merges two given maps into a single map by applying function to the
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+ usage =
+ """
+ _FUNC_(map1, map2, function) - Merges two given maps into a single
map by applying
+ function to the pair of values with the same key. For keys only
presented in one map,
+ NULL will be passed as the value for the missing key. If an input
map contains duplicated
+ keys, only the first entry of the duplicated key is passed into the
lambda function.
+ """,
+ examples = """
+ Examples:
+ > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1,
v2) -> concat(v1, v2));
+ {1:"ax",2:"by"}
+ """,
+ since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function:
Expression)
+ extends HigherOrderFunction with CodegenFallback {
+
+ @transient lazy val functionForEval: Expression = functionsForEval.head
+
+ @transient lazy val (keyType, leftValueType, _) =
+ HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+ @transient lazy val (_, rightValueType, _) =
+ HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+ @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+ override def inputs: Seq[Expression] = left :: right :: Nil
+
+ override def functions: Seq[Expression] = function :: Nil
+
+ override def nullable: Boolean = left.nullable || right.nullable
+
+ override def dataType: DataType = MapType(keyType, function.dataType,
function.nullable)
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ (left.dataType, right.dataType) match {
+ case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+ TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+ case _ => TypeCheckResult.TypeCheckFailure(s"The input to function
$prettyName should have " +
+ s"been two ${MapType.simpleString}s with the same key type, but
it's " +
+ s"[${left.dataType.catalogString},
${right.dataType.catalogString}].")
+ }
+ }
+
+ override def bind(f: (Expression, Seq[(DataType, Boolean)]) =>
LambdaFunction): MapZipWith = {
+ val arguments = Seq((keyType, false), (leftValueType, true),
(rightValueType, true))
+ copy(function = f(function, arguments))
+ }
+
+ override def eval(input: InternalRow): Any = {
+ val value1 = left.eval(input)
+ if (value1 == null) {
+ null
+ } else {
+ val value2 = right.eval(input)
+ if (value2 == null) {
+ null
+ } else {
+ nullSafeEval(input, value1, value2)
+ }
+ }
+ }
+
+ @transient lazy val LambdaFunction(_, Seq(
+ keyVar: NamedLambdaVariable,
+ value1Var: NamedLambdaVariable,
+ value2Var: NamedLambdaVariable),
+ _) = function
+
+ private def keyTypeSupportsEquals = keyType match {
+ case BinaryType => false
+ case _: AtomicType => true
+ case _ => false
+ }
+
+ @transient private lazy val getKeysWithValueIndexes:
+ (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+ if (keyTypeSupportsEquals) {
+ getKeysWithIndexesFast
+ } else {
+ getKeysWithIndexesBruteForce
+ }
+ }
+
+ private def assertSizeOfArrayBuffer(size: Int): Unit = {
+ if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+ throw new RuntimeException(s"Unsuccessful try to zip maps with $size
" +
+ s"unique keys due to exceeding the array size limit " +
+ s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+ }
+ }
+
+ private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) =
{
+ val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --
You could change the approach a bit from the current one. For the first array of
keys, in particular, you don't need to check whether the key is already present
or the size of the output array — you can just add them. Then you can add the
keys from the second array with the logic here.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]