cloud-fan commented on code in PR #56499:
URL: https://github.com/apache/spark/pull/56499#discussion_r3433001827


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala:
##########
@@ -77,6 +77,27 @@ trait ConstraintHelper {
         inferredConstraints ++= replaceConstraints(predicates - eq - 
EqualNullSafe(l, r), l, r)
       case _ => // No inference
     }
+
+    // Second pass: substitute (attr = literal) bindings into range/inequality 
predicates.
+    // When a.pt = '20260610' and b.pt >= f(a.pt) are both in the constraint 
set,
+    // substituting yields b.pt >= f('20260610'), which ConstantFolding then 
reduces to a
+    // literal comparison that can be pushed into the right-side scan as a 
partition filter.
+    val attrToLiteral: Map[Attribute, Literal] = predicates.collect {
+      case EqualTo(a: Attribute, l: Literal) => a -> l
+      case EqualTo(l: Literal, a: Attribute) => a -> l
+    }.toMap
+
+    if (attrToLiteral.nonEmpty) {
+      predicates.foreach {
+        case _: EqualTo => // already handled above
+        case pred if pred.deterministic && 
pred.references.exists(attrToLiteral.contains) =>
+          inferredConstraints += pred.transform {
+            case a: Attribute if attrToLiteral.contains(a) => attrToLiteral(a)

Review Comment:
   This substitution has no collation guard, so it can produce wrong results 
for collated string columns: `a = lit` does not imply `binary(a) = binary(lit)` 
under a non-binary-stable (case/accent-insensitive) collation, so replacing `a` 
inside a comparison that re-collates it changes the result.
   
   Concretely, with `a.c STRING COLLATE UTF8_LCASE`, `WHERE a.c = 'hello'`, and 
a join condition `b.c COLLATE UTF8_BINARY >= a.c COLLATE UTF8_BINARY`, this 
infers the pushed filter `b.c COLLATE UTF8_BINARY >= 'hello'` — which drops 
`b.c = 'HZ'` even though it legitimately joins with `a.c = 'HELLO'` (`'HZ' >= 
'HELLO'` is true in binary, but `'HZ' >= 'hello'` is false).
   
   `ConstantPropagation` guards exactly this with `isBinaryStable(a.dataType)` 
(expressions.scala:233). Suggest skipping the substitution unless 
`isBinaryStable(attr.dataType)`, and adding a collated-column test (it should 
fail today and pass once guarded).



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala:
##########
@@ -399,4 +400,52 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     comparePlans(optimizedQuery, correctAnswer)
     comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer)
   }
+
+  test("correlated range predicate in left-outer join ON clause is pushed to 
right side " +
+    "when left attr is bound to a literal") {
+    testRangePredicatePushDown(LeftOuter)
+  }
+
+  test("correlated range predicate in inner join ON clause is pushed to right 
side " +
+    "when left attr is bound to a literal") {
+    testRangePredicatePushDown(Inner)
+  }
+
+  private def testRangePredicatePushDown(joinType: JoinType): Unit = {
+    // Simulates: SELECT * FROM a JOIN b ON a.key = b.key AND b.v >= a.k AND 
b.v <= a.k + 10
+    // WHERE a.k = 5
+    // The constant binding a.k = 5 should be substituted into the range 
predicates, producing
+    // b.v >= 5 and b.v <= 15 that get pushed into the right scan.
+    val left = LocalRelation($"k".int, $"key".int).subquery("a")
+    val right = LocalRelation($"v".int, $"key".int).subquery("b")
+
+    val joinCond = ("a.key".attr === "b.key".attr) &&
+      ("b.v".attr >= "a.k".attr) &&
+      ("b.v".attr <= ("a.k".attr + 10))
+
+    val originalQuery = left
+      .where("a.k".attr === 5)
+      .join(right, joinType, Some(joinCond))
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+
+    // The optimized right side must carry the inferred range filters.
+    optimized.foreach {
+      case Join(_, rightChild, `joinType`, _, _) =>

Review Comment:
   If `optimized` contains no `Join` node, the `case _ =>` swallows every node 
and this block asserts nothing, so the test passes vacuously. Consider 
asserting that a `Join` was actually matched, so a future change that 
restructures the plan can't silently neuter the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to