GitHub user maryannxue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22732#discussion_r225583730
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2137,36 +2137,27 @@ class Analyzer(
     
           case p => p transformExpressionsUp {
     
    -        case udf@ScalaUDF(func, _, inputs, _, _, _, _, nullableTypes) =>
    -          if (nullableTypes.isEmpty) {
    -            // If no nullability info is available, do nothing. No fields will be specially
    -            // checked for null in the plan. If nullability info is incorrect, the results
    -            // of the UDF could be wrong.
    -            udf
    -          } else {
    -            // Otherwise, add special handling of null for fields that can't accept null.
    -            // The result of operations like this, when passed null, is generally to return null.
    -            assert(nullableTypes.length == inputs.length)
    -
    -            // TODO: skip null handling for not-nullable primitive inputs after we can completely
    -            // trust the `nullable` information.
    -            val needsNullCheck = (nullable: Boolean, expr: Expression) =>
    -              nullable && !expr.isInstanceOf[KnownNotNull]
    -            val inputsNullCheck = nullableTypes.zip(inputs)
    -              .filter { case (nullableType, expr) => needsNullCheck(!nullableType, expr) }
    -              .map { case (_, expr) => IsNull(expr) }
    -              .reduceLeftOption[Expression]((e1, e2) => Or(e1, e2))
    -            // Once we add an `If` check above the udf, it is safe to mark those checked inputs
    -            // as not nullable (i.e., wrap them with `KnownNotNull`), because the null-returning
    -            // branch of `If` will be called if any of these checked inputs is null. Thus we can
    -            // prevent this rule from being applied repeatedly.
    -            val newInputs = nullableTypes.zip(inputs).map { case (nullable, expr) =>
    -              if (nullable) expr else KnownNotNull(expr)
    -            }
    -            inputsNullCheck
    -              .map(If(_, Literal.create(null, udf.dataType), udf.copy(children = newInputs)))
    -              .getOrElse(udf)
    -          }
    +        case udf @ ScalaUDF(_, _, inputs, handleNullForInputs, _, _, _, _)
    +          if !handleNullForInputs.forall(!_) =>
    +          // Otherwise, add special handling of null for fields that can't accept null.
    +          // The result of operations like this, when passed null, is generally to return null.
    +          assert(handleNullForInputs.length == inputs.length)
    +
    +          // TODO: skip null handling for not-nullable primitive inputs after we can completely
    +          // trust the `nullable` information.
    +          val inputsNullCheck = handleNullForInputs.zip(inputs)
    --- End diff --
    
    This should answer/confirm a couple of the other questions above:
    Since `ScalaUDF` already carries the `handleNullForInputs` flags, this rule can take advantage of them as well. Say a `ScalaUDF` hits this rule for the first time with `handleNullForInputs` as "false, true, false": we add null handling (an `If` clause) for the second input, the one flagged "true". From that point on all inputs are taken care of, so we can set the new `ScalaUDF`'s `handleNullForInputs` to all "false". Even if the new `ScalaUDF` hits this rule a second time, nothing will be done (see the sketch below).
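    
    A minimal, self-contained sketch of that idempotency idea. The types and names here (`Expr`, `MyUdf`, `handleNulls`, etc.) are illustrative stand-ins, not Spark's actual classes:
    
        // Stand-ins for the real catalyst expression types.
        sealed trait Expr
        case class IsNull(child: Expr) extends Expr
        case class Or(left: Expr, right: Expr) extends Expr
        case class If(cond: Expr, trueValue: Expr, falseValue: Expr) extends Expr
        case object NullLiteral extends Expr
        case class MyUdf(inputs: Seq[Expr], handleNullForInputs: Seq[Boolean]) extends Expr
    
        def handleNulls(udf: MyUdf): Expr =
          if (!udf.handleNullForInputs.exists(identity)) {
            udf // all flags are "false": the rule is a no-op
          } else {
            // Or together an IsNull check for every input flagged "true".
            val anyNull = udf.handleNullForInputs.zip(udf.inputs)
              .collect { case (true, input) => IsNull(input): Expr }
              .reduceLeft[Expr](Or(_, _))
            // Clear every flag on the rewritten UDF, so a second pass takes
            // the no-op branch above and the rule never fires twice.
            val cleared = udf.copy(handleNullForInputs = udf.handleNullForInputs.map(_ => false))
            If(anyNull, NullLiteral, cleared)
          }
    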
    It should work the same way for the "TODO" above: if `handleNullForInputs` has a "true" flag and the corresponding expression is NOT nullable, we can skip the null handling while flagging it as "false" in the new `ScalaUDF` in the end.


---
