dtenedor commented on code in PR #54236:
URL: https://github.com/apache/spark/pull/54236#discussion_r2805961129


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1811,24 +1828,41 @@ object InferFiltersFromConstraints extends 
Rule[LogicalPlan]
  */
 object CombineUnions extends Rule[LogicalPlan] {
   import CollapseProject.{buildCleanedProjectList, canCollapseExpressions}
-  import PushProjectionThroughUnion.{canPushProjectionThroughUnion, 
pushProjectionThroughUnion}
+  import PushProjectionThroughUnion.{canPushProjectionThroughUnion, 
pushProjectionThroughUnionLike}
 
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformDownWithPruning(
-    _.containsAnyPattern(UNION, DISTINCT_LIKE), ruleId) {
-    case u: Union => flattenUnion(u, false)
-    case Distinct(u: Union) => Distinct(flattenUnion(u, true))
+    _.containsAnyPattern(UNION, SEQUENTIAL_STREAMING_UNION, DISTINCT_LIKE), 
ruleId) {
+    // Flatten any UnionLike (Union, SequentialStreamingUnion)
+    // This is safe because flattening preserves child ordering.
+    case u: UnionLike => flattenUnionLike(u, false)
+    case Distinct(u: UnionLike) => Distinct(flattenUnionLike(u, true))
     // Only handle distinct-like 'Deduplicate', where the keys == output
-    case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) == 
u.outputSet =>
-      Deduplicate(keys, flattenUnion(u, true))
-    case DeduplicateWithinWatermark(keys: Seq[Attribute], u: Union)
+    case Deduplicate(keys: Seq[Attribute], u: UnionLike) if AttributeSet(keys) 
== u.outputSet =>
+      Deduplicate(keys, flattenUnionLike(u, true))
+    case DeduplicateWithinWatermark(keys: Seq[Attribute], u: UnionLike)
       if AttributeSet(keys) == u.outputSet =>
-      DeduplicateWithinWatermark(keys, flattenUnion(u, true))
+      DeduplicateWithinWatermark(keys, flattenUnionLike(u, true))
   }
 
-  private def flattenUnion(union: Union, flattenDistinct: Boolean): Union = {
+  /**
+   * Flattens nested UnionLike operators (Union, SequentialStreamingUnion) 
into a single operator.
+   * This transformation is order-preserving: children maintain their relative 
ordering in the
+   * flattened sequence, which is critical for SequentialStreamingUnion where 
order is semantically
+   * significant.
+   *
+   * @param union The UnionLike operator to flatten
+   * @param flattenDistinct Whether to flatten through Distinct/Deduplicate 
operators
+   * @return The flattened UnionLike operator with the same concrete type as 
the input
+   */
+  private def flattenUnionLike(union: UnionLike, flattenDistinct: Boolean): 
UnionLike = {

Review Comment:
   I think we can reduce the diff in this PR by skipping the method renames from 
*Union to *UnionLike. We can still change their parameter and/or return types 
to UnionLike where needed. This way the intention remains clear while keeping 
the diff smaller.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to