Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21200#discussion_r185201032
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPla
       case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
     }
-    val rdd = query.execute()
+    val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
     logInfo(s"Start processing data source writer: $writer. " +
-      s"The input RDD has ${rdd.getNumPartitions} partitions.")
-    // Let the epoch coordinator know how many partitions the write RDD has.
+      s"The input RDD has ${messages.length} partitions.")
     EpochCoordinatorRef.get(
-      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-      sparkContext.env)
+        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+        sparkContext.env)
       .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
     try {
       // Force the RDD to run so continuous processing starts; no data is actually being collected
      // to the driver, as ContinuousWriteRDD outputs nothing.
-      sparkContext.runJob(
-        rdd,
-        (context: TaskContext, iter: Iterator[InternalRow]) =>
-          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
-        rdd.partitions.indices)
+      rdd.collect()
     } catch {
       case _: InterruptedException =>
-        // Interruption is how continuous queries are ended, so accept and ignore the exception.
+        // Interruption is how continuous queries are ended, so accept and ignore the exception.
       case cause: Throwable =>
+        logError(s"Data source writer $writer is aborting.")
--- End diff --
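For context, my reading of the new driver-side handshake, paraphrasing the `+` lines above rather than quoting the file: the driver first registers the partition count with the epoch coordinator, so the coordinator knows how many per-partition commit messages to expect before it can commit each epoch.

```scala
// Sketch assembled from the diff above (names as they appear there):
val coordinator = EpochCoordinatorRef.get(
  sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
  sparkContext.env)
// Synchronous RPC: returns once the coordinator has recorded how many
// per-partition commit messages to expect for each epoch.
coordinator.askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
```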
Could you please explain the need for this additional handling? ContinuousWriteRDD already handles the error case.
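For reference, this is the shape of the error handling already inside `ContinuousWriteRDD.compute` as I read it -- a simplified sketch from memory, with illustrative member names (`writerFactory`, `prev` as the parent RDD), not a verbatim copy of the source:

```scala
override def compute(split: Partition, context: TaskContext): Iterator[Unit] = {
  // Inside ContinuousWriteRDD; loop once per epoch until the task is
  // interrupted (which is how continuous queries are stopped).
  while (!context.isInterrupted() && !context.isCompleted()) {
    var dataWriter: DataWriter[InternalRow] = null
    try {
      // One writer per partition per pass; push every input row through it.
      dataWriter = writerFactory.createDataWriter(
        context.partitionId(), context.attemptNumber())
      prev.compute(split, context).foreach(dataWriter.write)
      // commit() yields a WriterCommitMessage that is forwarded to the
      // epoch coordinator (forwarding elided here).
      dataWriter.commit()
    } catch {
      case _: InterruptedException =>
        // Shutdown arrives as an interrupt; fall through and let the
        // loop condition end the task quietly.
      case t: Throwable =>
        // The failing partition's writer is aborted right here, which is
        // why the extra driver-side abort looks redundant to me.
        if (dataWriter != null) dataWriter.abort()
        throw t
    }
  }
  Iterator.empty
}
```

Given that, the added logError/abort path in this node would run on the driver after the failing task has already aborted its own writer.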
---