Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21048#discussion_r180937241
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
    @@ -0,0 +1,344 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.io.{FileSystem => _, _}
    +import java.util.{EnumSet, UUID}
    +
    +import scala.util.control.NonFatal
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs._
    +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
    +import org.apache.hadoop.fs.permission.FsPermission
    +
    +import org.apache.spark.internal.Logging
    +import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An interface to abstract out all operations related to streaming checkpoints. Most
    + * importantly, the key operation this interface provides is `createAtomic(path, overwrite)`,
    + * which returns a `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]]
    + * and [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]]
    + * implementations to write a complete checkpoint file atomically (i.e. no partial file will be
    + * visible), with or without overwrite.
    + *
    + * This higher-level interface above the Hadoop FileSystem is necessary because different
    + * implementations of FileSystem/FileContext may need different combinations of operations to
    + * provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
    + * direct-write-and-cancel-on-failure). This abstraction allows different implementations while
    + * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
    + */
    +trait CheckpointFileManager {
    +
    +  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
    +
    +  /**
    +   * Create a file and make its contents available atomically after the output stream is closed.
    +   *
    +   * @param path                Path to create
    +   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
    +   *                            overwrite the file if it already exists. It should not throw
    +   *                            any exception if the file exists. However, if false, then the
    +   *                            implementation must not overwrite if the file already exists and
    +   *                            must throw `FileAlreadyExistsException` in that case.
    +   */
    +  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
    +
    +  /** Open a file for reading, or throw an exception if it does not exist. */
    +  def open(path: Path): FSDataInputStream
    +
    +  /** List the files in a path that match a filter. */
    +  def list(path: Path, filter: PathFilter): Array[FileStatus]
    +
    +  /** List all the files in a path (delegates to `list(path, filter)` with an accept-all filter). */
    +  def list(path: Path): Array[FileStatus] = {
    +    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
    +  }
    +
    +  /** Make a directory at the given path, creating all parent directories as needed. */
    +  def mkdirs(path: Path): Unit
    +
    +  /** Whether the path exists. */
    +  def exists(path: Path): Boolean
    +
    +  /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
    +  def delete(path: Path): Unit
    +
    +  /** Whether the file system this implementation operates on is the local file system. */
    +  def isLocal: Boolean
    +}
    +
    +object CheckpointFileManager extends Logging {
    +
    +  /**
    +   * Additional methods in CheckpointFileManager implementations that allow
    +   * [[RenameBasedFSDataOutputStream]] to get atomicity via write-to-temp-file-and-rename.
    +   *
    +   * The self-type restricts this trait to being mixed into a [[CheckpointFileManager]].
    +   */
    +  sealed trait RenameHelperMethods { self: CheckpointFileManager =>
    +    /** Create a file with overwrite. */
    +    def create(path: Path): FSDataOutputStream
    +
    +    /**
    +     * Rename a file.
    +     *
    +     * @param srcPath             Source path to rename
    +     * @param dstPath             Destination path to rename to
    +     * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt
    +     *                            to overwrite the file if it already exists. It should not throw
    +     *                            any exception if the file exists. However, if false, then the
    +     *                            implementation must not overwrite if the file already exists
    +     *                            and must throw `FileAlreadyExistsException` in that case.
    +     */
    +    def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
    +  }
    +
    +  /**
    +   * An interface to add the cancel() operation to [[FSDataOutputStream]]. This is used
    +   * mainly by `CheckpointFileManager.createAtomic` to write a file atomically.
    +   *
    +   * @see [[CheckpointFileManager]].
    +   */
    +  abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream)
    +    extends FSDataOutputStream(underlyingStream, null) {
    +    /** Cancel the `underlyingStream` and ensure that the output file is not generated. */
    +    def cancel(): Unit
    +  }
    +
    +  /**
    +   * An implementation of [[CancellableFSDataOutputStream]] that writes a file atomically by
    +   * writing to a temporary file and then renaming it to the final path on `close()`.
    +   *
    +   * Once `close()` or `cancel()` has run (successfully or not), further calls to either are
    +   * no-ops.
    +   */
    +  sealed class RenameBasedFSDataOutputStream(
    +      fm: CheckpointFileManager with RenameHelperMethods,
    +      finalPath: Path,
    +      tempPath: Path,
    +      overwriteIfPossible: Boolean)
    +    extends CancellableFSDataOutputStream(fm.create(tempPath)) {
    +
    +    /** Write to `path` through a freshly generated temp file next to it. */
    +    def this(
    +        fm: CheckpointFileManager with RenameHelperMethods,
    +        path: Path,
    +        overwrite: Boolean) = {
    +      this(fm, path, generateTempPath(path), overwrite)
    +    }
    +
    +    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
    +    @volatile private var terminated = false
    +
    +    /**
    +     * Close the temp file and rename it to `finalPath`. If closing or renaming fails, the temp
    +     * file is deleted on a best-effort basis before the original error is rethrown, so a
    +     * failed commit does not leave the temp file behind.
    +     */
    +    override def close(): Unit = synchronized {
    +      if (!terminated) {
    +        try {
    +          try {
    +            super.close()
    +            fm.rename(tempPath, finalPath, overwriteIfPossible)
    +          } catch {
    +            case NonFatal(e) =>
    +              deleteTempFileQuietly(e)
    +              throw e
    +          }
    +          logInfo(s"Renamed temp file $tempPath to $finalPath")
    +        } finally {
    +          terminated = true
    +        }
    +      }
    +    }
    +
    +    /** Close the stream and delete the temp file; errors are logged, never thrown. */
    +    override def cancel(): Unit = synchronized {
    +      if (!terminated) {
    +        try {
    +          // Attempt the delete even if closing the stream fails, so the temp file is not leaked.
    +          try underlyingStream.close() finally fm.delete(tempPath)
    +        } catch {
    +          case NonFatal(e) =>
    +            logWarning(s"Error cancelling write to $finalPath", e)
    +        } finally {
    +          terminated = true
    +        }
    +      }
    +    }
    +
    +    /** Best-effort removal of the temp file; a failure is attached to `cause` as suppressed. */
    +    private def deleteTempFileQuietly(cause: Throwable): Unit = {
    +      try {
    +        fm.delete(tempPath)
    +      } catch {
    +        case NonFatal(e) => cause.addSuppressed(e)
    +      }
    +    }
    +  }
    +
    +
    +  /**
    +   * Create an instance of [[CheckpointFileManager]] based on the path and configuration.
    +   *
    +   * If `SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key` is set in `hadoopConf`, that
    +   * class is instantiated reflectively through its `(Path, Configuration)` constructor.
    +   * Otherwise a [[FileContextBasedCheckpointFileManager]] is created, falling back to a
    +   * [[FileSystemBasedCheckpointFileManager]] when the FileContext API is unsupported for `path`.
    +   */
    +  def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
    +    val fileManagerClass = hadoopConf.get(
    +      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
    +    if (fileManagerClass != null) {
    +      Utils.classForName(fileManagerClass)
    +        .getConstructor(classOf[Path], classOf[Configuration])
    +        .newInstance(path, hadoopConf)
    +        .asInstanceOf[CheckpointFileManager]
    +    } else {
    +      try {
    +        // Try to create a manager based on `FileContext` because HDFS's `FileContext.rename()`
    +        // gives atomic renames, which is what we rely on for the default implementation
    +        // `CheckpointFileManager.createAtomic`.
    +        new FileContextBasedCheckpointFileManager(path, hadoopConf)
    +      } catch {
    +        case e: UnsupportedFileSystemException =>
    +          logWarning(
    +            "Could not use FileContext API for managing Structured Streaming checkpoint files " +
    +              s"at $path. Using FileSystem API instead for managing log files. If the " +
    +              "implementation of FileSystem.rename() is not atomic, then the correctness and " +
    +              "fault-tolerance of your Structured Streaming is not guaranteed.")
    +          new FileSystemBasedCheckpointFileManager(path, hadoopConf)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Build a dot-prefixed temp path in the same parent directory as `path`. The name contains a
    +   * random UUID and, when called inside a Spark task, the task attempt id.
    +   */
    +  private def generateTempPath(path: Path): Path = {
    +    val tc = org.apache.spark.TaskContext.get
    +    val tid = if (tc != null) ".TID" + tc.taskAttemptId else ""
    +    new Path(path.getParent, s".${path.getName}.${UUID.randomUUID}${tid}.tmp")
    +  }
    +}
    +
    +
    +/**
    + * An implementation of [[CheckpointFileManager]] using Hadoop's [[FileSystem]] API.
    + *
    + * Atomic creation is done by writing to a temp file and renaming it (see
    + * [[CheckpointFileManager.RenameBasedFSDataOutputStream]]), so atomicity depends on whether
    + * the underlying `FileSystem.rename()` is atomic.
    + */
    +class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
    +  extends CheckpointFileManager with RenameHelperMethods with Logging {
    +
    +  import CheckpointFileManager._
    +
    +  protected val fs: FileSystem = path.getFileSystem(hadoopConf)
    +
    +  // NOTE(review): checksum verification and writing are both turned off, presumably to avoid
    +  // checksum side files for checkpoint data -- confirm this is intended.
    +  fs.setVerifyChecksum(false)
    +  fs.setWriteChecksum(false)
    +
    +  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
    +    fs.listStatus(path, filter)
    +  }
    +
    +  override def mkdirs(path: Path): Unit = {
    +    fs.mkdirs(path, FsPermission.getDirDefault)
    +  }
    +
    +  /** Create the file at `path`, overwriting it if it already exists. */
    +  override def create(path: Path): FSDataOutputStream = {
    +    fs.create(path, true)
    +  }
    +
    +  override def createAtomic(
    +      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
    +    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
    +  }
    +
    +  override def open(path: Path): FSDataInputStream = {
    +    fs.open(path)
    +  }
    +
    +  override def exists(path: Path): Boolean = {
    +    fs.exists(path)
    +  }
    +
    +  /**
    +   * Rename `srcPath` to `dstPath`.
    +   *
    +   * `FileSystem.rename()` may report failure either by throwing or by returning false, and a
    +   * false return is ambiguous. This makes a best-effort (and definitely not atomic) attempt to
    +   * map that to the most appropriate outcome:
    +   *  - destination exists and `overwriteIfPossible` is false: `FileAlreadyExistsException`;
    +   *  - destination exists and `overwriteIfPossible` is true: a warning is logged and the
    +   *    existing file is kept, since overwriting is only best-effort;
    +   *  - destination missing after a failed rename: `IOException`, regardless of
    +   *    `overwriteIfPossible`, so a failed rename is never silently treated as success.
    +   */
    +  override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
    +    if (!overwriteIfPossible && fs.exists(dstPath)) {
    +      throw new FileAlreadyExistsException(
    +        s"Failed to rename $srcPath to $dstPath as destination already exists")
    +    }
    +
    +    val renamed =
    +      try {
    +        fs.rename(srcPath, dstPath)
    +      } catch {
    +        case fe: FileAlreadyExistsException =>
    +          // Some implementations of FileSystem throw FileAlreadyExistsException instead of
    +          // returning false. Propagate it unless overwriting was only best-effort; in that case
    +          // fall through to the failed-rename handling below.
    +          if (!overwriteIfPossible) throw fe
    +          false
    +      }
    +
    +    if (!renamed) {
    +      if (fs.exists(dstPath)) {
    +        if (!overwriteIfPossible) {
    +          throw new FileAlreadyExistsException(s"$dstPath already exists")
    +        }
    +        logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists")
    +      } else {
    +        val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
    +        logWarning(msg)
    +        throw new IOException(msg)
    +      }
    +    }
    +  }
    +
    +  /** Recursively delete `path`; a missing path is ignored. */
    +  override def delete(path: Path): Unit = {
    +    try {
    +      fs.delete(path, true)
    +    } catch {
    +      case e: FileNotFoundException =>
    +        logInfo(s"Failed to delete $path as it does not exist")
    +        // ignore if file has already been deleted
    +    }
    +  }
    +
    +  override def isLocal: Boolean = fs match {
    +    case _: LocalFileSystem | _: RawLocalFileSystem => true
    +    case _ => false
    +  }
    +}
    +
    +
    +/** An implementation of [[CheckpointFileManager]] using Hadoop's 
[[FileContext]] API. */
    +class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: 
Configuration)
    +  extends CheckpointFileManager with RenameHelperMethods with Logging {
    +
    +  import CheckpointFileManager._
    +
    +  private val fc = if (path.toUri.getScheme == null) {
    +    FileContext.getFileContext(hadoopConf)
    +  } else {
    +    FileContext.getFileContext(path.toUri, hadoopConf)
    +  }
    +
    +  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
    +    fc.util.listStatus(path, filter)
    +  }
    +
    +  override def mkdirs(path: Path): Unit = {
    +    fc.mkdir(path, FsPermission.getDirDefault, true)
    +  }
    +
    +  override def create(path: Path): FSDataOutputStream = {
    +    import CreateFlag._
    +    fc.create(path, EnumSet.of(CREATE, OVERWRITE))
    +  }
    +
    +  override def createAtomic(
    +      path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream = {
    --- End diff --
    
    ditto on two lines


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to