Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19196#discussion_r138759679
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
---
@@ -117,8 +119,33 @@ class IncrementalExecution(
}
}
- override def preparations: Seq[Rule[SparkPlan]] = state +:
super.preparations
+ override def preparations: Seq[Rule[SparkPlan]] = Seq(
+ state,
+ EnsureStatefulOpPartitioning) ++ super.preparations
/** No need assert supported, as this check has already been done */
override def assertSupported(): Unit = { }
}
+
+object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+ // Needs to be transformUp to avoid extra shuffles
+ override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+ case ss: StatefulOperator =>
--- End diff --
nit: why `ss`? How about `so`, `op`, or `stateOp`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]