Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208204796
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
    @@ -365,3 +364,101 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the 
pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
    +      function to the pair of values with the same key. For keys only 
presented in one map,
    +      NULL will be passed as the value for the missing key. If an input 
map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the 
lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
    +
    +  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
    +
    +  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
    +        s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  private def getMapType(expr: Expression) = expr.dataType match {
    +    case m: MapType => m
    +    case _ => MapType.defaultConcreteType
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: 
Any): Any = {
    +    val mapData1 = value1.asInstanceOf[MapData]
    +    val mapData2 = value2.asInstanceOf[MapData]
    +    val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
    +    val values = new GenericArrayData(new Array[Any](keys.numElements()))
    +    keys.foreach(keyType, (idx: Int, key: Any) => {
    +      val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, 
leftValueType, ordering)
    --- End diff --
    
    Thanks for mentioning this! I'm not happy with the current complexity 
either. I've assumed that the implementation of maps will change into something 
with O(1) element access in future. By then, the complexity would be O(N) for 
types supporting equals as well and we would safe a portion of duplicated code.
    
    If you think that maps will remain like this for a long time, really like 
your suggestion with indexes.
    
    @ueshin What's your view on that?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to