Repository: spark
Updated Branches:
  refs/heads/master 558f31b31 -> cbb41a0c5


[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common 
CheckpointFileManager interface

## What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured Streaming
must be written atomically so that no partial files are generated (partial files
would break fault-tolerance guarantees). Currently, there are 3 locations which
try to do this individually, and some of them do it incorrectly.

1. HDFSMetadataLog - This uses a FileManager interface that can wrap any
implementation of the `FileSystem` or `FileContext` APIs. It prefers the
`FileContext` implementation because HDFS's FileContext has atomic renames.
1. HDFSBackedStateStore (aka in-memory state store)
  - Writing a version.delta file - This uses only the FileSystem API to perform
the rename. This is incorrect because rename is not atomic in the HDFS
FileSystem implementation.
  - Writing a snapshot file - Same as above.

#### Current problems:
1. State Store behavior is incorrect - HDFS FileSystem implementation does not 
have atomic rename.
1. Inflexible - Some file systems provide mechanisms other than 
write-to-temp-file-and-rename for writing atomically and more efficiently. For 
example, with S3 you can write directly to the final file and it will be made 
visible only when the entire file is written and closed correctly. Any failure 
can be made to terminate the writing without making any partial files visible 
in S3. The current code does not abstract out this mechanism enough that it can 
be customized.
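
As an illustration of the write-to-temp-file-and-rename mechanism mentioned above, here is a minimal sketch on the local file system. It uses `java.nio` as a stand-in for the Hadoop APIs, and `TempFileRenameSketch` and `writeAtomically` are hypothetical names that are not part of this patch.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object TempFileRenameSketch {
  /** Writes `content` to a hidden temp file next to `target`, then renames it into place. */
  def writeAtomically(target: Path, content: String): Unit = {
    // The temp file lives in the same directory so the rename stays within one file system.
    val temp = Files.createTempFile(target.getParent, s".${target.getFileName}.", ".tmp")
    try {
      Files.write(temp, content.getBytes(StandardCharsets.UTF_8))
      // ATOMIC_MOVE throws AtomicMoveNotSupportedException instead of silently
      // falling back to a non-atomic copy-and-delete.
      Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE)
    } finally {
      // No-op after a successful move; removes the partial temp file on failure.
      Files.deleteIfExists(temp)
    }
  }
}
```

Readers of `target` see either no file or the complete file, which is the property the checkpoint writers rely on. A store that offers a different commit mechanism would replace the body of such a helper rather than its callers.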

#### Solution:

1. Introduce a common interface that all 3 cases above can use to write 
checkpoint files atomically.
2. This interface must expose the hooks needed to customize the
write-and-rename mechanism.

This PR does that by introducing the interface `CheckpointFileManager` and 
modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use the interface. 
Similar to earlier `FileManager`, there are implementations based on 
`FileSystem` and `FileContext` APIs, and the latter implementation is preferred 
to make it work correctly with HDFS.

The key method this interface has is `createAtomic(path, overwrite)` which 
returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All 
users of this method need to either call `close()` to successfully write the 
file, or `cancel()` in case of an error.
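
The close-or-cancel contract can be sketched as follows. This is a simplified local-file model under stated assumptions, not the code in this patch: `CancellableFileOutputStream` and `CloseOrCancelExample` are hypothetical stand-ins for `CancellableFSDataOutputStream` and its callers, built on `java.nio` instead of Hadoop.

```scala
import java.io.{FilterOutputStream, OutputStream}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

/** Hypothetical local-file stand-in for a cancellable, atomically committed stream. */
final class CancellableFileOutputStream private (
    finalPath: Path,
    tempPath: Path,
    underlying: OutputStream)
  extends FilterOutputStream(underlying) {

  private var terminated = false

  /** Commit: close the temp file and move it to the final path. */
  override def close(): Unit = synchronized {
    if (!terminated) {
      terminated = true
      underlying.close()
      Files.move(tempPath, finalPath, StandardCopyOption.ATOMIC_MOVE)
    }
  }

  /** Abort: discard the temp file so the final path is never created. */
  def cancel(): Unit = synchronized {
    if (!terminated) {
      terminated = true
      try underlying.close() finally Files.deleteIfExists(tempPath)
    }
  }
}

object CancellableFileOutputStream {
  def createAtomic(finalPath: Path): CancellableFileOutputStream = {
    val temp = Files.createTempFile(finalPath.getParent, s".${finalPath.getFileName}.", ".tmp")
    new CancellableFileOutputStream(finalPath, temp, Files.newOutputStream(temp))
  }
}

object CloseOrCancelExample {
  /** Callers either close() on success or cancel() on any error. */
  def writeCheckpoint(path: Path, payload: String): Unit = {
    val output = CancellableFileOutputStream.createAtomic(path)
    try {
      output.write(payload.getBytes(StandardCharsets.UTF_8))
      output.close()
    } catch {
      case e: Throwable =>
        output.cancel()
        throw e
    }
  }
}
```

Because callers only see `createAtomic`, `close()` and `cancel()`, an implementation for a different store can change how the commit happens without touching call sites.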

## How was this patch tested?
New tests in `CheckpointFileManagerSuite` and slightly modified existing tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #21048 from tdas/SPARK-23966.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbb41a0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbb41a0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbb41a0c

Branch: refs/heads/master
Commit: cbb41a0c5b01579c85f06ef42cc0585fbef216c5
Parents: 558f31b
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Apr 13 16:31:39 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Apr 13 16:31:39 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   7 +
 .../streaming/CheckpointFileManager.scala       | 349 +++++++++++++++++++
 .../execution/streaming/HDFSMetadataLog.scala   | 229 +-----------
 .../state/HDFSBackedStateStoreProvider.scala    | 120 +++----
 .../execution/streaming/state/StateStore.scala  |   4 +-
 .../streaming/CheckpointFileManagerSuite.scala  | 192 ++++++++++
 .../CompactibleFileStreamLogSuite.scala         |   5 -
 .../streaming/HDFSMetadataLogSuite.scala        | 116 +-----
 .../streaming/state/StateStoreSuite.scala       |  58 ++-
 9 files changed, 678 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1c8ab9c..0dc47bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -930,6 +930,13 @@ object SQLConf {
       .intConf
       .createWithDefault(100)
 
+  val STREAMING_CHECKPOINT_FILE_MANAGER_CLASS =
+    buildConf("spark.sql.streaming.checkpointFileManagerClass")
+      .doc("The class used to write checkpoint files atomically. This class must be a subclass " +
+        "of the interface CheckpointFileManager.")
+      .internal()
+      .stringConf
+
   val NDV_MAX_ERROR =
     buildConf("spark.sql.statistics.ndv.maxError")
       .internal()

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
new file mode 100644
index 0000000..606ba25
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException, OutputStream}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary because
+ * different implementations of FileSystem/FileContext may need different combinations of
+ * operations to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allows different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename
+   */
+  sealed trait RenameHelperMethods { self: CheckpointFileManager =>
+    /** Create a file with overwrite. */
+    def createTempFile(path: Path): FSDataOutputStream
+
+    /**
+     * Rename a file.
+     *
+     * @param srcPath             Source path to rename
+     * @param dstPath             Destination path to rename to
+     * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+     *                            overwrite the file if it already exists. It should not throw
+     *                            any exception if the file exists. However, if false, then the
+     *                            implementation must not overwrite if the file already exists and
+     *                            must throw `FileAlreadyExistsException` in that case.
+     */
+    def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
+  }
+
+  /**
+   * An interface to add the cancel() operation to [[FSDataOutputStream]]. This is used
+   * mainly by `CheckpointFileManager.createAtomic` to write a file atomically.
+   *
+   * @see [[CheckpointFileManager]].
+   */
+  abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream)
+    extends FSDataOutputStream(underlyingStream, null) {
+    /** Cancel the `underlyingStream` and ensure that the output file is not generated. */
+    def cancel(): Unit
+  }
+
+  /**
+   * An implementation of [[CancellableFSDataOutputStream]] that writes a file atomically by writing
+   * to a temporary file and then renaming it.
+   */
+  sealed class RenameBasedFSDataOutputStream(
+      fm: CheckpointFileManager with RenameHelperMethods,
+      finalPath: Path,
+      tempPath: Path,
+      overwriteIfPossible: Boolean)
+    extends CancellableFSDataOutputStream(fm.createTempFile(tempPath)) {
+
+    def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite: Boolean) = {
+      this(fm, path, generateTempPath(path), overwrite)
+    }
+
+    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
+    @volatile private var terminated = false
+
+    override def close(): Unit = synchronized {
+      try {
+        if (terminated) return
+        underlyingStream.close()
+        try {
+          fm.renameTempFile(tempPath, finalPath, overwriteIfPossible)
+        } catch {
+          case fe: FileAlreadyExistsException =>
+            logWarning(
+              s"Failed to rename temp file $tempPath to $finalPath because file exists", fe)
+            if (!overwriteIfPossible) throw fe
+        }
+        logInfo(s"Renamed temp file $tempPath to $finalPath")
+      } finally {
+        terminated = true
+      }
+    }
+
+    override def cancel(): Unit = synchronized {
+      try {
+        if (terminated) return
+        underlyingStream.close()
+        fm.delete(tempPath)
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Error cancelling write to $finalPath", e)
+      } finally {
+        terminated = true
+      }
+    }
+  }
+
+
+  /** Create an instance of [[CheckpointFileManager]] based on the path and configuration. */
+  def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
+    val fileManagerClass = hadoopConf.get(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
+    if (fileManagerClass != null) {
+      return Utils.classForName(fileManagerClass)
+        .getConstructor(classOf[Path], classOf[Configuration])
+        .newInstance(path, hadoopConf)
+        .asInstanceOf[CheckpointFileManager]
+    }
+    try {
+      // Try to create a manager based on `FileContext` because HDFS's `FileContext.rename()`
+      // gives atomic renames, which is what we rely on for the default implementation
+      // `CheckpointFileManager.createAtomic`.
+      new FileContextBasedCheckpointFileManager(path, hadoopConf)
+    } catch {
+      case e: UnsupportedFileSystemException =>
+        logWarning(
+          "Could not use FileContext API for managing Structured Streaming checkpoint files at " +
+            s"$path. Using FileSystem API instead for managing log files. If the implementation " +
+            s"of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of " +
+            s"your Structured Streaming is not guaranteed.")
+        new FileSystemBasedCheckpointFileManager(path, hadoopConf)
+    }
+  }
+
+  private def generateTempPath(path: Path): Path = {
+    val tc = org.apache.spark.TaskContext.get
+    val tid = if (tc != null) ".TID" + tc.taskAttemptId else ""
+    new Path(path.getParent, s".${path.getName}.${UUID.randomUUID}${tid}.tmp")
+  }
+}
+
+
+/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileSystem]] API. */
+class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends CheckpointFileManager with RenameHelperMethods with Logging {
+
+  import CheckpointFileManager._
+
+  protected val fs = path.getFileSystem(hadoopConf)
+
+  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+    fs.listStatus(path, filter)
+  }
+
+  override def mkdirs(path: Path): Unit = {
+    fs.mkdirs(path, FsPermission.getDirDefault)
+  }
+
+  override def createTempFile(path: Path): FSDataOutputStream = {
+    fs.create(path, true)
+  }
+
+  override def createAtomic(
+      path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+  }
+
+  override def open(path: Path): FSDataInputStream = {
+    fs.open(path)
+  }
+
+  override def exists(path: Path): Boolean = {
+    try
+      return fs.getFileStatus(path) != null
+    catch {
+      case e: FileNotFoundException =>
+        return false
+    }
+  }
+
+  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+    if (!overwriteIfPossible && fs.exists(dstPath)) {
+      throw new FileAlreadyExistsException(
+        s"Failed to rename $srcPath to $dstPath as destination already exists")
+    }
+
+    if (!fs.rename(srcPath, dstPath)) {
+      // FileSystem.rename() returning false is very ambiguous as it can be for many reasons.
+      // This tries to make a best effort attempt to return the most appropriate exception.
+      if (fs.exists(dstPath)) {
+        if (!overwriteIfPossible) {
+          throw new FileAlreadyExistsException(s"Failed to rename as $dstPath already exists")
+        }
+      } else if (!fs.exists(srcPath)) {
+        throw new FileNotFoundException(s"Failed to rename as $srcPath was not found")
+      } else {
+        val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
+        logWarning(msg)
+        throw new IOException(msg)
+      }
+    }
+  }
+
+  override def delete(path: Path): Unit = {
+    try {
+      fs.delete(path, true)
+    } catch {
+      case e: FileNotFoundException =>
+        logInfo(s"Failed to delete $path as it does not exist")
+        // ignore if file has already been deleted
+    }
+  }
+
+  override def isLocal: Boolean = fs match {
+    case _: LocalFileSystem | _: RawLocalFileSystem => true
+    case _ => false
+  }
+}
+
+
+/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. */
+class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends CheckpointFileManager with RenameHelperMethods with Logging {
+
+  import CheckpointFileManager._
+
+  private val fc = if (path.toUri.getScheme == null) {
+    FileContext.getFileContext(hadoopConf)
+  } else {
+    FileContext.getFileContext(path.toUri, hadoopConf)
+  }
+
+  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+    fc.util.listStatus(path, filter)
+  }
+
+  override def mkdirs(path: Path): Unit = {
+    fc.mkdir(path, FsPermission.getDirDefault, true)
+  }
+
+  override def createTempFile(path: Path): FSDataOutputStream = {
+    import CreateFlag._
+    import Options._
+    fc.create(
+      path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
+  }
+
+  override def createAtomic(
+      path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+  }
+
+  override def open(path: Path): FSDataInputStream = {
+    fc.open(path)
+  }
+
+  override def exists(path: Path): Boolean = {
+    fc.util.exists(path)
+  }
+
+  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+    import Options.Rename._
+    fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+  }
+
+
+  override def delete(path: Path): Unit = {
+    try {
+      fc.delete(path, true)
+    } catch {
+      case e: FileNotFoundException =>
+      // ignore if file has already been deleted
+    }
+  }
+
+  override def isLocal: Boolean = fc.getDefaultFileSystem match {
+    case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
+    case _ => false
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 00bc215..bd0a461 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -57,10 +57,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
     "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
 
-  import HDFSMetadataLog._
-
   val metadataPath = new Path(path)
-  protected val fileManager = createFileManager()
+
+  protected val fileManager =
+    CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf)
 
   if (!fileManager.exists(metadataPath)) {
     fileManager.mkdirs(metadataPath)
@@ -109,84 +109,31 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     require(metadata != null, "'null' metadata cannot written to a metadata log")
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
-      writeBatch(batchId, metadata)
+      writeBatchToFile(metadata, batchIdToPath(batchId))
       true
     }
   }
 
-  private def writeTempBatch(metadata: T): Option[Path] = {
-    while (true) {
-      val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
-      try {
-        val output = fileManager.create(tempPath)
-        try {
-          serialize(metadata, output)
-          return Some(tempPath)
-        } finally {
-          output.close()
-        }
-      } catch {
-        case e: FileAlreadyExistsException =>
-          // Failed to create "tempPath". There are two cases:
-          // 1. Someone is creating "tempPath" too.
-          // 2. This is a restart. "tempPath" has already been created but not moved to the final
-          // batch file (not committed).
-          //
-          // For both cases, the batch has not yet been committed. So we can retry it.
-          //
-          // Note: there is a potential risk here: if HDFSMetadataLog A is running, people can use
-          // the same metadata path to create "HDFSMetadataLog" and fail A. However, this is not a
-          // big problem because it requires the attacker must have the permission to write the
-          // metadata path. In addition, the old Streaming also have this issue, people can create
-          // malicious checkpoint files to crash a Streaming application too.
-      }
-    }
-    None
-  }
-
-  /**
-   * Write a batch to a temp file then rename it to the batch file.
+  /** Write a batch to a temp file then rename it to the batch file.
    *
    * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
    * valid behavior, we still need to prevent it from destroying the files.
    */
-  private def writeBatch(batchId: Long, metadata: T): Unit = {
-    val tempPath = writeTempBatch(metadata).getOrElse(
-      throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
+  private def writeBatchToFile(metadata: T, path: Path): Unit = {
+    val output = fileManager.createAtomic(path, overwriteIfPossible = false)
     try {
-      // Try to commit the batch
-      // It will fail if there is an existing file (someone has committed the batch)
-      logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
-      fileManager.rename(tempPath, batchIdToPath(batchId))
-
-      // SPARK-17475: HDFSMetadataLog should not leak CRC files
-      // If the underlying filesystem didn't rename the CRC file, delete it.
-      val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
-      if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
+      serialize(metadata, output)
+      output.close()
     } catch {
       case e: FileAlreadyExistsException =>
-        // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
-        // So throw an exception to tell the user this is not a valid behavior.
+        output.cancel()
+        // If next batch file already exists, then another concurrently running query has
+        // written it.
         throw new ConcurrentModificationException(
-          s"Multiple HDFSMetadataLog are using $path", e)
-    } finally {
-      fileManager.delete(tempPath)
-    }
-  }
-
-  /**
-   * @return the deserialized metadata in a batch file, or None if file not exist.
-   * @throws IllegalArgumentException when path does not point to a batch file.
-   */
-  def get(batchFile: Path): Option[T] = {
-    if (fileManager.exists(batchFile)) {
-      if (isBatchFile(batchFile)) {
-        get(pathToBatchId(batchFile))
-      } else {
-        throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
-      }
-    } else {
-      None
+          s"Multiple streaming queries are concurrently using $path", e)
+      case e: Throwable =>
+        output.cancel()
+        throw e
     }
   }
 
@@ -219,7 +166,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
     }.sorted
 
-    verifyBatchIds(batchIds, startId, endId)
+    HDFSMetadataLog.verifyBatchIds(batchIds, startId, endId)
 
     batchIds.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
       case (batchId, metadataOption) =>
@@ -280,19 +227,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-  private def createFileManager(): FileManager = {
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    try {
-      new FileContextManager(metadataPath, hadoopConf)
-    } catch {
-      case e: UnsupportedFileSystemException =>
-        logWarning("Could not use FileContext API for managing metadata log files at path " +
-          s"$metadataPath. Using FileSystem API instead for managing log files. The log may be " +
-          s"inconsistent under failures.")
-        new FileSystemManager(metadataPath, hadoopConf)
-    }
-  }
-
   /**
    * Parse the log version from the given `text` -- will throw exception when the parsed version
    * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
@@ -327,135 +261,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
 
 object HDFSMetadataLog {
 
-  /** A simple trait to abstract out the file management operations needed by HDFSMetadataLog. */
-  trait FileManager {
-
-    /** List the files in a path that match a filter. */
-    def list(path: Path, filter: PathFilter): Array[FileStatus]
-
-    /** Make directory at the give path and all its parent directories as needed. */
-    def mkdirs(path: Path): Unit
-
-    /** Whether path exists */
-    def exists(path: Path): Boolean
-
-    /** Open a file for reading, or throw exception if it does not exist. */
-    def open(path: Path): FSDataInputStream
-
-    /** Create path, or throw exception if it already exists */
-    def create(path: Path): FSDataOutputStream
-
-    /**
-     * Atomically rename path, or throw exception if it cannot be done.
-     * Should throw FileNotFoundException if srcPath does not exist.
-     * Should throw FileAlreadyExistsException if destPath already exists.
-     */
-    def rename(srcPath: Path, destPath: Path): Unit
-
-    /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
-    def delete(path: Path): Unit
-  }
-
-  /**
-   * Default implementation of FileManager using newer FileContext API.
-   */
-  class FileContextManager(path: Path, hadoopConf: Configuration) extends FileManager {
-    private val fc = if (path.toUri.getScheme == null) {
-      FileContext.getFileContext(hadoopConf)
-    } else {
-      FileContext.getFileContext(path.toUri, hadoopConf)
-    }
-
-    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
-      fc.util.listStatus(path, filter)
-    }
-
-    override def rename(srcPath: Path, destPath: Path): Unit = {
-      fc.rename(srcPath, destPath)
-    }
-
-    override def mkdirs(path: Path): Unit = {
-      fc.mkdir(path, FsPermission.getDirDefault, true)
-    }
-
-    override def open(path: Path): FSDataInputStream = {
-      fc.open(path)
-    }
-
-    override def create(path: Path): FSDataOutputStream = {
-      fc.create(path, EnumSet.of(CreateFlag.CREATE))
-    }
-
-    override def exists(path: Path): Boolean = {
-      fc.util().exists(path)
-    }
-
-    override def delete(path: Path): Unit = {
-      try {
-        fc.delete(path, true)
-      } catch {
-        case e: FileNotFoundException =>
-        // ignore if file has already been deleted
-      }
-    }
-  }
-
-  /**
-   * Implementation of FileManager using older FileSystem API. Note that this implementation
-   * cannot provide atomic renaming of paths, hence can lead to consistency issues. This
-   * should be used only as a backup option, when FileContextManager cannot be used.
-   */
-  class FileSystemManager(path: Path, hadoopConf: Configuration) extends FileManager {
-    private val fs = path.getFileSystem(hadoopConf)
-
-    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
-      fs.listStatus(path, filter)
-    }
-
-    /**
-     * Rename a path. Note that this implementation is not atomic.
-     * @throws FileNotFoundException if source path does not exist.
-     * @throws FileAlreadyExistsException if destination path already exists.
-     * @throws IOException if renaming fails for some unknown reason.
-     */
-    override def rename(srcPath: Path, destPath: Path): Unit = {
-      if (!fs.exists(srcPath)) {
-        throw new FileNotFoundException(s"Source path does not exist: $srcPath")
-      }
-      if (fs.exists(destPath)) {
-        throw new FileAlreadyExistsException(s"Destination path already exists: $destPath")
-      }
-      if (!fs.rename(srcPath, destPath)) {
-        throw new IOException(s"Failed to rename $srcPath to $destPath")
-      }
-    }
-
-    override def mkdirs(path: Path): Unit = {
-      fs.mkdirs(path, FsPermission.getDirDefault)
-    }
-
-    override def open(path: Path): FSDataInputStream = {
-      fs.open(path)
-    }
-
-    override def create(path: Path): FSDataOutputStream = {
-      fs.create(path, false)
-    }
-
-    override def exists(path: Path): Boolean = {
-      fs.exists(path)
-    }
-
-    override def delete(path: Path): Unit = {
-      try {
-        fs.delete(path, true)
-      } catch {
-        case e: FileNotFoundException =>
-          // ignore if file has already been deleted
-      }
-    }
-  }
-
   /**
    * Verify if batchIds are continuous and between `startId` and `endId`.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3f5002a..df722b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
+import java.io._
 import java.nio.channels.ClosedChannelException
 import java.util.Locale
 
@@ -27,13 +27,16 @@ import scala.util.Random
 import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs._
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.LZ4CompressionCodec
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{SizeEstimator, Utils}
 
@@ -87,10 +90,10 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
     case object ABORTED extends STATE
 
     private val newVersion = version + 1
-    private val tempDeltaFile = new Path(baseDir, s"temp-${Random.nextLong}")
-    private lazy val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true))
     @volatile private var state: STATE = UPDATING
-    @volatile private var finalDeltaFile: Path = null
+    private val finalDeltaFile: Path = deltaFile(newVersion)
+    private lazy val deltaFileStream = fm.createAtomic(finalDeltaFile, overwriteIfPossible = true)
+    private lazy val compressedStream = compressStream(deltaFileStream)
 
     override def id: StateStoreId = 
HDFSBackedStateStoreProvider.this.stateStoreId
 
@@ -103,14 +106,14 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
       val keyCopy = key.copy()
       val valueCopy = value.copy()
       mapToUpdate.put(keyCopy, valueCopy)
-      writeUpdateToDeltaFile(tempDeltaFileStream, keyCopy, valueCopy)
+      writeUpdateToDeltaFile(compressedStream, keyCopy, valueCopy)
     }
 
     override def remove(key: UnsafeRow): Unit = {
       verify(state == UPDATING, "Cannot remove after already committed or 
aborted")
       val prevValue = mapToUpdate.remove(key)
       if (prevValue != null) {
-        writeRemoveToDeltaFile(tempDeltaFileStream, key)
+        writeRemoveToDeltaFile(compressedStream, key)
       }
     }
 
@@ -126,8 +129,7 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
       verify(state == UPDATING, "Cannot commit after already committed or 
aborted")
 
       try {
-        finalizeDeltaFile(tempDeltaFileStream)
-        finalDeltaFile = commitUpdates(newVersion, mapToUpdate, tempDeltaFile)
+        commitUpdates(newVersion, mapToUpdate, compressedStream)
         state = COMMITTED
         logInfo(s"Committed version $newVersion for $this to file 
$finalDeltaFile")
         newVersion
@@ -140,23 +142,14 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
 
     /** Abort all the updates made on this store. This store will not be 
usable any more. */
     override def abort(): Unit = {
-      verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
-      try {
+      // This if statement is to ensure that files are deleted only if there are changes to the
+      // StateStore. We have two StateStores for each task, one which is used only for reading, and
+      // the other used for read+write. We don't want the read-only to delete state files.
+      if (state == UPDATING) {
+        state = ABORTED
+        cancelDeltaFile(compressedStream, deltaFileStream)
+      } else {
         state = ABORTED
-        if (tempDeltaFileStream != null) {
-          tempDeltaFileStream.close()
-        }
-        if (tempDeltaFile != null) {
-          fs.delete(tempDeltaFile, true)
-        }
-      } catch {
-        case c: ClosedChannelException =>
-          // This can happen when underlying file output stream has been closed before the
-          // compression stream.
-          logDebug(s"Error aborting version $newVersion into $this", c)
-
-        case e: Exception =>
-          logWarning(s"Error aborting version $newVersion into $this", e)
       }
       logInfo(s"Aborted version $newVersion for $this")
     }
@@ -212,7 +205,7 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
     this.valueSchema = valueSchema
     this.storeConf = storeConf
     this.hadoopConf = hadoopConf
-    fs.mkdirs(baseDir)
+    fm.mkdirs(baseDir)
   }
 
   override def stateStoreId: StateStoreId = stateStoreId_
@@ -251,31 +244,15 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
 
   private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
   private lazy val baseDir = stateStoreId.storeCheckpointLocation()
-  private lazy val fs = baseDir.getFileSystem(hadoopConf)
+  private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
   private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new 
SparkConf)
 
   private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)
 
-  /** Commit a set of updates to the store with the given new version */
-  private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
+  private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = {
     synchronized {
-      val finalDeltaFile = deltaFile(newVersion)
-
-      // scalastyle:off
-      // Renaming a file atop an existing one fails on HDFS
-      // (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html).
-      // Hence we should either skip the rename step or delete the target file. Because deleting the
-      // target file will break speculation, skipping the rename step is the only choice. It's still
-      // semantically correct because Structured Streaming requires rerunning a batch should
-      // generate the same output. (SPARK-19677)
-      // scalastyle:on
-      if (fs.exists(finalDeltaFile)) {
-        fs.delete(tempDeltaFile, true)
-      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
-        throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
-      }
+      finalizeDeltaFile(output)
       loadedMaps.put(newVersion, map)
-      finalDeltaFile
     }
   }
 
@@ -365,7 +342,7 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
     val fileToRead = deltaFile(version)
     var input: DataInputStream = null
     val sourceStream = try {
-      fs.open(fileToRead)
+      fm.open(fileToRead)
     } catch {
       case f: FileNotFoundException =>
         throw new IllegalStateException(
@@ -412,12 +389,12 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
   }
 
   private def writeSnapshotFile(version: Long, map: MapType): Unit = {
-    val fileToWrite = snapshotFile(version)
-    val tempFile =
-      new Path(fileToWrite.getParent, s"${fileToWrite.getName}.temp-${Random.nextLong}")
+    val targetFile = snapshotFile(version)
+    var rawOutput: CancellableFSDataOutputStream = null
     var output: DataOutputStream = null
-    Utils.tryWithSafeFinally {
-      output = compressStream(fs.create(tempFile, false))
+    try {
+      rawOutput = fm.createAtomic(targetFile, overwriteIfPossible = true)
+      output = compressStream(rawOutput)
       val iter = map.entrySet().iterator()
       while(iter.hasNext) {
         val entry = iter.next()
@@ -429,16 +406,34 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
         output.write(valueBytes)
       }
       output.writeInt(-1)
-    } {
-      if (output != null) output.close()
+      output.close()
+    } catch {
+      case e: Throwable =>
+        cancelDeltaFile(compressedStream = output, rawStream = rawOutput)
+        throw e
     }
-    if (fs.exists(fileToWrite)) {
-      // Skip rename if the file is alreayd created.
-      fs.delete(tempFile, true)
-    } else if (!fs.rename(tempFile, fileToWrite)) {
-      throw new IOException(s"Failed to rename $tempFile to $fileToWrite")
+    logInfo(s"Written snapshot file for version $version of $this at $targetFile")
+  }
+
+  /**
+   * Try to cancel the underlying stream and safely close the compressed stream.
+   *
+   * @param compressedStream the compressed stream.
+   * @param rawStream the underlying stream which needs to be cancelled.
+   */
+  private def cancelDeltaFile(
+      compressedStream: DataOutputStream,
+      rawStream: CancellableFSDataOutputStream): Unit = {
+    try {
+      if (rawStream != null) rawStream.cancel()
+      IOUtils.closeQuietly(compressedStream)
+    } catch {
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+        // Closing the compressedStream causes the stream to write/flush data into the
+        // rawStream. Since the rawStream is already closed, there may be errors.
+        // Usually it's an IOException. However, Hadoop's RawLocalFileSystem wraps
+        // IOException into FSError.
     }
-    logInfo(s"Written snapshot file for version $version of $this at $fileToWrite")
   }
 
   private def readSnapshotFile(version: Long): Option[MapType] = {
@@ -447,7 +442,7 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
     var input: DataInputStream = null
 
     try {
-      input = decompressStream(fs.open(fileToRead))
+      input = decompressStream(fm.open(fileToRead))
       var eof = false
 
       while (!eof) {
@@ -508,7 +503,6 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
           case None =>
             // The last map is not loaded, probably some other instance is in 
charge
         }
-
       }
     } catch {
       case NonFatal(e) =>
@@ -534,7 +528,7 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
           }
           val filesToDelete = files.filter(_.version < 
earliestFileToRetain.version)
           filesToDelete.foreach { f =>
-            fs.delete(f.path, true)
+            fm.delete(f.path)
           }
           logInfo(s"Deleted files older than ${earliestFileToRetain.version} 
for $this: " +
             filesToDelete.mkString(", "))
@@ -576,7 +570,7 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
   /** Fetch all the files that back the store */
   private def fetchFiles(): Seq[StoreFile] = {
     val files: Seq[FileStatus] = try {
-      fs.listStatus(baseDir)
+      fm.list(baseDir)
     } catch {
       case _: java.io.FileNotFoundException =>
         Seq.empty
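
The ordering used by cancelDeltaFile in the diff above (cancel the raw stream first, then close the wrapping stream and ignore the resulting failure) can be illustrated without Hadoop. This is only a minimal sketch under stated assumptions: `CancellableBuffer` and `CancelSketch` are hypothetical names, an in-memory buffer stands in for `CancellableFSDataOutputStream`, and a `BufferedOutputStream` stands in for the compression stream. It is not Spark's implementation.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, DataOutputStream, IOException, OutputStream}

/** Hypothetical raw stream (not a Spark class): cancel() discards everything written so far. */
final class CancellableBuffer extends OutputStream {
  private val buf = new ByteArrayOutputStream()
  private var terminated = false
  private var wasCancelled = false

  override def write(b: Int): Unit = {
    // Once closed or cancelled, any further write is an error.
    if (terminated) throw new IOException("stream already terminated")
    buf.write(b)
  }

  override def close(): Unit = { terminated = true }

  def cancel(): Unit = { wasCancelled = true; terminated = true; buf.reset() }

  def cancelled: Boolean = wasCancelled
  def size: Int = buf.size()
}

object CancelSketch {
  /** Cancel the raw stream first, then close the wrapper and swallow the expected failure. */
  def cancelQuietly(wrapper: OutputStream, raw: CancellableBuffer): Unit = {
    if (raw != null) raw.cancel()
    if (wrapper != null) {
      // Closing the wrapper flushes its buffered bytes into the already
      // terminated raw stream, which throws; that failure is ignored here.
      try wrapper.close() catch { case _: IOException => () }
    }
  }

  /** Writes into a buffering wrapper, cancels, and reports (cancelled, bytes kept). */
  def demo(): (Boolean, Int) = {
    val raw = new CancellableBuffer
    val wrapped = new DataOutputStream(new BufferedOutputStream(raw))
    wrapped.writeInt(42) // stays in the wrapper's buffer until flush/close
    cancelQuietly(wrapped, raw)
    (raw.cancelled, raw.size)
  }
}
```

Cancelling before closing matters because closing the wrapper first would flush its buffered bytes into a still-open raw stream, and in the real code closing that stream is what commits the file.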

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index d1d9f95..7eb68c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -459,7 +459,6 @@ object StateStore extends Logging {
   private def coordinatorRef: Option[StateStoreCoordinatorRef] = 
loadedProviders.synchronized {
     val env = SparkEnv.get
     if (env != null) {
-      logInfo("Env is not null")
       val isDriver =
         env.executorId == SparkContext.DRIVER_IDENTIFIER ||
           env.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
@@ -467,13 +466,12 @@ object StateStore extends Logging {
       // as SparkContext + SparkEnv may have been restarted. Hence, when 
running in driver,
       // always recreate the reference.
       if (isDriver || _coordRef == null) {
-        logInfo("Getting StateStoreCoordinatorRef")
+        logDebug("Getting StateStoreCoordinatorRef")
         _coordRef = StateStoreCoordinatorRef.forExecutor(env)
       }
       logInfo(s"Retrieved reference to StateStoreCoordinator: ${_coordRef}")
       Some(_coordRef)
     } else {
-      logInfo("Env is null")
       _coordRef = null
       None
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
new file mode 100644
index 0000000..fe59cb2
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io._
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
+
+  def createManager(path: Path): CheckpointFileManager
+
+  test("mkdirs, list, createAtomic, open, delete, exists") {
+    withTempPath { p =>
+      val basePath = new Path(p.getAbsolutePath)
+      val fm = createManager(basePath)
+      // Mkdirs
+      val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+      assert(!fm.exists(dir))
+      fm.mkdirs(dir)
+      assert(fm.exists(dir))
+      fm.mkdirs(dir)
+
+      // List
+      val acceptAllFilter = new PathFilter {
+        override def accept(path: Path): Boolean = true
+      }
+      val rejectAllFilter = new PathFilter {
+        override def accept(path: Path): Boolean = false
+      }
+      assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
+      assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+      // Create atomic without overwrite
+      var path = new Path(s"$dir/file")
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = false).close()
+      assert(fm.exists(path))
+      quietly {
+        intercept[IOException] {
+          // should throw exception since file exists and overwrite is false
+          fm.createAtomic(path, overwriteIfPossible = false).close()
+        }
+      }
+
+      // Create atomic with overwrite if possible
+      path = new Path(s"$dir/file2")
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = true).cancel()
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = true).close()
+      assert(fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
+
+      // Open and delete
+      fm.open(path).close()
+      fm.delete(path)
+      assert(!fm.exists(path))
+      intercept[IOException] {
+        fm.open(path)
+      }
+      fm.delete(path) // should not throw exception
+    }
+  }
+
+  protected def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+}
+
+class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {
+
+  test("CheckpointFileManager.create() should pick up user-specified class from conf") {
+    withSQLConf(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key ->
+        classOf[CreateAtomicTestManager].getName) {
+      val fileManager =
+        CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf)
+      assert(fileManager.isInstanceOf[CreateAtomicTestManager])
+    }
+  }
+
+  test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") {
+    import CheckpointFileManagerSuiteFileSystem.scheme
+    spark.conf.set(s"fs.$scheme.impl", classOf[CheckpointFileManagerSuiteFileSystem].getName)
+    quietly {
+      withTempDir { temp =>
+        val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
+        assert(metadataLog.add(0, "batch0"))
+        assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+        assert(metadataLog.get(0) === Some("batch0"))
+        assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))
+
+        val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
+        assert(metadataLog2.get(0) === Some("batch0"))
+        assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
+        assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))
+      }
+    }
+  }
+}
+
+class FileContextBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests {
+  override def createManager(path: Path): CheckpointFileManager = {
+    new FileContextBasedCheckpointFileManager(path, new Configuration())
+  }
+}
+
+class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests {
+  override def createManager(path: Path): CheckpointFileManager = {
+    new FileSystemBasedCheckpointFileManager(path, new Configuration())
+  }
+}
+
+
+/** A fake implementation to test different characteristics of CheckpointFileManager interface */
+class CreateAtomicTestManager(path: Path, hadoopConf: Configuration)
+  extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {
+
+  import CheckpointFileManager._
+
+  override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = {
+    if (CreateAtomicTestManager.shouldFailInCreateAtomic) {
+      CreateAtomicTestManager.cancelCalledInCreateAtomic = false
+    }
+    val originalOut = super.createAtomic(path, overwrite)
+
+    new CancellableFSDataOutputStream(originalOut) {
+      override def close(): Unit = {
+        if (CreateAtomicTestManager.shouldFailInCreateAtomic) {
+          throw new IOException("Copy failed intentionally")
+        }
+        super.close()
+      }
+
+      override def cancel(): Unit = {
+        CreateAtomicTestManager.cancelCalledInCreateAtomic = true
+        originalOut.cancel()
+      }
+    }
+  }
+}
+
+object CreateAtomicTestManager {
+  @volatile var shouldFailInCreateAtomic = false
+  @volatile var cancelCalledInCreateAtomic = false
+}
+
+
+/**
+ * CheckpointFileManagerSuiteFileSystem to test fallback of the CheckpointFileManager
+ * from FileContext to FileSystem API.
+ */
+private class CheckpointFileManagerSuiteFileSystem extends RawLocalFileSystem {
+  import CheckpointFileManagerSuiteFileSystem.scheme
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+}
+
+private object CheckpointFileManagerSuiteFileSystem {
+  val scheme = s"CheckpointFileManagerSuiteFileSystem${math.abs(Random.nextInt)}"
+}
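
The createAtomic contract exercised by the test above (cancel() leaves no file behind, close() makes the file visible, and a second non-overwriting create fails) can be sketched on the local file system with java.nio.file. This is a rough illustration of write-to-temp-file-and-rename only, under stated assumptions: `TempRenameOutputStream` and its temp-file naming are hypothetical, the code is not Spark's `FileSystemBasedCheckpointFileManager`, and whether the final move is truly atomic depends on the underlying file system.

```scala
import java.io.{FilterOutputStream, OutputStream}
import java.nio.file.{Files, Path, StandardCopyOption}
import java.util.UUID

/** Hypothetical sketch (not a Spark class): bytes go to a temp file until close(). */
final class TempRenameOutputStream private (
    finalPath: Path,
    tempPath: Path,
    overwrite: Boolean,
    underlying: OutputStream)
  extends FilterOutputStream(underlying) {

  private var terminated = false

  // Delegate bulk writes directly instead of FilterOutputStream's byte-by-byte default.
  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    underlying.write(b, off, len)

  /** Commit: close the temp file, then move it to the final path. */
  override def close(): Unit = {
    if (!terminated) {
      terminated = true
      try {
        super.close()
        if (overwrite) Files.move(tempPath, finalPath, StandardCopyOption.REPLACE_EXISTING)
        else Files.move(tempPath, finalPath) // throws FileAlreadyExistsException if present
      } finally {
        // No-op after a successful move; cleans up the temp file after a failed one.
        Files.deleteIfExists(tempPath)
      }
    }
  }

  /** Abort: close the temp file and delete it, so the final path is never created. */
  def cancel(): Unit = {
    if (!terminated) {
      terminated = true
      try super.close() finally Files.deleteIfExists(tempPath)
    }
  }
}

object TempRenameOutputStream {
  def create(finalPath: Path, overwriteIfPossible: Boolean): TempRenameOutputStream = {
    // Hidden sibling temp file; this naming scheme is an assumption made for the sketch.
    val tempPath =
      finalPath.resolveSibling(s".${finalPath.getFileName}.${UUID.randomUUID}.tmp")
    new TempRenameOutputStream(
      finalPath, tempPath, overwriteIfPossible, Files.newOutputStream(tempPath))
  }
}
```

Tying the commit to close() and the rollback to cancel() keeps the rename step behind the stream, which is what lets a different implementation replace it with another atomic-write mechanism.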

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 12eaf63..ec961a9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -22,15 +22,10 @@ import java.nio.charset.StandardCharsets._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.FakeFileSystem._
 import org.apache.spark.sql.test.SharedSQLContext
 
 class CompactibleFileStreamLogSuite extends SparkFunSuite with 
SharedSQLContext {
 
-  /** To avoid caching of FS objects */
-  override protected def sparkConf =
-    super.sparkConf.set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
-
   import CompactibleFileStreamLog._
 
   /** -- testing of `object CompactibleFileStreamLog` begins -- */

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 4677769..9268306 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -17,46 +17,22 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{File, FileNotFoundException, IOException}
-import java.net.URI
+import java.io.File
 import java.util.ConcurrentModificationException
 
 import scala.language.implicitConversions
-import scala.util.Random
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
 import org.scalatest.concurrent.Waiters._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.streaming.FakeFileSystem._
-import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.UninterruptibleThread
 
 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
 
-  /** To avoid caching of FS objects */
-  override protected def sparkConf =
-    super.sparkConf.set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
-
   private implicit def toOption[A](a: A): Option[A] = Option(a)
 
-  test("FileManager: FileContextManager") {
-    withTempDir { temp =>
-      val path = new Path(temp.getAbsolutePath)
-      testFileManager(path, new FileContextManager(path, new Configuration))
-    }
-  }
-
-  test("FileManager: FileSystemManager") {
-    withTempDir { temp =>
-      val path = new Path(temp.getAbsolutePath)
-      testFileManager(path, new FileSystemManager(path, new Configuration))
-    }
-  }
-
   test("HDFSMetadataLog: basic") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test 
whether log make the dir
@@ -82,26 +58,6 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
-    spark.conf.set(
-      s"fs.$scheme.impl",
-      classOf[FakeFileSystem].getName)
-    withTempDir { temp =>
-      val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
-      assert(metadataLog.add(0, "batch0"))
-      assert(metadataLog.getLatest() === Some(0 -> "batch0"))
-      assert(metadataLog.get(0) === Some("batch0"))
-      assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))
-
-
-      val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
-      assert(metadataLog2.get(0) === Some("batch0"))
-      assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
-      assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))
-
-    }
-  }
-
   test("HDFSMetadataLog: purge") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, 
temp.getAbsolutePath)
@@ -121,7 +77,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
 
       // There should be exactly one file, called "2", in the metadata 
directory.
       // This check also tests for regressions of SPARK-17475
-      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
+        .filter(!_.getName.startsWith(".")).toSeq
       assert(allFiles.size == 1)
       assert(allFiles(0).getName() == "2")
     }
@@ -172,7 +129,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: metadata directory collision") {
+  testQuietly("HDFSMetadataLog: metadata directory collision") {
     withTempDir { temp =>
       val waiter = new Waiter
       val maxBatchId = 100
@@ -206,60 +163,6 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-  /** Basic test case for [[FileManager]] implementation. */
-  private def testFileManager(basePath: Path, fm: FileManager): Unit = {
-    // Mkdirs
-    val dir = new Path(s"$basePath/dir/subdir/subsubdir")
-    assert(!fm.exists(dir))
-    fm.mkdirs(dir)
-    assert(fm.exists(dir))
-    fm.mkdirs(dir)
-
-    // List
-    val acceptAllFilter = new PathFilter {
-      override def accept(path: Path): Boolean = true
-    }
-    val rejectAllFilter = new PathFilter {
-      override def accept(path: Path): Boolean = false
-    }
-    assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
-    assert(fm.list(basePath, rejectAllFilter).length === 0)
-
-    // Create
-    val path = new Path(s"$dir/file")
-    assert(!fm.exists(path))
-    fm.create(path).close()
-    assert(fm.exists(path))
-    intercept[IOException] {
-      fm.create(path)
-    }
-
-    // Open and delete
-    fm.open(path).close()
-    fm.delete(path)
-    assert(!fm.exists(path))
-    intercept[IOException] {
-      fm.open(path)
-    }
-    fm.delete(path) // should not throw exception
-
-    // Rename
-    val path1 = new Path(s"$dir/file1")
-    val path2 = new Path(s"$dir/file2")
-    fm.create(path1).close()
-    assert(fm.exists(path1))
-    fm.rename(path1, path2)
-    intercept[FileNotFoundException] {
-      fm.rename(path1, path2)
-    }
-    val path3 = new Path(s"$dir/file3")
-    fm.create(path3).close()
-    assert(fm.exists(path3))
-    intercept[FileAlreadyExistsException] {
-      fm.rename(path2, path3)
-    }
-  }
-
   test("verifyBatchIds") {
     import HDFSMetadataLog.verifyBatchIds
     verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), Some(3L))
@@ -277,14 +180,3 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), 
Some(5L)))
   }
 }
-
-/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */
-class FakeFileSystem extends RawLocalFileSystem {
-  override def getUri: URI = {
-    URI.create(s"$scheme:///")
-  }
-}
-
-object FakeFileSystem {
-  val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}"
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index c843b65..73f8705 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI
 import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -28,17 +27,17 @@ import scala.util.Random
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.fs._
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -138,7 +137,7 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(getData(provider, 19) === Set("a" -> 19))
   }
 
-  test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
+  testQuietly("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
     val conf = new Configuration()
     conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
     conf.set("fs.defaultFS", "fake:///")
@@ -344,7 +343,7 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
-  test("SPARK-18342: commit fails when rename fails") {
+  testQuietly("SPARK-18342: commit fails when rename fails") {
     import RenameReturnsFalseFileSystem._
     val dir = scheme + "://" + newDir()
     val conf = new Configuration()
@@ -366,7 +365,7 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     def numTempFiles: Int = {
       if (deltaFileDir.exists) {
-        deltaFileDir.listFiles.map(_.getName).count(n => n.contains("temp") && !n.startsWith("."))
+        deltaFileDir.listFiles.map(_.getName).count(n => n.endsWith(".tmp"))
       } else 0
     }
 
@@ -471,6 +470,43 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("error writing [version].delta cancels the output stream") {
+
+    val hadoopConf = new Configuration()
+    hadoopConf.set(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+      classOf[CreateAtomicTestManager].getName)
+    val remoteDir = Utils.createTempDir().getAbsolutePath
+
+    val provider = newStoreProvider(
+      opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = hadoopConf)
+
+    // Disable failure of output stream and generate versions
+    CreateAtomicTestManager.shouldFailInCreateAtomic = false
+    for (version <- 1 to 10) {
+      val store = provider.getStore(version - 1)
+      put(store, version.toString, version) // update "1" -> 1, "2" -> 2, ...
+      store.commit()
+    }
+    val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
+
+    CreateAtomicTestManager.cancelCalledInCreateAtomic = false
+    val store = provider.getStore(10)
+    // Fail commit for next version and verify that reloading resets the files
+    CreateAtomicTestManager.shouldFailInCreateAtomic = true
+    put(store, "11", 11)
+    val e = intercept[IllegalStateException] { quietly { store.commit() } }
+    assert(e.getCause.isInstanceOf[IOException])
+    CreateAtomicTestManager.shouldFailInCreateAtomic = false
+
+    // Abort commit for next version and verify that reloading resets the files
+    CreateAtomicTestManager.cancelCalledInCreateAtomic = false
+    val store2 = provider.getStore(10)
+    put(store2, "11", 11)
+    store2.abort()
+    assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
+  }
+
   override def newStoreProvider(): HDFSBackedStateStoreProvider = {
     newStoreProvider(opId = Random.nextInt(), partition = 0)
   }
@@ -720,6 +756,14 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
    * this provider
    */
   def getData(storeProvider: ProviderClass, version: Int): Set[(String, Int)]
+
+  protected def testQuietly(name: String)(f: => Unit): Unit = {
+    test(name) {
+      quietly {
+        f
+      }
+    }
+  }
 }
 
 object StateStoreTestsHelper {

