eyalzit commented on a change in pull request #24382: [SPARK-27330][SS] support
task abort in foreach writer
URL: https://github.com/apache/spark/pull/24382#discussion_r309121751
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
##########
@@ -141,17 +143,33 @@ class ForeachDataWriter[T](
       writer.process(rowConverter(record))
     } catch {
       case t: Throwable =>
-        writer.close(t)
+        closeWriter(t)
         throw t
     }
   }
   override def commit(): WriterCommitMessage = {
-    writer.close(null)
+    closeWriter(null)
     ForeachWriterCommitMessage
   }
-  override def abort(): Unit = {}
+  override def abort(): Unit = {
+    closeWriter(new RuntimeException("Foreach writer has been aborted"))
Review comment:
@zsxwing I agree that it would be valuable to pass the original cause to
abort, but I think that in order to do that I would have to change
Utils.tryWithSafeFinallyAndFailureCallbacks so that the catch block takes
the Throwable as an argument.
Something like:
def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
    (catchBlock: Throwable => Unit = _ => (), finallyBlock: => Unit = ()): T
This would require further changes wherever the catch block is used.
Does that make sense?
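To make the proposal concrete, here is a minimal sketch of how such a helper could behave. It is a simplified, standalone stand-in and not the actual Utils implementation: the object name SafeFinallySketch is hypothetical, and the TaskContext failure-listener handling that the real helper performs is left out on purpose.

```scala
// Hypothetical, simplified stand-in for Utils.tryWithSafeFinallyAndFailureCallbacks.
// Assumption: only the "pass the cause to the catch block" idea is illustrated;
// the real Spark helper does more work (e.g. TaskContext failure listeners).
object SafeFinallySketch {
  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(
      catchBlock: Throwable => Unit = _ => (),
      finallyBlock: => Unit = ()): T = {
    var originalThrowable: Throwable = null
    try {
      block
    } catch {
      case cause: Throwable =>
        originalThrowable = cause
        try {
          // The catch block now receives the original failure.
          catchBlock(cause)
        } catch {
          // Keep the original failure primary; attach the secondary one.
          case t: Throwable if t ne cause => cause.addSuppressed(t)
        }
        throw cause
    } finally {
      try {
        finallyBlock
      } catch {
        // Only swallow a finally-block failure when there is an original
        // failure to attach it to; otherwise let it propagate.
        case t: Throwable if originalThrowable != null && (t ne originalThrowable) =>
          originalThrowable.addSuppressed(t)
      }
    }
  }
}
```

With a signature like this, a caller could forward the cause to the writer, for example with catchBlock = { cause => dataWriter.abort(cause) }, where an abort method that accepts a Throwable is itself an assumption and not part of the current DataWriter interface. Existing callers that pass a by-name catch block would need to be updated to the function form.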