cloud-fan commented on code in PR #53664:
URL: https://github.com/apache/spark/pull/53664#discussion_r2721350517


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -80,19 +80,55 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] 
{
  * Computes expressions in inline tables. This rule is supposed to be called 
at the very end
  * of the analysis phase, given that all the expressions need to be fully 
resolved/replaced
  * at this point.
+ *
+ * Note: Inline tables with outer references (correlated) are NOT evaluated 
here - they will
+ * be rewritten by RewriteCorrelatedInlineTable and handled by the 
decorrelation framework.
+ * If spark.sql.legacy.valuesGeneralizedExpressionsEnabled is false, we'll 
never see such
+ * tables here (they're rejected during analysis).
  */
 object EvalInlineTables extends Rule[LogicalPlan] with CastSupport {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     
plan.transformDownWithSubqueriesAndPruning(_.containsPattern(INLINE_TABLE_EVAL))
 {
-      case table: ResolvedInlineTable => eval(table)
+      case table: ResolvedInlineTable =>
+        // Check if table has outer references (correlated expressions)
+        // Note: If config is disabled, we never reach here with outer refs
+        val hasOuterRefs = 
table.rows.flatten.exists(SubExprUtils.containsOuter)
+        if (hasOuterRefs) {
+          // Keep as ResolvedInlineTable - will be rewritten by 
RewriteCorrelatedInlineTable
+          table
+        } else {
+          // Safe to evaluate
+          eval(table)
+        }
     }
   }
 
-    def eval(table: ResolvedInlineTable): LocalRelation = {
+    /**
+   * Evaluates the expressions in a [[ResolvedInlineTable]] and produces a 
[[LocalRelation]].
+   *
+   * Non-deterministic expressions are initialized with partition index 0 
(driver side)
+   * before evaluation to ensure consistent random number generation.
+   */
+  /**
+   * Evaluates the expressions in a [[ResolvedInlineTable]] and produces a 
[[LocalRelation]].
+   *
+   * Non-deterministic expressions are initialized with partition index 0 
(driver side)
+   * before evaluation to ensure consistent random number generation.
+   */
+  def eval(table: ResolvedInlineTable): LocalRelation = {
       val newRows: Seq[InternalRow] =
         table.rows.map { row => InternalRow.fromSeq(row.map { e =>
           try {
-            prepareForEval(e).eval()
+            val prepared = prepareForEval(e)
+            // Initialize nondeterministic expressions before evaluation
+            prepared match {
+              case n: Nondeterministic =>

Review Comment:
   This seems like a separate bug fix?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to