Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/21986#discussion_r207921448
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
---
@@ -205,29 +230,85 @@ case class ArrayTransform(
(elementVar, indexVar)
}
- override def eval(input: InternalRow): Any = {
- val arr = this.input.eval(input).asInstanceOf[ArrayData]
- if (arr == null) {
- null
- } else {
- val f = functionForEval
- val result = new GenericArrayData(new Array[Any](arr.numElements))
- var i = 0
- while (i < arr.numElements) {
- elementVar.value.set(arr.get(i, elementVar.dataType))
- if (indexVar.isDefined) {
- indexVar.get.value.set(i)
- }
- result.update(i, f.eval(input))
- i += 1
+ override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = {
+ val arr = inputValue.asInstanceOf[ArrayData]
+ val f = functionForEval
+ val result = new GenericArrayData(new Array[Any](arr.numElements))
+ var i = 0
+ while (i < arr.numElements) {
+ elementVar.value.set(arr.get(i, elementVar.dataType))
+ if (indexVar.isDefined) {
+ indexVar.get.value.set(i)
}
- result
+ result.update(i, f.eval(inputRow))
+ i += 1
}
+ result
}
override def prettyName: String = "transform"
}
+/**
+ * Filters entries in a map using the provided function.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Filters entries in a map using the function.",
+examples = """
+ Examples:
+ > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v);
+ [1 -> 0, 3 -> -1]
+ """,
+since = "2.4.0")
+case class MapFilter(
+ input: Expression,
+ function: Expression)
+ extends MapBasedUnaryHigherOrderFunction with CodegenFallback {
+
+ @transient val (keyType, valueType, valueContainsNull) = input.dataType match {
+ case MapType(kType, vType, vContainsNull) => (kType, vType, vContainsNull)
+ case _ =>
+ val MapType(kType, vType, vContainsNull) = MapType.defaultConcreteType
+ (kType, vType, vContainsNull)
+ }
+
+ @transient lazy val (keyVar, valueVar) = {
+ val args = function.asInstanceOf[LambdaFunction].arguments
+ (args.head.asInstanceOf[NamedLambdaVariable], args.tail.head.asInstanceOf[NamedLambdaVariable])
+ }
+
+ override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapFilter = {
+ function match {
+ case LambdaFunction(_, _, _) =>
--- End diff --
right, I am removing it, thanks
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]