Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185201032
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
 ---
    @@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: 
StreamWriter, query: SparkPla
           case _ => new 
InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
         }
     
    -    val rdd = query.execute()
    +    val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
    +    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
     
         logInfo(s"Start processing data source writer: $writer. " +
    -      s"The input RDD has ${rdd.getNumPartitions} partitions.")
    -    // Let the epoch coordinator know how many partitions the write RDD 
has.
    +      s"The input RDD has ${messages.length} partitions.")
         EpochCoordinatorRef.get(
    -        
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    -        sparkContext.env)
    +      
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    +      sparkContext.env)
           .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
     
         try {
           // Force the RDD to run so continuous processing starts; no data is 
actually being collected
           // to the driver, as ContinuousWriteRDD outputs nothing.
    -      sparkContext.runJob(
    -        rdd,
    -        (context: TaskContext, iter: Iterator[InternalRow]) =>
    -          WriteToContinuousDataSourceExec.run(writerFactory, context, 
iter),
    -        rdd.partitions.indices)
    +      rdd.collect()
         } catch {
           case _: InterruptedException =>
    -        // Interruption is how continuous queries are ended, so accept and 
ignore the exception.
    +      // Interruption is how continuous queries are ended, so accept and 
ignore the exception.
           case cause: Throwable =>
    +        logError(s"Data source writer $writer is aborting.")
    --- End diff --
    
    Could you please explain the needs of additional handling? Since 
ContinuousWriteRDD is still handling the error case.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to