HeartSaVioR commented on a change in pull request #25333: [SPARK-28597][SS] Add
config to retry spark streaming's meta log when it met error
URL: https://github.com/apache/spark/pull/25333#discussion_r312400729
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
##########
@@ -114,27 +116,42 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}
- /** Write a batch to a temp file then rename it to the batch file.
+ private def writeBatchToFile(metadata: T, path: Path): Unit = {
+ writeBatchToFileWithRetries(metadata, path)
+ }
+
+ /**
+ * Write a batch to a temp file then rename it to the batch file.
*
* There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
* valid behavior, we still need to prevent it from destroying the files.
*/
- private def writeBatchToFile(metadata: T, path: Path): Unit = {
- val output = fileManager.createAtomic(path, overwriteIfPossible = false)
- try {
- serialize(metadata, output)
- output.close()
- } catch {
- case e: FileAlreadyExistsException =>
- output.cancel()
- // If next batch file already exists, then another concurrently running query has
- // written it.
- throw new ConcurrentModificationException(
- s"Multiple streaming queries are concurrently using $path", e)
- case e: Throwable =>
- output.cancel()
- throw e
- }
+ private def writeBatchToFileWithRetries(metadata: T, path: Path): Unit = {
+ var retryCount = 0
+ var isSuccess = false
+ do {
+ val output = fileManager.createAtomic(path, overwriteIfPossible = false)
+ try {
+ serialize(metadata, output)
+ output.close()
+ isSuccess = true
+ } catch {
+ case e: FileAlreadyExistsException =>
+ output.cancel()
+ // If next batch file already exists, then another concurrently running query has
+ // written it.
+ throw new ConcurrentModificationException(
+ s"Multiple streaming queries are concurrently using $path", e)
+ case e: Throwable =>
+ output.cancel()
+ if (retryCount >= metaDataNumRetries) {
+ throw new IOException(
Review comment:
If we expect IOException (and its descendants) from calling the HDFS API, I'd
rather not catch Throwable here. The narrower the better. By catching Throwable
you're also catching Error, which is unrecoverable; in that case we should just
let it crash.
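
To illustrate the suggestion, here is a minimal standalone sketch of a retry loop that catches only IOException. It is not the PR's code: `NarrowRetry`, `withRetries`, and `retriesLeft` are hypothetical names, and the real write path would additionally need to call output.cancel() and keep the FileAlreadyExistsException case.

```scala
import java.io.IOException
import scala.annotation.tailrec

// Hypothetical helper, not part of HDFSMetadataLog.
object NarrowRetry {
  /**
   * Runs `op`, retrying up to `retriesLeft` more times when it throws an
   * IOException (or a subclass). Anything else, including Error subclasses
   * such as OutOfMemoryError, is not caught and propagates on the first throw.
   */
  @tailrec
  def withRetries[A](retriesLeft: Int)(op: () => A): A = {
    // Capture only IOException; the try expression is kept separate from the
    // recursive call so that the call stays in tail position.
    val attempt: Either[IOException, A] =
      try Right(op())
      catch { case e: IOException => Left(e) }

    attempt match {
      case Right(value) => value
      case Left(_) if retriesLeft > 0 => withRetries(retriesLeft - 1)(op)
      case Left(e) => throw e // retries exhausted: rethrow the last failure
    }
  }
}
```

With this shape, a transient IOException is retried, while an Error escapes immediately without consuming any retries.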