[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-09-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r217407633
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -930,6 +930,13 @@ object SQLConf {
       .intConf
       .createWithDefault(100)
 
+  val STREAMING_CHECKPOINT_FILE_MANAGER_CLASS =
+    buildConf("spark.sql.streaming.checkpointFileManagerClass")
+      .doc("The class used to write checkpoint files atomically. This class must be a subclass " +
+        "of the interface CheckpointFileManager.")
+      .internal()
+      .stringConf
--- End diff --

`createOptional`? cc @zsxwing @tdas 
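
For readers unfamiliar with the suggestion: an entry finished with `createOptional` has no default, so reading it yields an `Option` rather than a plain value. The sketch below only models that idea. `StringEntry`, `readFrom`, and `com.example.MyManager` are hypothetical names made up for illustration and are not Spark's config-builder API.

```scala
// Hypothetical, simplified model of a string config entry.
// It is NOT Spark's ConfigBuilder; it only illustrates "no default => Option".
final case class StringEntry(key: String, default: Option[String]) {
  // Look the key up in the user settings, falling back to the default if any.
  def readFrom(settings: Map[String, String]): Option[String] =
    settings.get(key).orElse(default)
}

object OptionalConfSketch {
  val key = "spark.sql.streaming.checkpointFileManagerClass"

  // An "optional" entry: nothing is assumed when the user sets nothing.
  val optionalEntry: StringEntry = StringEntry(key, default = None)

  def main(args: Array[String]): Unit = {
    // Unset: the caller sees None and can pick a manager by other means.
    assert(optionalEntry.readFrom(Map.empty).isEmpty)
    // Set: the caller sees the configured class name.
    val configured = optionalEntry.readFrom(Map(key -> "com.example.MyManager"))
    assert(configured.contains("com.example.MyManager"))
  }
}
```

With an optional entry the calling code has to handle the unset case explicitly, which fits a setting whose natural default is "choose an implementation automatically".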


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181672758
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException, OutputStream}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary because
+ * different implementations of FileSystem/FileContext may have different combinations of operations
+ * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allows different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename
+   */
+  sealed trait 
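
For context, the `createAtomic` -> `close` or `cancel` usage described in the scaladoc quoted above can be sketched with the write-to-temp-file-and-rename approach. This is a minimal sketch under stated assumptions: it uses plain `java.nio.file` on a local file system rather than Hadoop's FileSystem/FileContext, and `CancellableTempFileStream`, `AtomicWriteSketch`, and the temp-file naming are hypothetical names chosen for illustration, not the PR's implementation.

```scala
import java.io.FilterOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}
import java.util.UUID

// Hypothetical stand-in for a cancellable output stream: bytes go to a temp
// file, and the final path only appears when close() renames the temp file.
final class CancellableTempFileStream(finalPath: Path, tempPath: Path, overwrite: Boolean)
    extends FilterOutputStream(Files.newOutputStream(tempPath)) {

  private var terminated = false

  // Forward bulk writes directly instead of FilterOutputStream's byte-by-byte loop.
  override def write(b: Array[Byte], off: Int, len: Int): Unit = out.write(b, off, len)

  // Commit: flush and close the temp file, then move it into place.
  // Without REPLACE_EXISTING, Files.move throws FileAlreadyExistsException
  // when the target already exists.
  override def close(): Unit = synchronized {
    if (!terminated) {
      terminated = true
      super.close()
      try {
        if (overwrite) Files.move(tempPath, finalPath, StandardCopyOption.REPLACE_EXISTING)
        else Files.move(tempPath, finalPath)
      } finally {
        Files.deleteIfExists(tempPath) // no-op after a successful move
      }
    }
  }

  // Abort: discard the temp file so no partial output becomes visible.
  def cancel(): Unit = synchronized {
    if (!terminated) {
      terminated = true
      try super.close() finally Files.deleteIfExists(tempPath)
    }
  }
}

object AtomicWriteSketch {
  // Assumed naming scheme: a hidden sibling file with a random suffix.
  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableTempFileStream = {
    val tempPath = path.resolveSibling(s".${path.getFileName}.${UUID.randomUUID()}.tmp")
    new CancellableTempFileStream(path, tempPath, overwriteIfPossible)
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("checkpoint-sketch")

    val committed = dir.resolve("0")
    val out = createAtomic(committed, overwriteIfPossible = false)
    out.write("batch 0".getBytes(UTF_8))
    assert(!Files.exists(committed)) // nothing visible before close()
    out.close()
    assert(new String(Files.readAllBytes(committed), UTF_8) == "batch 0")

    val abandoned = dir.resolve("1")
    val failing = createAtomic(abandoned, overwriteIfPossible = false)
    failing.write("partial".getBytes(UTF_8))
    failing.cancel()
    assert(!Files.exists(abandoned)) // cancelled writes leave no file behind
  }
}
```

Whether a rename is actually atomic depends on the underlying file system, which is the reason the scaladoc gives for hiding the mechanism behind an interface.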

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21048





[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181488490
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,347 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181487709
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io._
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
+
+  def createManager(path: Path): CheckpointFileManager
+
+  test("mkdirs, list, createAtomic, open, delete") {
+    withTempPath { p =>
+      val basePath = new Path(p.getAbsolutePath)
+      val fm = createManager(basePath)
+      // Mkdirs
+      val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+      assert(!fm.exists(dir))
+      fm.mkdirs(dir)
+      assert(fm.exists(dir))
+      fm.mkdirs(dir)
--- End diff --

This is a test. I think it is fine. 





[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181486717
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,347 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181485806
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,347 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181485619
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,347 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181483332
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,347 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181480351
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException, OutputStream}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename
+   */
+  sealed trait RenameHelperMethods { 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181357794
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io._
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
+
+  def createManager(path: Path): CheckpointFileManager
+
+  test("mkdirs, list, createAtomic, open, delete") {
+withTempPath { p =>
+  val basePath = new Path(p.getAbsolutePath)
+  val fm = createManager(basePath)
+  // Mkdirs
+  val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+  assert(!fm.exists(dir))
+  fm.mkdirs(dir)
+  assert(fm.exists(dir))
+  fm.mkdirs(dir)
--- End diff --

If that `fm.exists()` call were replaced with an `fm.getFileStatus()` operation, 
as suggested earlier, this assert could become 
`assert(fm.getFileStatus(dir).isDirectory)`
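
A minimal, self-contained sketch of the suggested assertion. The quoted `CheckpointFileManager` diff has no `getFileStatus` method, so `MiniFileManager`, `Status`, and `LocalMiniFileManager` below are hypothetical stand-ins built on `java.nio.file`, not Hadoop or Spark APIs:

```scala
import java.io.FileNotFoundException
import java.nio.file.{Files, Path}

// Hypothetical stand-in for Hadoop's FileStatus; only the field used here.
final case class Status(isDirectory: Boolean)

// Hypothetical, trimmed-down manager; NOT the CheckpointFileManager from the PR.
trait MiniFileManager {
  def mkdirs(path: Path): Unit
  def exists(path: Path): Boolean
  /** Assumed contract: throws FileNotFoundException if the path is missing. */
  def getFileStatus(path: Path): Status
}

object LocalMiniFileManager extends MiniFileManager {
  def mkdirs(path: Path): Unit = {
    // createDirectories also creates parents and is a no-op if the directory exists.
    Files.createDirectories(path)
    ()
  }

  def exists(path: Path): Boolean = Files.exists(path)

  def getFileStatus(path: Path): Status = {
    if (!Files.exists(path)) throw new FileNotFoundException(path.toString)
    Status(Files.isDirectory(path))
  }
}

object GetFileStatusDemo {
  def main(args: Array[String]): Unit = {
    val fm = LocalMiniFileManager
    val base = Files.createTempDirectory("cfm-demo")
    val dir = base.resolve("dir").resolve("subdir").resolve("subsubdir")

    assert(!fm.exists(dir))
    fm.mkdirs(dir)
    // Stronger than exists(): also checks that the path is a directory.
    assert(fm.getFileStatus(dir).isDirectory)
    // Calling mkdirs again on an existing directory should not fail.
    fm.mkdirs(dir)
    assert(fm.getFileStatus(dir).isDirectory)
  }
}
```

Checking `isDirectory` verifies both that the path exists and that `mkdirs` produced a directory rather than a file, which a bare `exists` check cannot distinguish.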





[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181357072
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException, OutputStream}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename
+   */
+  sealed trait 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181355839
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException, OutputStream}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename
+   */
+  sealed trait 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181355640
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException, OutputStream}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename
+   */
+  sealed trait 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181267497
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException, OutputStream}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename
+   */
+  sealed trait RenameHelperMethods { 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r181153863
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename
+   */
+  sealed trait 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180940991
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
--- End diff --

Whoa... I don't know either. My IntelliJ did some weird magic :/
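
For reference, an import selector of the form `{X => _, _}` imports every member of a package except `X`. A small sketch of that mechanism, using `java.util.List` instead of `java.io.FileSystem` so that it runs without Hadoop on the classpath (the object and method names are hypothetical):

```scala
// Import everything from java.util except List, which stays hidden.
import java.util.{List => _, _}

object ImportHidingDemo {
  def sizeViaJavaMap(): Int = {
    // `List` resolves to Scala's immutable List because java.util.List is hidden.
    val xs: List[Int] = List(1, 2, 3)
    // `HashMap` comes from the wildcard part of the java.util import.
    val m = new HashMap[String, Int]()
    m.put("size", xs.size)
    m.get("size")
  }

  def main(args: Array[String]): Unit = {
    println(sizeViaJavaMap())
  }
}
```

In the quoted diff the hidden name is `FileSystem`, presumably so that it cannot clash with the `org.apache.hadoop.fs._` wildcard import a few lines below.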





[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180940932
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary because different
+ * implementations of FileSystem/FileContext may need different combinations of operations
+ * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure), and this abstraction allows different implementations
+ * while keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Whether the default file system this implementation operates on is the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename
+   */
+  sealed trait 
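
The `createAtomic` -> `close` or `cancel` contract described in the quoted Scaladoc can be illustrated with a minimal write-to-temp-file-and-rename sketch. This is not the PR's implementation: `AtomicFileWriter` and `AtomicFileWriterDemo` are hypothetical names, the sketch uses `java.nio.file` on the local file system rather than Hadoop's FileSystem/FileContext, and it does not model the `overwriteIfPossible` flag.

```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper for illustration; not the PR's CancellableFSDataOutputStream.
final class AtomicFileWriter(target: Path) {
  // The temp file lives in the target's directory so the final move stays on one file system.
  private val temp: Path =
    Files.createTempFile(target.toAbsolutePath.getParent, ".tmp-", ".part")
  private val out: OutputStream = Files.newOutputStream(temp)
  private var done = false

  def write(bytes: Array[Byte]): Unit = out.write(bytes)

  /** Publish: finish the temp file, then move it to the target path in one step. */
  def close(): Unit = if (!done) {
    done = true
    out.close()
    Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE)
  }

  /** Discard: close and delete the temp file; the target path is never created. */
  def cancel(): Unit = if (!done) {
    done = true
    try out.close() finally Files.deleteIfExists(temp)
  }
}

object AtomicFileWriterDemo {
  /** Returns (does the cancelled file exist?, contents of the committed file). */
  def run(): (Boolean, String) = {
    val dir = Files.createTempDirectory("ckpt-demo")

    val cancelled = dir.resolve("cancelled")
    val w1 = new AtomicFileWriter(cancelled)
    w1.write("partial".getBytes(StandardCharsets.UTF_8))
    w1.cancel()

    val committed = dir.resolve("committed")
    val w2 = new AtomicFileWriter(committed)
    w2.write("complete".getBytes(StandardCharsets.UTF_8))
    w2.close()

    val contents = new String(Files.readAllBytes(committed), StandardCharsets.UTF_8)
    (Files.exists(cancelled), contents)
  }
}
```

A reader of the target path sees either no file or the complete file, which is the property the checkpoint writers rely on. Whether a rename is actually atomic depends on the underlying file system, which is the reason the quoted doc gives for hiding the strategy behind an interface.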

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180940389
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io._
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
+
+  def createManager(path: Path): CheckpointFileManager
+
+  test("mkdirs, list, createAtomic, open, delete") {
+    withTempPath { p =>
+      val basePath = new Path(p.getAbsolutePath)
+      val fm = createManager(basePath)
+      // Mkdirs
+      val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+      assert(!fm.exists(dir))
+      fm.mkdirs(dir)
+      assert(fm.exists(dir))
+      fm.mkdirs(dir)
+
+      // List
+      val acceptAllFilter = new PathFilter {
+        override def accept(path: Path): Boolean = true
+      }
+      val rejectAllFilter = new PathFilter {
+        override def accept(path: Path): Boolean = false
+      }
+      assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
+      assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+      // Create atomic without overwrite
+      var path = new Path(s"$dir/file")
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = false).close()
+      assert(fm.exists(path))
+      intercept[IOException] {
+        // should throw exception since file exists and overwrite is false
+        fm.createAtomic(path, overwriteIfPossible = false).close()
+      }
+
+      // Create atomic with overwrite if possible
+      path = new Path(s"$dir/file2")
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = true).cancel()
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = true).close()
+      assert(fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
+
+      // Open and delete
+      fm.open(path).close()
+      fm.delete(path)
+      assert(!fm.exists(path))
+      intercept[IOException] {
+        fm.open(path)
+      }
+      fm.delete(path) // should not throw exception
+    }
+  }
+
+  protected def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+}
+
+class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {
+
+  test("CheckpointFileManager.create() should pick up user-specified class from conf") {
+    withSQLConf(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key ->
+        classOf[TestCheckpointFileManager].getName) {
+      val fileManager =
+        CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf)
+      assert(fileManager.isInstanceOf[TestCheckpointFileManager])
+    }
+  }
+
+  test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") {
+    import FakeFileSystem.scheme
+    spark.conf.set(
+      s"fs.$scheme.impl",
+      classOf[FakeFileSystem].getName)
+    withTempDir { temp =>
+      val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
+  

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180936744
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938118
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180935752
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
--- End diff --

whoa what does `FileSystem => _` do?
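
For context, `{Name => _, _}` in a Scala import clause means "import every member of the package except `Name`". The sketch below shows the mechanism with standard-library names only; `ImportHidingDemo` is a hypothetical name. Presumably the same idea applies to the quoted file, where `java.io._` and `org.apache.hadoop.fs._` are both wildcard-imported and each package has a `FileSystem`, but that motive is an assumption rather than something stated in this thread.

```scala
// Import everything from java.util except java.util.List, which is hidden.
import java.util.{List => _, _}
import scala.collection.immutable._

object ImportHidingDemo {
  // `List` resolves to scala.collection.immutable.List. With a plain
  // `import java.util._` this reference would be rejected as ambiguous,
  // because two wildcard imports in the same scope would both provide `List`.
  val xs: List[Int] = List(1, 2, 3)

  // Other java.util members are still visible through the trailing wildcard.
  val id: UUID = UUID.randomUUID()
}
```

The ambiguity is only reported where the clashing name is actually used, so hiding a single name is enough to keep both wildcard imports.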





[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938649
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
 ---
@@ -0,0 +1,188 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938989
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -471,6 +470,41 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("error writing [version].delta cancels the output stream") {
+
+    val hadoopConf = new Configuration()
+    hadoopConf.set(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+      classOf[TestCheckpointFileManager].getName)
+    val remoteDir = Utils.createTempDir().getAbsolutePath
+
+    val provider = newStoreProvider(
+      opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = hadoopConf)
+
+    // Disable failure of output stream and generate versions
+    TestCheckpointFileManager.shouldFailInCreateAtomic = false
+    for (version <- 1 to 10) {
+      val store = provider.getStore(version - 1)
+      put(store, version.toString, version) // update "1" -> 1, "2" -> 2, ...
+      store.commit()
+    }
+    val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
+
+    val store = provider.getStore(10)
+    // Fail commit for next version and verify that reloading resets the files
+    TestCheckpointFileManager.shouldFailInCreateAtomic = true
+    put(store, "11", 11)
+    val e = intercept[IllegalStateException] { quietly { store.commit() } }
+    assert(e.getCause.isInstanceOf[IOException], "Was expecting an IOException to be thrown")
+    TestCheckpointFileManager.shouldFailInCreateAtomic = false
+
+    // Abort commit for next version and verify that reloading resets the files
+    val store2 = provider.getStore(10)
+    put(store2, "11", 11)
+    store2.abort()
+    assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)
--- End diff --

can you verify that it was false before the `abort`?
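
A self-contained sketch of the check being requested: read the flag before the abort and assert it is false, so the assertion after the abort proves that the abort itself caused the cancel. `FakeManager`, `FakeStore` and `AbortCheckDemo` are hypothetical stand-ins, not the PR's `TestCheckpointFileManager` or state store classes.

```scala
// Hypothetical stand-in for the flag kept by the test file manager.
object FakeManager {
  @volatile var cancelCalled = false
  def reset(): Unit = { cancelCalled = false }
}

// Hypothetical store whose abort() cancels its pending output stream.
class FakeStore {
  def abort(): Unit = { FakeManager.cancelCalled = true }
}

object AbortCheckDemo {
  /** Returns the flag value observed (before abort, after abort). */
  def run(): (Boolean, Boolean) = {
    FakeManager.reset()
    val store = new FakeStore
    val before = FakeManager.cancelCalled // nothing has been cancelled yet
    store.abort()
    val after = FakeManager.cancelCalled  // set by abort()
    (before, after)
  }
}
```

Without the "before" check, a flag left set by an earlier step of the test would let the final assertion pass even if `abort()` never cancelled anything.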





[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180936292
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180937241
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Whether the default file system this implementation operates on is the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename
+   */
+  sealed trait 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/21048

[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common 
CheckpointFileManager interface

## What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured 
Streaming must be written atomically so that no partial files are ever visible 
(a partial file would break the fault-tolerance guarantees). Currently, there 
are 3 locations which try to do this individually, and in some cases, 
incorrectly.

1. HDFSOffsetMetadataLog - This uses a FileManager interface to use any 
implementation of the `FileSystem` or `FileContext` APIs. It prefers the 
`FileContext` implementation because the `FileContext` of HDFS has atomic 
renames.
1. HDFSBackedStateStore (aka in-memory state store)
  - Writing a version.delta file - This uses only the FileSystem APIs to 
perform a rename. This is incorrect as rename is not atomic in the HDFS 
FileSystem implementation.
  - Writing a snapshot file - Same as above.

 Current problems:
1. State Store behavior is incorrect - 
1. Inflexible - Some file systems provide mechanisms other than 
write-to-temp-file-and-rename for writing atomically and more efficiently. For 
example, with S3 you can write directly to the final file and it will be made 
visible only when the entire file is written and closed correctly. Any failure 
can be made to terminate the writing without making any partial files visible 
in S3. The current code does not abstract out this mechanism enough that it can 
be customized. 
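
To make the contrast with write-to-temp-file-and-rename concrete, here is a minimal sketch of the direct-write idea described above. It is not code from this PR: `FakeObjectStore` and `DirectWriteStream` are hypothetical names, and the in-memory map only stands in for a store where an object becomes visible once a complete upload is committed.

```scala
import java.io.ByteArrayOutputStream
import java.nio.file.FileAlreadyExistsException
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for an object store (assumption, not a real S3 client).
object FakeObjectStore {
  private val objects = TrieMap.empty[String, Array[Byte]]
  def put(key: String, bytes: Array[Byte]): Unit = objects.put(key, bytes)
  // Returns true only if no object was previously stored under `key`.
  def putIfAbsent(key: String, bytes: Array[Byte]): Boolean =
    objects.putIfAbsent(key, bytes).isEmpty
  def get(key: String): Option[Array[Byte]] = objects.get(key)
}

// Direct write: bytes are buffered, published under the final key on close(),
// and discarded on cancel(). No temporary file and no rename are involved.
class DirectWriteStream(key: String, overwriteIfPossible: Boolean)
  extends ByteArrayOutputStream {
  private var terminated = false

  override def close(): Unit = if (!terminated) {
    terminated = true
    if (overwriteIfPossible) {
      FakeObjectStore.put(key, toByteArray())
    } else if (!FakeObjectStore.putIfAbsent(key, toByteArray())) {
      throw new FileAlreadyExistsException(key)
    }
  }

  def cancel(): Unit = {
    terminated = true
    reset() // drop the buffered bytes; nothing was ever visible
  }
}
```

A caller sees the same lifecycle as with the rename-based approach: write, then either `close()` to publish or `cancel()` to abandon, which is the part the current code cannot express.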

 Solution:

1. Introduce a common interface that all 3 cases above can use to write 
checkpoint files atomically. 
2. This interface must provide the hooks needed to customize the 
write-and-rename mechanism.

This PR does that by introducing the interface `CheckpointFileManager` and 
modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use the interface. 
Similar to earlier `FileManager`, there are implementations based on 
`FileSystem` and `FileContext` APIs, and the latter implementation is preferred 
to make it work correctly with HDFS.

The key method this interface has is `createAtomic(path, overwrite)` which 
returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All 
users of this method need to either call `close()` to successfully write the 
file, or `cancel()` in case of an error.
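
The `createAtomic` -> `close()` or `cancel()` contract can be sketched roughly as follows. This is an illustration built on `java.nio` only, not the Hadoop-based implementation in this PR: `CancellableOutputStream`, `AtomicWriteSketch` and `writeCheckpoint` are hypothetical names, and the existence check before the move is racy and only approximates the no-overwrite guarantee.

```scala
import java.io.{FilterOutputStream, OutputStream}
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardCopyOption}
import java.util.UUID
import scala.util.control.NonFatal

// Hypothetical stand-in for CancellableFSDataOutputStream (java.nio only, no Hadoop).
abstract class CancellableOutputStream(underlying: OutputStream)
  extends FilterOutputStream(underlying) {
  // Delegate bulk writes instead of FilterOutputStream's byte-by-byte default.
  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    underlying.write(b, off, len)
  /** Abandon the write; the final path must never become visible. */
  def cancel(): Unit
}

object AtomicWriteSketch {
  // Write-to-temp-file-and-rename: data goes to a hidden sibling file first.
  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableOutputStream = {
    val temp = path.resolveSibling(s".${path.getFileName}.${UUID.randomUUID()}.tmp")
    new CancellableOutputStream(Files.newOutputStream(temp)) {
      private var terminated = false

      override def close(): Unit = if (!terminated) {
        terminated = true
        try {
          super.close()
          // Assumption: check-then-move is racy; a real implementation needs
          // an atomic no-overwrite rename from the underlying file system.
          if (!overwriteIfPossible && Files.exists(path)) {
            throw new FileAlreadyExistsException(path.toString)
          }
          // Whether ATOMIC_MOVE replaces an existing target is platform-specific.
          Files.move(temp, path, StandardCopyOption.ATOMIC_MOVE)
        } finally {
          Files.deleteIfExists(temp) // no-op after a successful move
        }
      }

      override def cancel(): Unit = if (!terminated) {
        terminated = true
        try super.close() finally Files.deleteIfExists(temp)
      }
    }
  }

  // Caller side: close() on success, cancel() on any error.
  def writeCheckpoint(path: Path, payload: String): Unit = {
    val out = createAtomic(path, overwriteIfPossible = false)
    try {
      out.write(payload.getBytes(StandardCharsets.UTF_8))
      out.close()
    } catch {
      case NonFatal(e) =>
        out.cancel()
        throw e
    }
  }
}
```

In this sketch the final path stays absent until `close()` returns, and a `cancel()` leaves neither the final file nor the temporary file behind.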


## How was this patch tested?
New tests in `CheckpointFileManagerSuite` and slightly modified existing 
tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23966

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21048.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21048


commit df7b339d73097b8501fe0937f770b8b2ded1b63e
Author: Tathagata Das 
Date:   2018-04-11T04:21:14Z

CheckpointFileManager



