This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c6d3f37  [SPARK-35240][SS] Use CheckpointFileManager for checkpoint file manipulation
c6d3f37 is described below

commit c6d3f3778faa308308492fd758d2e9bd027f4768
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Thu May 6 00:49:37 2021 -0700

    [SPARK-35240][SS] Use CheckpointFileManager for checkpoint file manipulation

    ### What changes were proposed in this pull request?

    This patch changes a few places that use the `FileSystem` API to manipulate
    checkpoint files so that they go through `CheckpointFileManager` instead.

    ### Why are the changes needed?

    `CheckpointFileManager` is designed to handle checkpoint file manipulation.
    However, there are still a few places that resolve a `FileSystem` directly from
    checkpoint files/paths. We should use `CheckpointFileManager` to manipulate
    checkpoint files. For example, we may want to use a dedicated storage system for
    checkpoint files. If all checkpoint file manipulation is performed through
    `CheckpointFileManager`, we need only implement `CheckpointFileManager` for that
    storage system, and don't need to implement the `FileSystem` API for it.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing unit tests.

    Closes #32361 from viirya/checkpoint-manager.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../execution/streaming/CheckpointFileManager.scala     | 18 ++++++++++++++++++
 .../sql/execution/streaming/ResolveWriteToStream.scala  | 11 +++++------
 .../spark/sql/execution/streaming/StreamExecution.scala |  7 +++++--
 .../spark/sql/execution/streaming/StreamMetadata.scala  |  7 ++++---
 4 files changed, 32 insertions(+), 11 deletions(-)
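For orientation, here is a minimal REPL-style sketch of the API this commit standardizes on. It is not part of the patch: the checkpoint root and Hadoop Configuration are hypothetical, and only methods visible in the diffs below (`create`, `createCheckpointDirectory`, `exists`, `mkdirs`) are used.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager

    // Hypothetical checkpoint root; any Hadoop-compatible URI would do.
    val checkpointRoot = new Path("/tmp/example-query-checkpoint")
    val hadoopConf = new Configuration()

    // create() resolves a concrete manager for the given path (a custom
    // implementation can also be plugged in, which is the motivation above).
    val fileManager = CheckpointFileManager.create(checkpointRoot, hadoopConf)

    // New in this patch: create and qualify the checkpoint directory through
    // the manager rather than through a raw FileSystem handle.
    val qualifiedRoot: Path = fileManager.createCheckpointDirectory()

    // Pre-existing manager operations that the rest of the patch migrates to.
    val offsetsDir = new Path(qualifiedRoot, "offsets")
    if (!fileManager.exists(offsetsDir)) {
      fileManager.mkdirs(offsetsDir)
    }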
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index c2b69ec..85484d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -83,6 +83,12 @@ trait CheckpointFileManager {
 
   /** Is the default file system this implementation is operating on the local file system. */
   def isLocal: Boolean
+
+  /**
+   * Creates the checkpoint path if it does not exist, and returns the qualified
+   * checkpoint path.
+   */
+  def createCheckpointDirectory(): Path
 }
 
 object CheckpointFileManager extends Logging {
@@ -285,6 +291,12 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     case _: LocalFileSystem | _: RawLocalFileSystem => true
     case _ => false
   }
+
+  override def createCheckpointDirectory(): Path = {
+    val qualifiedPath = fs.makeQualified(path)
+    fs.mkdirs(qualifiedPath, FsPermission.getDirDefault)
+    qualifiedPath
+  }
 }
 
@@ -351,6 +363,12 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     case _ => false
   }
 
+  override def createCheckpointDirectory(): Path = {
+    val qualifiedPath = fc.makeQualified(path)
+    fc.mkdir(qualifiedPath, FsPermission.getDirDefault, true)
+    qualifiedPath
+  }
+
   private def mayRemoveCrcFile(path: Path): Unit = {
     try {
       val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
index 3e01b31..10bc927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
@@ -89,11 +89,12 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
             s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
       }
     }
+    val fileManager = CheckpointFileManager.create(new Path(checkpointLocation), s.hadoopConf)
+
     // If offsets have already been created, we are trying to resume a query.
     if (!s.recoverFromCheckpointLocation) {
       val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(s.hadoopConf)
-      if (fs.exists(checkpointPath)) {
+      if (fileManager.exists(checkpointPath)) {
         throw new AnalysisException(
           s"This query does not support recovering from checkpoint location. " +
             s"Delete $checkpointPath to start over.")
@@ -102,7 +103,6 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
 
     val resolvedCheckpointRoot = {
       val checkpointPath = new Path(checkpointLocation)
-      val fs = checkpointPath.getFileSystem(s.hadoopConf)
       if (conf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
           && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
         // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
@@ -112,7 +112,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
           new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
         val legacyCheckpointDirExists =
           try {
-            fs.exists(new Path(legacyCheckpointDir))
+            fileManager.exists(new Path(legacyCheckpointDir))
           } catch {
             case NonFatal(e) =>
               // We may not have access to this directory. Don't fail the query if that happens.
@@ -139,8 +139,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
             .stripMargin)
       }
     }
-    val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    fs.mkdirs(checkpointDir)
+    val checkpointDir = fileManager.createCheckpointDirectory()
     checkpointDir.toString
   }
   logInfo(s"Checkpoint root $checkpointLocation resolved to $resolvedCheckpointRoot.")
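The two `createCheckpointDirectory()` overrides above (which `ResolveWriteToStream` now calls) are near mirror images; the difference is the underlying Hadoop primitive. A standalone sketch of the two calls, not part of the patch and with a hypothetical path, to show the asymmetry:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileContext, Path}
    import org.apache.hadoop.fs.permission.FsPermission

    val path = new Path("/tmp/example-checkpoint")  // hypothetical
    val conf = new Configuration()

    // FileSystem flavor: mkdirs creates missing parent directories by default.
    val fs = path.getFileSystem(conf)
    fs.mkdirs(fs.makeQualified(path), FsPermission.getDirDefault)

    // FileContext flavor: mkdir takes an explicit createParent flag instead,
    // which the patch sets to true to match the FileSystem behavior.
    val fc = FileContext.getFileContext(path.toUri, conf)
    fc.mkdir(fc.makeQualified(path), FsPermission.getDirDefault, true)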
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 0a3bbc5..400906b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -237,6 +237,10 @@ abstract class StreamExecution(
   protected def checkpointFile(name: String): String =
     new Path(new Path(resolvedCheckpointRoot), name).toString
 
+  /** All checkpoint file operations should be performed through `CheckpointFileManager`. */
+  private val fileManager = CheckpointFileManager.create(new Path(resolvedCheckpointRoot),
+    sparkSession.sessionState.newHadoopConf)
+
   /**
    * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
    * has been posted to all the listeners.
@@ -355,8 +359,7 @@ abstract class StreamExecution(
     val checkpointPath = new Path(resolvedCheckpointRoot)
     try {
       logInfo(s"Deleting checkpoint $checkpointPath.")
-      val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-      fs.delete(checkpointPath, true)
+      fileManager.delete(checkpointPath)
     } catch {
       case NonFatal(e) =>
         // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index fc0cfc3..a4bcc5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -49,11 +49,12 @@ object StreamMetadata extends Logging {
 
   /** Read the metadata from file if it exists */
   def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
-    val fs = metadataFile.getFileSystem(hadoopConf)
-    if (fs.exists(metadataFile)) {
+    val fileManager = CheckpointFileManager.create(metadataFile.getParent, hadoopConf)
+
+    if (fileManager.exists(metadataFile)) {
       var input: FSDataInputStream = null
       try {
-        input = fs.open(metadataFile)
+        input = fileManager.open(metadataFile)
         val reader = new InputStreamReader(input, StandardCharsets.UTF_8)
         val metadata = Serialization.read[StreamMetadata](reader)
         Some(metadata)
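Finally, a hedged sketch of how the reworked `StreamMetadata.read` is typically driven. The metadata path is hypothetical, and it assumes (beyond what the diff shows) that `StreamMetadata` carries the query id; the `read` signature itself is taken from the patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.StreamMetadata

    // Hypothetical layout: <checkpoint root>/metadata stores the query's id.
    val metadataFile = new Path("/tmp/example-query-checkpoint/metadata")
    val hadoopConf = new Configuration()

    // read() now obtains a CheckpointFileManager for metadataFile.getParent
    // internally, instead of resolving a FileSystem straight from the path.
    StreamMetadata.read(metadataFile, hadoopConf) match {
      case Some(metadata) => println(s"Resuming existing query ${metadata.id}")
      case None => println("No stream metadata found; treating this as a new query")
    }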