Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19467#discussion_r144152254
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
---
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child:
SparkPlan) extends UnaryExecN
}
protected override def doExecute(): RDD[InternalRow] = {
- child.execute().coalesce(numPartitions, shuffle = false)
+ if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
+ // Make sure we don't output an RDD with 0 partitions, when claiming
that we have a
+ // `SinglePartition`.
+ new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
+ } else {
+ child.execute().coalesce(numPartitions, shuffle = false)
+ }
}
}
+object CoalesceExec {
+ /** A simple RDD with no data, but with the given number of partitions.
*/
+ class EmptyRDDWithPartitions(
+ @transient private val sc: SparkContext,
+ numPartitions: Int) extends RDD[InternalRow](sc, Nil) {
+
+ override def getPartitions: Array[Partition] =
+ Array.tabulate(numPartitions)(i => SimplePartition(i))
+
+ override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
+ Iterator.empty
+ }
+ }
+
+ case class SimplePartition(index: Int) extends Partition
--- End diff --
nit: EmptyPartition? Isn't that more descriptive than "simple"?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]