dtenedor commented on code in PR #54098:
URL: https://github.com/apache/spark/pull/54098#discussion_r2771444624
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -633,6 +633,86 @@ case class Union(
copy(children = newChildren)
}
+/**
+ * Logical plan for unioning multiple plans sequentially, processing each child to completion
+ * before moving to the next. This is used for backfill-to-live streaming scenarios where
+ * historical data should be processed completely before switching to live data.
+ *
+ * Unlike [[Union]] which processes all children concurrently in streaming queries, SequentialUnion
+ * processes each child source sequentially:
+ * 1. First child processes until complete (bounded sources reach their end)
+ * 2. Second child begins processing
+ * 3. And so on...
+ *
+ * Requirements:
+ * - Minimum 2 children required
+ * - All children must be streaming sources
+ * - All non-final children must support bounded execution (SupportsTriggerAvailableNow)
+ * - All children must have explicit names when used in streaming queries
+ * - Children cannot contain stateful operations (aggregations, joins, etc.)
+ * - Schema compatibility is enforced via UnionBase
+ *
+ * State preservation: Stateful operators applied AFTER SequentialUnion (aggregations,
+ * watermarks, deduplication, joins) preserve their state across source transitions,
+ * enabling seamless backfill-to-live scenarios.
+ *
+ * Example:
+ * {{{
+ * val historical = spark.readStream.format("delta").name("historical").load("/data")
+ * val live = spark.readStream.format("kafka").name("live").load()
+ * // Correct: stateful operations after SequentialUnion
+ * historical.followedBy(live).groupBy("key").count()
+ *
+ * // Incorrect: stateful operations before SequentialUnion
+ * // historical.groupBy("key").count().followedBy(live.groupBy("key").count()) // Not allowed
+ * }}}
+ *
+ * @param children The logical plans to union sequentially (must be streaming sources)
+ * @param byName Whether to resolve columns by name
+ * @param allowMissingCol Whether to allow missing columns in children
Review Comment:
what does this mean?
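
For context, a minimal sketch of what these two flags mean for the existing by-name union API, assuming SequentialUnion inherits the same semantics from UnionBase as [[Union]] (an active SparkSession named `spark` is assumed):

```scala
import spark.implicits._

val a = Seq((1, "x")).toDF("id", "name")   // columns: id, name
val b = Seq((2, true)).toDF("id", "flag")  // columns: id, flag

// byName = true: columns are matched by name rather than by position.
// allowMissingCol = true: a column absent from one child is filled with nulls
// instead of failing analysis. Result columns here: id, name, flag.
a.unionByName(b, allowMissingColumns = true).show()
```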
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -633,6 +633,86 @@ case class Union(
copy(children = newChildren)
}
+/**
+ * Logical plan for unioning multiple plans sequentially, processing each child to completion
+ * before moving to the next. This is used for backfill-to-live streaming scenarios where
+ * historical data should be processed completely before switching to live data.
+ *
+ * Unlike [[Union]] which processes all children concurrently in streaming queries, SequentialUnion
+ * processes each child source sequentially:
+ * 1. First child processes until complete (bounded sources reach their end)
+ * 2. Second child begins processing
+ * 3. And so on...
+ *
+ * Requirements:
+ * - Minimum 2 children required
+ * - All children must be streaming sources
+ * - All non-final children must support bounded execution (SupportsTriggerAvailableNow)
+ * - All children must have explicit names when used in streaming queries
+ * - Children cannot contain stateful operations (aggregations, joins, etc.)
+ * - Schema compatibility is enforced via UnionBase
+ *
+ * State preservation: Stateful operators applied AFTER SequentialUnion (aggregations,
+ * watermarks, deduplication, joins) preserve their state across source transitions,
+ * enabling seamless backfill-to-live scenarios.
+ *
+ * Example:
+ * {{{
+ * val historical = spark.readStream.format("delta").name("historical").load("/data")
+ * val live = spark.readStream.format("kafka").name("live").load()
+ * // Correct: stateful operations after SequentialUnion
+ * historical.followedBy(live).groupBy("key").count()
+ *
+ * // Incorrect: stateful operations before SequentialUnion
+ * // historical.groupBy("key").count().followedBy(live.groupBy("key").count()) // Not allowed
+ * }}}
+ *
+ * @param children The logical plans to union sequentially (must be streaming sources)
+ * @param byName Whether to resolve columns by name
+ * @param allowMissingCol Whether to allow missing columns in children
+ */
+case class SequentialUnion(
+ children: Seq[LogicalPlan],
+ byName: Boolean = false,
Review Comment:
let's make all these params required so that callers have to think about what
their correct values should be; it is a safer practice.
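
A minimal sketch of the suggested change (not the final signature): dropping the defaults forces every call site to pass the flags explicitly.

```scala
case class SequentialUnion(
    children: Seq[LogicalPlan],
    byName: Boolean,          // no "= false" default: callers must choose
    allowMissingCol: Boolean) // no "= false" default: callers must choose
  extends UnionBase
```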
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -633,6 +633,86 @@ case class Union(
copy(children = newChildren)
}
+/**
+ * Logical plan for unioning multiple plans sequentially, processing each child to completion
+ * before moving to the next. This is used for backfill-to-live streaming scenarios where
+ * historical data should be processed completely before switching to live data.
+ *
+ * Unlike [[Union]] which processes all children concurrently in streaming queries, SequentialUnion
+ * processes each child source sequentially:
+ * 1. First child processes until complete (bounded sources reach their end)
+ * 2. Second child begins processing
+ * 3. And so on...
+ *
+ * Requirements:
+ * - Minimum 2 children required
+ * - All children must be streaming sources
+ * - All non-final children must support bounded execution (SupportsTriggerAvailableNow)
+ * - All children must have explicit names when used in streaming queries
+ * - Children cannot contain stateful operations (aggregations, joins, etc.)
+ * - Schema compatibility is enforced via UnionBase
+ *
+ * State preservation: Stateful operators applied AFTER SequentialUnion (aggregations,
+ * watermarks, deduplication, joins) preserve their state across source transitions,
+ * enabling seamless backfill-to-live scenarios.
+ *
+ * Example:
+ * {{{
+ * val historical = spark.readStream.format("delta").name("historical").load("/data")
+ * val live = spark.readStream.format("kafka").name("live").load()
+ * // Correct: stateful operations after SequentialUnion
+ * historical.followedBy(live).groupBy("key").count()
+ *
+ * // Incorrect: stateful operations before SequentialUnion
+ * // historical.groupBy("key").count().followedBy(live.groupBy("key").count()) // Not allowed
+ * }}}
+ *
+ * @param children The logical plans to union sequentially (must be streaming sources)
+ * @param byName Whether to resolve columns by name
+ * @param allowMissingCol Whether to allow missing columns in children
+ */
+case class SequentialUnion(
Review Comment:
Can we put this in a separate file to improve the code health, since this
file is already quite large?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -633,6 +633,86 @@ case class Union(
copy(children = newChildren)
}
+/**
+ * Logical plan for unioning multiple plans sequentially, processing each child to completion
+ * before moving to the next. This is used for backfill-to-live streaming scenarios where
+ * historical data should be processed completely before switching to live data.
+ *
+ * Unlike [[Union]] which processes all children concurrently in streaming queries, SequentialUnion
+ * processes each child source sequentially:
+ * 1. First child processes until complete (bounded sources reach their end)
+ * 2. Second child begins processing
+ * 3. And so on...
+ *
+ * Requirements:
+ * - Minimum 2 children required
+ * - All children must be streaming sources
Review Comment:
optional: should we put this in a streaming package and/or name it
SequentialStreamingUnion to emphasize this constraint?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -633,6 +633,86 @@ case class Union(
copy(children = newChildren)
}
+/**
+ * Logical plan for unioning multiple plans sequentially, processing each child to completion
+ * before moving to the next. This is used for backfill-to-live streaming scenarios where
+ * historical data should be processed completely before switching to live data.
+ *
+ * Unlike [[Union]] which processes all children concurrently in streaming queries, SequentialUnion
+ * processes each child source sequentially:
+ * 1. First child processes until complete (bounded sources reach their end)
+ * 2. Second child begins processing
+ * 3. And so on...
+ *
+ * Requirements:
+ * - Minimum 2 children required
+ * - All children must be streaming sources
+ * - All non-final children must support bounded execution (SupportsTriggerAvailableNow)
Review Comment:
what does "non-final" children mean?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SequentialUnion}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * Flattens nested SequentialUnion nodes into a single level.
+ * This allows chaining: df1.followedBy(df2).followedBy(df3)
+ */
+object FlattenSequentialUnion extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
+ _.containsPattern(TreePattern.UNION)) {
+ case SequentialUnion(children, byName, allowMissingCol) =>
+ val flattened = SequentialUnion.flatten(children)
+ SequentialUnion(flattened, byName, allowMissingCol)
+ }
+}
+
+/**
+ * Validates SequentialUnion constraints:
+ * - Minimum 2 children
+ * - All children must be streaming relations
+ * - No nested SequentialUnions (should be flattened first)
+ * - No stateful operations in any child subtrees
+ */
+object ValidateSequentialUnion extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.foreach {
+ case su: SequentialUnion =>
+ validateMinimumChildren(su)
+ validateAllStreaming(su)
+ validateNoNesting(su)
+ validateNoStatefulDescendants(su)
+ case _ =>
+ }
+ plan
+ }
+
+ private def validateMinimumChildren(su: SequentialUnion): Unit = {
+ if (su.children.length < 2) {
+ throw QueryCompilationErrors.invalidNumberOfChildrenForUnionError(
+ su.getClass.getSimpleName, su.children.length, 2)
+ }
+ }
+
+ private def validateAllStreaming(su: SequentialUnion): Unit = {
+ val nonStreamingChildren = su.children.filterNot(_.isStreaming)
+ if (nonStreamingChildren.nonEmpty) {
+ throw QueryCompilationErrors.notStreamingDatasetError("SequentialUnion")
+ }
+ }
+
+ private def validateNoNesting(su: SequentialUnion): Unit = {
+ su.children.foreach { child =>
+ if (child.exists(_.isInstanceOf[SequentialUnion])) {
Review Comment:
what if there is another SequentialUnion that is not a direct child, but a
deeper descendant?
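
For reference, Catalyst's TreeNode.exists applies the predicate to the node itself and, recursively, to every descendant, so the check above should already catch that case; a minimal sketch (assuming the standard TreeNode.exists semantics):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SequentialUnion}

// exists traverses the whole subtree, so this returns true for a
// SequentialUnion nested under e.g. a Project or Filter, not only for
// direct children of `plan`.
def containsSequentialUnion(plan: LogicalPlan): Boolean =
  plan.exists(_.isInstanceOf[SequentialUnion])
```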
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -633,6 +633,86 @@ case class Union(
copy(children = newChildren)
}
+/**
+ * Logical plan for unioning multiple plans sequentially, processing each child to completion
+ * before moving to the next. This is used for backfill-to-live streaming scenarios where
+ * historical data should be processed completely before switching to live data.
Review Comment:
Can we please mention here that the intention is for this to act as a logical
placeholder: we will match against it and derive specific execution steps for
streaming queries, instead of adding a corresponding SequentialUnion
physical/execution operator?
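
A hypothetical sketch of that shape (the strategy name and the child-selection step are assumptions, not code from this PR):

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SequentialUnion}
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical: match the logical placeholder during planning and emit concrete
// execution steps, instead of introducing a SequentialUnionExec physical node.
object SequentialUnionStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case SequentialUnion(children, _, _) =>
      // e.g. plan only the currently-active child for this micro-batch;
      // the switching logic would live in the streaming engine.
      planLater(children.head) :: Nil
    case _ => Nil
  }
}
```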
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -4710,6 +4716,12 @@
],
"sqlState" : "07501"
},
+ "NESTED_SEQUENTIAL_UNION" : {
+ "message" : [
+ "Nested SequentialUnion is not supported. <hint>"
Review Comment:
Same, can we mention here what SQL or DF syntax caused this, and recommend
what the user should do to make their SQL or DF query work again next time?
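
One hypothetical wording for the <hint> parameter (an illustration, not the PR's actual message):

```scala
// Hypothetical hint string naming the triggering API and a fix:
val hint = "This usually comes from calling followedBy on a Dataset that " +
  "already contains a followedBy chain. Build one flat chain instead, e.g. " +
  "a.followedBy(b).followedBy(c)."
```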
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -633,6 +633,86 @@ case class Union(
copy(children = newChildren)
}
+/**
+ * Logical plan for unioning multiple plans sequentially, processing each child to completion
+ * before moving to the next. This is used for backfill-to-live streaming scenarios where
+ * historical data should be processed completely before switching to live data.
+ *
+ * Unlike [[Union]] which processes all children concurrently in streaming queries, SequentialUnion
+ * processes each child source sequentially:
+ * 1. First child processes until complete (bounded sources reach their end)
+ * 2. Second child begins processing
+ * 3. And so on...
+ *
+ * Requirements:
+ * - Minimum 2 children required
+ * - All children must be streaming sources
+ * - All non-final children must support bounded execution (SupportsTriggerAvailableNow)
+ * - All children must have explicit names when used in streaming queries
+ * - Children cannot contain stateful operations (aggregations, joins, etc.)
+ * - Schema compatibility is enforced via UnionBase
+ *
+ * State preservation: Stateful operators applied AFTER SequentialUnion (aggregations,
+ * watermarks, deduplication, joins) preserve their state across source transitions,
+ * enabling seamless backfill-to-live scenarios.
+ *
+ * Example:
+ * {{{
+ * val historical = spark.readStream.format("delta").name("historical").load("/data")
+ * val live = spark.readStream.format("kafka").name("live").load()
+ * // Correct: stateful operations after SequentialUnion
+ * historical.followedBy(live).groupBy("key").count()
+ *
+ * // Incorrect: stateful operations before SequentialUnion
+ * // historical.groupBy("key").count().followedBy(live.groupBy("key").count()) // Not allowed
+ * }}}
+ *
+ * @param children The logical plans to union sequentially (must be streaming sources)
+ * @param byName Whether to resolve columns by name
+ * @param allowMissingCol Whether to allow missing columns in children
+ */
+case class SequentialUnion(
+ children: Seq[LogicalPlan],
+ byName: Boolean = false,
+ allowMissingCol: Boolean = false) extends UnionBase {
+ assert(!allowMissingCol || byName,
+ "`allowMissingCol` can be true only if `byName` is true.")
+
+ final override val nodePatterns: Seq[TreePattern] = Seq(UNION)
+
+ override lazy val resolved: Boolean = {
+ children.length >= 2 &&
+ !(byName || allowMissingCol) &&
Review Comment:
this is a bit complex; can you add a comment explaining this 'resolved'
logic?
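
[[Union]].resolved has the same shape, so one candidate comment, assuming UnionBase exposes the same compatibility check as Union (an assumption):

```scala
override lazy val resolved: Boolean = {
  // byName/allowMissingCol are handled during analysis (by-name resolution is
  // rewritten into positional form and both flags are cleared), so this node
  // only counts as resolved once both flags are false again and the children's
  // schemas line up positionally.
  children.length >= 2 &&
    !(byName || allowMissingCol) &&
    childrenResolved &&
    allChildrenCompatible // positional schema check, assumed from UnionBase
}
```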
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3531,6 +3531,12 @@
],
"sqlState" : "42K0E"
},
+ "INVALID_NUMBER_OF_CHILDREN_FOR_UNION" : {
+ "message" : [
+ "<operator> requires at least <minimum> children, but only <actual> were
provided."
Review Comment:
Can we extend this to recommend what the user should do to make their SQL or
DF query work again next time? Same for all new errors below.
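
A hypothetical sketch of how such advice could be threaded through (the `suggestion` parameter and its wording are assumptions, not existing code):

```scala
import org.apache.spark.sql.AnalysisException

def invalidNumberOfChildrenForUnionError(
    operator: String, actual: Int, minimum: Int): Throwable = {
  new AnalysisException(
    errorClass = "INVALID_NUMBER_OF_CHILDREN_FOR_UNION",
    messageParameters = Map(
      "operator" -> operator,
      "actual" -> actual.toString,
      "minimum" -> minimum.toString,
      // hypothetical extra parameter surfaced in the message template:
      "suggestion" -> "chain at least two streaming Datasets, e.g. a.followedBy(b)"))
}
```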