GitHub user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21048#discussion_r180936292
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
    @@ -0,0 +1,344 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.io.{FileSystem => _, _}
    +import java.util.{EnumSet, UUID}
    +
    +import scala.util.control.NonFatal
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs._
    +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
    +import org.apache.hadoop.fs.permission.FsPermission
    +
    +import org.apache.spark.internal.Logging
    +import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An interface to abstract out all operations related to streaming 
checkpoints. Most importantly,
    + * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
    + * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
    + * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
    + * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
    + * without overwrite.
    + *
    + * This higher-level interface above the Hadoop FileSystem is necessary 
because
    + * different implementations of FileSystem/FileContext may have different 
combinations of operations
    + * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
    + * direct-write-and-cancel-on-failure) and this abstraction allows 
different implementations while
    + * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
    + */
    +trait CheckpointFileManager {
    +
    +  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
    +
    +  /**
    +   * Create a file and make its contents available atomically after the 
output stream is closed.
    +   *
    +   * @param path                Path to create
    +   * @param overwriteIfPossible If true, then the implementations must do 
a best-effort attempt to
    +   *                            overwrite the file if it already exists. 
It should not throw
    +   *                            any exception if the file exists. However, 
if false, then the
    +   *                            implementation must not overwrite if the 
file already exists and
    +   *                            must throw `FileAlreadyExistsException` in 
that case.
    +   */
    +  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
    +
    +  /** Open a file for reading, or throw exception if it does not exist. */
    +  def open(path: Path): FSDataInputStream
    +
    +  /** List the files in a path that match a filter. */
    +  def list(path: Path, filter: PathFilter): Array[FileStatus]
    +
    +  /** List all the files in a path. */
    +  def list(path: Path): Array[FileStatus] = {
    +    list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
    +  }
    +
    +  /** Make directory at the given path and all its parent directories as 
needed. */
    +  def mkdirs(path: Path): Unit
    +
    +  /** Whether path exists */
    +  def exists(path: Path): Boolean
    +
    +  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
    +  def delete(path: Path): Unit
    +
    +  /** Is the default file system this implementation is operating on the 
local file system. */
    +  def isLocal: Boolean
    +}
    +
    +object CheckpointFileManager extends Logging {
    +
    +  /**
    +   * Additional methods in CheckpointFileManager implementations that 
allow
    +   * [[RenameBasedFSDataOutputStream]] to get atomicity by 
write-to-temp-file-and-rename
    +   */
    +  sealed trait RenameHelperMethods { self => CheckpointFileManager
    +    /** Create a file with overwrite. */
    +    def create(path: Path): FSDataOutputStream
    +
    +    /**
    +     * Rename a file.
    +     *
    +     * @param srcPath             Source path to rename
    +     * @param dstPath             Destination path to rename to
    +     * @param overwriteIfPossible If true, then the implementations must 
do a best-effort attempt to
    +     *                            overwrite the file if it already exists. 
It should not throw
    +     *                            any exception if the file exists. 
However, if false, then the
    +     *                            implementation must not overwrite if the 
file already exists and
    +     *                            must throw `FileAlreadyExistsException` 
in that case.
    +     */
    +    def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: 
Boolean): Unit
    +  }
    +
    +  /**
    +   * An interface to add the cancel() operation to [[FSDataOutputStream]]. 
This is used
    +   * mainly by `CheckpointFileManager.createAtomic` to write a file 
atomically.
    +   *
    +   * @see [[CheckpointFileManager]].
    +   */
    +  abstract class CancellableFSDataOutputStream(protected val 
underlyingStream: OutputStream)
    +    extends FSDataOutputStream(underlyingStream, null) {
    +    /** Cancel the `underlyingStream` and ensure that the output file is 
not generated. */
    +    def cancel(): Unit
    +  }
    +
    +  /**
    +   * An implementation of [[CancellableFSDataOutputStream]] that writes a 
file atomically by writing
    +   * to a temporary file and then renames.
    +   */
    +  sealed class RenameBasedFSDataOutputStream(
    +      fm: CheckpointFileManager with RenameHelperMethods,
    +      finalPath: Path,
    +      tempPath: Path,
    +      overwriteIfPossible: Boolean)
    +    extends CancellableFSDataOutputStream(fm.create(tempPath)) {
    +
    +    def this(fm: CheckpointFileManager with RenameHelperMethods, path: 
Path, overwrite: Boolean) = {
    +      this(fm, path, generateTempPath(path), overwrite)
    +    }
    +
    +    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
    --- End diff --
    
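    nit: use `logDebug` instead of `logInfo` here, since this message is logged every time a `RenameBasedFSDataOutputStream` is constructed.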


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to