Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185201032 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala --- @@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPla case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) } - val rdd = query.execute() + val rdd = new ContinuousWriteRDD(query.execute(), writerFactory) + val messages = new Array[WriterCommitMessage](rdd.partitions.length) logInfo(s"Start processing data source writer: $writer. " + - s"The input RDD has ${rdd.getNumPartitions} partitions.") - // Let the epoch coordinator know how many partitions the write RDD has. + s"The input RDD has ${messages.length} partitions.") EpochCoordinatorRef.get( - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), - sparkContext.env) + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) try { // Force the RDD to run so continuous processing starts; no data is actually being collected // to the driver, as ContinuousWriteRDD outputs nothing. - sparkContext.runJob( - rdd, - (context: TaskContext, iter: Iterator[InternalRow]) => - WriteToContinuousDataSourceExec.run(writerFactory, context, iter), - rdd.partitions.indices) + rdd.collect() } catch { case _: InterruptedException => - // Interruption is how continuous queries are ended, so accept and ignore the exception. + // Interruption is how continuous queries are ended, so accept and ignore the exception. case cause: Throwable => + logError(s"Data source writer $writer is aborting.") --- End diff -- Could you please explain the need for this additional error handling, given that ContinuousWriteRDD already handles the error case?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org