Github user sameeragarwal commented on a diff in the pull request:
https://github.com/apache/spark/pull/11372#discussion_r54460830
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -586,6 +588,62 @@ object NullPropagation extends Rule[LogicalPlan] {
}
/**
+ * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
+ * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
+ * existing Filters and Join operators and are inferred based on their data constraints.
+ *
+ * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
+ * LeftSemi joins.
+ */
+object NullFiltering extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case filter @ Filter(condition, child: LogicalPlan) =>
+ val isNotNullConstraints =
filter.constraints.filter(_.isInstanceOf[IsNotNull])
+ val newCondition = if (isNotNullConstraints.nonEmpty) {
+ And(isNotNullConstraints.reduce(And), condition)
+ } else {
+ condition
+ }
+ Filter(newCondition, child)
+
+ case join @ Join(left: LogicalPlan, right: LogicalPlan, joinType:
JoinType,
+ condition: Option[Expression]) =>
+ val leftIsNotNullConstraints = join.constraints
+ .filter(_.isInstanceOf[IsNotNull])
+ .filter(_.references.subsetOf(left.outputSet))
+ val rightIsNotNullConstraints =
+ join.constraints
+ .filter(_.isInstanceOf[IsNotNull])
+ .filter(_.references.subsetOf(right.outputSet))
+ val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
+ Filter(leftIsNotNullConstraints.reduce(And), left)
+ } else {
+ left
+ }
+ val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
+ Filter(rightIsNotNullConstraints.reduce(And), right)
+ } else {
+ right
+ }
+ Join(newLeftChild, newRightChild, joinType, condition)
+ }
+}
+
+/**
+ * Attempts to re-order the individual conjunctive predicates in a filter condition to short-circuit
+ * executing relatively cheaper checks (e.g., nullability) before others.
+ */
+object ReorderFilterPredicates extends Rule[LogicalPlan] with
PredicateHelper {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case filter @ Filter(condition, child: LogicalPlan) =>
+ val reorderedCondition = splitConjunctivePredicates(condition)
+ .sortWith((x, _) => x.isInstanceOf[IsNotNull])
+ .reduce(And)
+ Filter(reorderedCondition, child)
+ }
+}
--- End diff --
Yes, that sounds like a good idea! Could there be any other downsides to not doing it in the
optimizer? /cc @nongli
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]