GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19196#discussion_r138771336
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
---
@@ -117,8 +119,33 @@ class IncrementalExecution(
}
}
- override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations
+ override def preparations: Seq[Rule[SparkPlan]] = Seq(
+ state,
+ EnsureStatefulOpPartitioning) ++ super.preparations
/** No need assert supported, as this check has already been done */
override def assertSupported(): Unit = { }
}
+
+object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+ // Needs to be transformUp to avoid extra shuffles
+ override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+ case ss: StatefulOperator =>
+ val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
+ val keys = ss.keyExpressions
--- End diff --
I think that is a better idea.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]