Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19467#discussion_r144152859
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
---
@@ -131,17 +132,17 @@ class IncrementalExecution(
}
override def preparations: Seq[Rule[SparkPlan]] =
- Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
+ Seq(state,
EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++
super.preparations
/** No need assert supported, as this check has already been done */
override def assertSupported(): Unit = { }
}
-object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+case class EnsureStatefulOpPartitioning(conf: SQLConf) extends
Rule[SparkPlan] {
// Needs to be transformUp to avoid extra shuffles
override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
case so: StatefulOperator =>
- val numPartitions =
plan.sqlContext.sessionState.conf.numShufflePartitions
+ val numPartitions = conf.numShufflePartitions
--- End diff --
why this change?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]