Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19467#discussion_r144152254
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
    @@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: 
SparkPlan) extends UnaryExecN
       }
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    child.execute().coalesce(numPartitions, shuffle = false)
    +    if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
    +      // Make sure we don't output an RDD with 0 partitions, when claiming 
that we have a
    +      // `SinglePartition`.
    +      new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
    +    } else {
    +      child.execute().coalesce(numPartitions, shuffle = false)
    +    }
       }
     }
     
    +object CoalesceExec {
    +  /** A simple RDD with no data, but with the given number of partitions. 
*/
    +  class EmptyRDDWithPartitions(
    +      @transient private val sc: SparkContext,
    +      numPartitions: Int) extends RDD[InternalRow](sc, Nil) {
    +
    +    override def getPartitions: Array[Partition] =
    +      Array.tabulate(numPartitions)(i => SimplePartition(i))
    +
    +    override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
    +      Iterator.empty
    +    }
    +  }
    +
    +  case class SimplePartition(index: Int) extends Partition
    --- End diff --
    
    nit: EmptyPartition? Isn't that more descriptive than "simple"?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to