This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c6d3f37  [SPARK-35240][SS] Use CheckpointFileManager for checkpoint file manipulation
c6d3f37 is described below

commit c6d3f3778faa308308492fd758d2e9bd027f4768
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Thu May 6 00:49:37 2021 -0700

    [SPARK-35240][SS] Use CheckpointFileManager for checkpoint file manipulation
    
    ### What changes were proposed in this pull request?
    
    This patch changes a few places that used the `FileSystem` API to
    manipulate checkpoint files so that they go through `CheckpointFileManager`
    instead.
    
    ### Why are the changes needed?
    
    `CheckpointFileManager` is designed to handle checkpoint file manipulation.
    However, a few places still obtain a `FileSystem` from checkpoint
    files/paths. We should use `CheckpointFileManager` to manipulate checkpoint
    files. For example, we may want to back checkpoint files with a particular
    storage system. If all checkpoint file manipulation is performed through
    `CheckpointFileManager`, we only need to implement `CheckpointFileManager`
    for that storage system, and don't need to implement the full `FileSystem`
    API for it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #32361 from viirya/checkpoint-manager.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../execution/streaming/CheckpointFileManager.scala    | 18 ++++++++++++++++++
 .../sql/execution/streaming/ResolveWriteToStream.scala | 11 +++++------
 .../sql/execution/streaming/StreamExecution.scala      |  7 +++++--
 .../spark/sql/execution/streaming/StreamMetadata.scala |  7 ++++---
 4 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index c2b69ec..85484d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -83,6 +83,12 @@ trait CheckpointFileManager {
 
   /** Is the default file system this implementation is operating on the local file system. */
   def isLocal: Boolean
+
+  /**
+   * Creates the checkpoint path if it does not exist, and returns the qualified
+   * checkpoint path.
+   */
+  def createCheckpointDirectory(): Path
 }
 
 object CheckpointFileManager extends Logging {
@@ -285,6 +291,12 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     case _: LocalFileSystem | _: RawLocalFileSystem => true
     case _ => false
   }
+
+  override def createCheckpointDirectory(): Path = {
+    val qualifiedPath = fs.makeQualified(path)
+    fs.mkdirs(qualifiedPath, FsPermission.getDirDefault)
+    qualifiedPath
+  }
 }
 
 
@@ -351,6 +363,12 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     case _ => false
   }
 
+  override def createCheckpointDirectory(): Path = {
+    val qualifiedPath = fc.makeQualified(path)
+    fc.mkdir(qualifiedPath, FsPermission.getDirDefault, true)
+    qualifiedPath
+  }
+
   private def mayRemoveCrcFile(path: Path): Unit = {
     try {
       val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
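
(A hedged usage sketch, not part of the commit: how an internal caller could
use the new trait method. The path is made up; `CheckpointFileManager.create`
chooses the FileContext- or FileSystem-based implementation depending on what
the path's scheme supports.)

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager

    val manager =
      CheckpointFileManager.create(new Path("/tmp/query-ckpt"), new Configuration())
    // New in this patch: qualify the path and create the directory in one
    // call, without touching FileSystem directly.
    val qualifiedRoot: Path = manager.createCheckpointDirectory()
    // e.g. file:/tmp/query-ckpt on the local default file system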
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
index 3e01b31..10bc927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
@@ -89,11 +89,12 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
             s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", 
...)""")
       }
     }
+    val fileManager = CheckpointFileManager.create(new Path(checkpointLocation), s.hadoopConf)
+
     // If offsets have already been created, we are trying to resume a query.
     if (!s.recoverFromCheckpointLocation) {
       val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(s.hadoopConf)
-      if (fs.exists(checkpointPath)) {
+      if (fileManager.exists(checkpointPath)) {
         throw new AnalysisException(
           s"This query does not support recovering from checkpoint location. " 
+
             s"Delete $checkpointPath to start over.")
@@ -102,7 +103,6 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
 
     val resolvedCheckpointRoot = {
       val checkpointPath = new Path(checkpointLocation)
-      val fs = checkpointPath.getFileSystem(s.hadoopConf)
       if (conf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
         && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
         // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
@@ -112,7 +112,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
         new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
         val legacyCheckpointDirExists =
           try {
-            fs.exists(new Path(legacyCheckpointDir))
+            fileManager.exists(new Path(legacyCheckpointDir))
           } catch {
             case NonFatal(e) =>
              // We may not have access to this directory. Don't fail the query if that happens.
@@ -139,8 +139,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
               .stripMargin)
         }
       }
-      val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      fs.mkdirs(checkpointDir)
+      val checkpointDir = fileManager.createCheckpointDirectory()
       checkpointDir.toString
     }
     logInfo(s"Checkpoint root $checkpointLocation resolved to 
$resolvedCheckpointRoot.")
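
(Context on the legacy check above, as a hedged standalone sketch: Spark 2.4
escaped the checkpoint path three times via repeated `Path.toUri.toString`
round-trips, which the unchanged code reproduces. The escaped values in the
comments below are approximate.)

    import org.apache.hadoop.fs.Path

    val raw = new Path("/tmp/chk %")  // a path with a special character
    // Three escape rounds, mirroring the expression in the code above;
    // each round-trip re-escapes the '%' introduced by the previous one:
    val legacy =
      new Path(new Path(raw.toUri.toString).toUri.toString).toUri.toString
    // round 1: /tmp/chk%20%25   round 2: /tmp/chk%2520%2525
    // legacy (round 3): /tmp/chk%252520%252525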
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 0a3bbc5..400906b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -237,6 +237,10 @@ abstract class StreamExecution(
   protected def checkpointFile(name: String): String =
     new Path(new Path(resolvedCheckpointRoot), name).toString
 
+  /** All checkpoint file operations should be performed through `CheckpointFileManager`. */
+  private val fileManager = CheckpointFileManager.create(new Path(resolvedCheckpointRoot),
+      sparkSession.sessionState.newHadoopConf)
+
   /**
   * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
    * has been posted to all the listeners.
@@ -355,8 +359,7 @@ abstract class StreamExecution(
           val checkpointPath = new Path(resolvedCheckpointRoot)
           try {
             logInfo(s"Deleting checkpoint $checkpointPath.")
-            val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-            fs.delete(checkpointPath, true)
+            fileManager.delete(checkpointPath)
           } catch {
             case NonFatal(e) =>
              // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
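
(A semantic note with a hedged sketch: `delete` on both built-in managers
deletes recursively, so it matches the old `fs.delete(checkpointPath, true)`
call. The helper below is hypothetical and mirrors the best-effort cleanup
above.)

    import org.apache.hadoop.fs.Path
    import scala.util.control.NonFatal
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager

    // Hypothetical helper: best-effort removal of a temporary checkpoint root.
    def cleanupCheckpoint(manager: CheckpointFileManager, root: String): Unit = {
      val checkpointPath = new Path(root)
      try {
        manager.delete(checkpointPath)  // recursive in the built-in managers
      } catch {
        case NonFatal(_) =>  // deletion is best effort; ignore failures
      }
    }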
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index fc0cfc3..a4bcc5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -49,11 +49,12 @@ object StreamMetadata extends Logging {
 
   /** Read the metadata from file if it exists */
   def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
-    val fs = metadataFile.getFileSystem(hadoopConf)
-    if (fs.exists(metadataFile)) {
+    val fileManager = CheckpointFileManager.create(metadataFile.getParent, hadoopConf)
+
+    if (fileManager.exists(metadataFile)) {
       var input: FSDataInputStream = null
       try {
-        input = fs.open(metadataFile)
+        input = fileManager.open(metadataFile)
         val reader = new InputStreamReader(input, StandardCharsets.UTF_8)
         val metadata = Serialization.read[StreamMetadata](reader)
         Some(metadata)
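
(A condensed, hedged sketch of the resulting read path. The helper name is
made up, and the json4s deserialization is replaced by returning raw text so
the sketch stays dependency-free.)

    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager

    // Hypothetical helper: return the metadata file's raw JSON, if present.
    def readMetadataText(metadataFile: Path, hadoopConf: Configuration): Option[String] = {
      val fileManager = CheckpointFileManager.create(metadataFile.getParent, hadoopConf)
      if (!fileManager.exists(metadataFile)) return None
      val input = fileManager.open(metadataFile)
      try {
        Some(scala.io.Source.fromInputStream(input, StandardCharsets.UTF_8.name()).mkString)
      } finally {
        input.close()
      }
    }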

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
