Repository: flink
Updated Branches:
  refs/heads/master 663c1e3f7 -> f7af3b016


[FLINK-5788] [docs] Improve documentation of FileSystem and specify the data 
persistence contract.

This closes #3301


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7af3b01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7af3b01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7af3b01

Branch: refs/heads/master
Commit: f7af3b01681592787db16a555b55d6b11d35f869
Parents: af81beb
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 13 14:29:03 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 docs/internals/filesystems.md                   | 138 +++++++++++++++++++
 .../apache/flink/core/fs/FSDataInputStream.java |  11 +-
 .../flink/core/fs/FSDataOutputStream.java       |  81 ++++++++++-
 .../org/apache/flink/core/fs/FileSystem.java    |  98 ++++++++++++-
 4 files changed, 323 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/docs/internals/filesystems.md
----------------------------------------------------------------------
diff --git a/docs/internals/filesystems.md b/docs/internals/filesystems.md
new file mode 100644
index 0000000..427251a
--- /dev/null
+++ b/docs/internals/filesystems.md
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal guarantees 
across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order to 
support a wide
+range of file systems. For example, appending to or mutating existing files is 
not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machine's local file system.
+
+Other file system types are accessed by an implementation that bridges to the 
suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an incomplete 
list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop File 
System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in the 
class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+The `FileSystem` and its `FSDataOutputStream` instances are used to persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two requirements 
are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+     virtual machines, containers, etc. that are able to access the file see 
the data consistently
+     when given the absolute file path. This requirement is similar to the 
*close-to-open*
+     semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific durability/persistence requirements
+     must be met. These are specific to the particular file system. For example, the
+     `LocalFileSystem` does not provide any durability guarantees for crashes of both
+     hardware and operating system, while replicated distributed file systems (like HDFS)
+     typically guarantee durability in the presence of up to *n* concurrent node failures,
+     where *n* is the replication factor.
+
+Updates to the file's parent directory (such that the file shows up when
+listing the directory contents) are not required to be complete for the data 
in the file stream
+to be considered persistent. This relaxation is important for file systems 
where updates to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written 
bytes once the call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered 
persistent once 
+    it has been received and acknowledged by the file system, typically by 
having been replicated
+    to a quorum of machines (*durability requirement*). In addition the 
absolute file path
+    must be visible to all other machines that will potentially access the 
file (*visibility requirement*).
+
+    Whether data has hit non-volatile storage on the storage nodes depends on 
the specific
+    guarantees of the particular file system.
+
+    The metadata updates to the file's parent directory are not required to 
have reached
+    a consistent state. It is permissible that some machines see the file when 
listing the parent
+    directory's contents while others do not, as long as access to the file by 
its absolute path
+    is possible on all nodes.
+
+  - A **local file system** must support the POSIX *close-to-open* semantics.
+    Because the local file system does not have any fault tolerance 
guarantees, no further
+    requirements exist.
+ 
+    The above implies specifically that data may still be in the OS cache when considered
+    persistent from the local file system's perspective. Crashes that cause the OS cache to lose
+    data are considered fatal to the local machine and are not covered by the local file system's
+    guarantees as defined by Flink.
+
+    That means that computed results, checkpoints, and savepoints that are 
written only to
+    the local filesystem are not guaranteed to be recoverable from the local 
machine's failure,
+    making local file systems unsuitable for production setups.
+
+# Updating File Contents
+
+Many file systems either do not support overwriting contents of existing files 
at all, or do not support consistent visibility of the
+updated contents in that case. For that reason, Flink's FileSystem does not 
support appending to existing files, or seeking within
+output streams such that previously written data could be changed within the 
same file.
+
+# Overwriting Files
+
+Overwriting files is in general possible. A file is overwritten by deleting it 
and creating a new file.
+However, certain filesystems cannot make that change synchronously visible to 
all parties that have access to the file.
+For example [Amazon S3](https://aws.amazon.com/documentation/s3/) guarantees 
only *eventual consistency* in
+the visibility of the file replacement: Some machines may see the old file, 
some machines may see the new file.
+
+To avoid these consistency issues, the implementations of failure/recovery 
mechanisms in Flink strictly avoid writing to
+the same file path more than once.
+
+# Thread Safety
+
+Implementations of `FileSystem` must be thread-safe: The same instance of 
`FileSystem` is frequently shared across multiple threads
+in Flink and must be able to concurrently create input/output streams and list 
file metadata.
+
+The `FSDataInputStream` and `FSDataOutputStream` implementations are strictly **not thread-safe**.
+Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees
+about the visibility of operations across threads (many operations do not create memory fences).
+
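
The "write each path at most once" rule from the "Overwriting Files" section can be illustrated with a small sketch. It uses only `java.nio.file` and is not Flink code: the class `WriteOnceSketch`, the method names, and the `snapshot-<id>-attempt-<n>` naming scheme are all hypothetical. The assumption is that a retry gets a fresh path instead of replacing an existing file, so no reader can see a mix of old and new contents.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper, not part of Flink: never writes to the same path twice. */
final class WriteOnceSketch {

    /** Writes data to a path that must not exist yet; an existing file is never replaced. */
    static void writeOnce(Path target, byte[] data) throws IOException {
        // CREATE_NEW makes the open fail with FileAlreadyExistsException if the path exists.
        try (OutputStream out = Files.newOutputStream(
                target, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            out.write(data);
        }
    }

    /** Derives a distinct path per attempt (assumed naming scheme, for illustration only). */
    static Path pathForAttempt(Path dir, long snapshotId, int attempt) {
        return dir.resolve("snapshot-" + snapshotId + "-attempt-" + attempt);
    }

    /** Returns true if the second write to the same path was rejected and the retry path worked. */
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("write-once-sketch");
            byte[] data = "state".getBytes(StandardCharsets.UTF_8);

            Path first = pathForAttempt(dir, 7L, 0);
            writeOnce(first, data);

            boolean secondWriteRejected;
            try {
                writeOnce(first, data);
                secondWriteRejected = false;
            } catch (FileAlreadyExistsException e) {
                secondWriteRejected = true;
            }

            // A retry goes to a new path instead of replacing the old file.
            Path retry = pathForAttempt(dir, 7L, 1);
            writeOnce(retry, data);

            return secondWriteRejected && Files.exists(first) && Files.exists(retry);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On an eventually consistent store the existence check itself may not be reliable, so the distinct path per attempt is what carries the guarantee in this sketch; `CREATE_NEW` only acts as a local safety net.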

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
index 6ce1235..44dbcb1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
@@ -25,6 +25,10 @@ import java.io.InputStream;
 
 /**
  * Interface for a data input stream to a file on a {@link FileSystem}.
+ * 
+ * <p>This extends the {@link java.io.InputStream} with methods for accessing
+ * the stream's {@link #getPos() current position} and
+ * {@link #seek(long) seeking} to a desired position.
  */
 @Public
 public abstract class FSDataInputStream extends InputStream {
@@ -35,15 +39,16 @@ public abstract class FSDataInputStream extends InputStream {
         * 
         * @param desired
         *        the desired offset
-        * @throws IOException
-        *         thrown if an error occurred while seeking inside the input 
stream
+        * @throws IOException Thrown if an error occurred while seeking inside 
the input stream.
         */
        public abstract void seek(long desired) throws IOException;
 
        /**
-        * Get the current position in the input stream.
+        * Gets the current position in the input stream.
         *
         * @return current position in the input stream
+        * @throws IOException Thrown if an I/O error occurred in the 
underlying stream 
+        *                     implementation while accessing the stream's 
position.
         */
        public abstract long getPos() throws IOException;
 }
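
The `seek(long)` / `getPos()` pair documented above can be sketched against a local file with plain JDK classes. This is only an illustration of the contract, not Flink's implementation: `SketchSeekableInputStream` and its `demo()` method are hypothetical names, and the sketch assumes the position is tracked through the `FileChannel` behind a `java.io.FileInputStream`.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;

/** Hypothetical stand-in for a seekable input stream; not a Flink class. */
final class SketchSeekableInputStream extends InputStream {

    private final FileInputStream in;

    SketchSeekableInputStream(File file) throws IOException {
        this.in = new FileInputStream(file);
    }

    @Override
    public int read() throws IOException {
        return in.read();
    }

    /** Moves the read position to the given absolute offset. */
    public void seek(long desired) throws IOException {
        in.getChannel().position(desired);
    }

    /** Returns the offset of the next byte that will be read. */
    public long getPos() throws IOException {
        return in.getChannel().position();
    }

    @Override
    public void close() throws IOException {
        in.close();
    }

    /** Seeks to offset 2, reads one byte, and returns {byte value, position after the read}. */
    static long[] demo() {
        try {
            File file = File.createTempFile("sketch-seek", ".bin");
            file.deleteOnExit();
            Files.write(file.toPath(), new byte[] {10, 20, 30, 40});
            try (SketchSeekableInputStream stream = new SketchSeekableInputStream(file)) {
                stream.seek(2);
                int value = stream.read();
                return new long[] {value, stream.getPos()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```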

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
index 0318d1f..a8df5c1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
@@ -24,14 +24,93 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 /**
- * Interface for a data output stream to a file on a {@link FileSystem}.
+ * An output stream to a file that is created via a {@link FileSystem}.
+ * This class extends the base {@link java.io.OutputStream} with some 
additional important methods.
+ * 
+ * <h2>Data Persistence Guarantees</h2>
+ * 
+ * These streams are used to persistently store data, both for results of 
streaming applications
+ * and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics
+ * of these streams are well defined.
+ * 
+ * <p>Please refer to the class-level docs of {@link FileSystem} for the 
definition of data persistence
+ * via Flink's FileSystem abstraction and the {@code FSDataOutputStream}.
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * Implementations of the {@code FSDataOutputStream} are generally not assumed 
to be thread safe.
+ * Instances of {@code FSDataOutputStream} should not be passed between 
threads, because there
+ * are no guarantees about the order of visibility of operations across 
threads.
+ * 
+ * @see FileSystem
+ * @see FSDataInputStream
  */
 @Public
 public abstract class FSDataOutputStream extends OutputStream {
 
+       /**
+        * Gets the position of the stream (non-negative), defined as the 
number of bytes
+        * from the beginning of the file to the current writing position. The 
position
+        * corresponds to the zero-based index of the next byte that will be 
written.
+        * 
+        * <p>This method must accurately report the current position of the stream.
+        * Various components of the high-availability and recovery logic rely on an accurate position.
+        * 
+        * @return The current position in the stream, defined as the number of 
bytes
+        *         from the beginning of the file to the current writing 
position.
+        * 
+        * @throws IOException Thrown if an I/O error occurs while obtaining 
the position from
+        *                     the stream implementation.
+        */
        public abstract long getPos() throws IOException;
 
+       /**
+        * Flushes the stream, writing any data currently buffered in the stream implementation
+        * to the proper output stream. After this method has been called, the stream implementation
+        * must not hold onto any buffered data any more.
+        * 
+        * <p>A completed flush does not mean that the data is necessarily persistent. Data
+        * persistence can only be assumed after calls to {@link #close()} or {@link #sync()}.
+        * 
+        * <p>Implementation note: This overrides the method defined in {@link 
OutputStream}
+        * as abstract to force implementations of the {@code 
FSDataOutputStream} to implement
+        * this method directly.
+        * 
+        * @throws IOException Thrown if an I/O error occurs while flushing the 
stream.
+        */
        public abstract void flush() throws IOException;
 
+       /**
+        * Flushes the data all the way to the persistent non-volatile storage (for example disks).
+        * The method behaves similarly to the <i>fsync</i> function, forcing all data to
+        * be persistent on the devices.
+        * 
+        * <p>
+        * 
+        * @throws IOException Thrown if an I/O error occurs
+        */
        public abstract void sync() throws IOException;
+
+       /**
+        * Closes the output stream. After this method returns, the 
implementation must guarantee
+        * that all data written to the stream is persistent/visible, as 
defined in the
+        * {@link FileSystem class-level docs}.
+        * 
+        * <p>The above implies that the method must block until persistence 
can be guaranteed.
+        * For example for distributed replicated file systems, the method must 
block until the
+        * replication quorum has been reached. If the calling thread is 
interrupted in the
+        * process, it must fail with an {@code IOException} to indicate that 
persistence cannot
+        * be guaranteed.
+        * 
+        * <p>If this method throws an exception, the data in the stream cannot 
be assumed to be
+        * persistent.
+        * 
+        * <p>Implementation note: This overrides the method defined in {@link 
OutputStream}
+        * as abstract to force implementations of the {@code 
FSDataOutputStream} to implement
+        * this method directly.
+        *         
+        * @throws IOException Thrown, if an error occurred while closing the 
stream or guaranteeing
+        *                     that the data is persistent.
+        */
+       public abstract void close() throws IOException;
 }
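
To make the difference between `flush()`, `sync()`, and `close()` concrete, the following is a minimal sketch of a local-file stream that exposes the same four methods. It is built on `java.io.FileOutputStream` and is an assumption-laden illustration, not Flink's local file system implementation: the names `SketchLocalOutputStream` and `SketchStreamDemo` are hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;

/** Hypothetical local-file stream mirroring getPos/flush/sync/close; not a Flink class. */
final class SketchLocalOutputStream extends OutputStream {

    private final FileOutputStream out;

    SketchLocalOutputStream(File file) throws IOException {
        this.out = new FileOutputStream(file);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    /** Number of bytes written so far, which is the index of the next byte to be written. */
    public long getPos() throws IOException {
        return out.getChannel().position();
    }

    /** Hands buffered bytes to the OS; this alone does not make them durable. */
    @Override
    public void flush() throws IOException {
        out.flush();
    }

    /** Asks the OS to force the written bytes to the storage device, similar to fsync. */
    public void sync() throws IOException {
        out.getFD().sync();
    }

    /** After close() returns, the data is readable by anyone opening the file by its path. */
    @Override
    public void close() throws IOException {
        out.close();
    }
}

/** Hypothetical driver that exercises the sketch above. */
final class SketchStreamDemo {

    /** Writes five bytes and returns the position reported before closing. */
    static long writeAndReport() {
        try {
            File file = File.createTempFile("sketch-stream", ".bin");
            file.deleteOnExit();
            long pos;
            try (SketchLocalOutputStream out = new SketchLocalOutputStream(file)) {
                out.write(new byte[] {1, 2, 3, 4, 5});
                out.flush(); // no persistence guarantee yet
                out.sync();  // forces the data to the device
                pos = out.getPos();
            }
            // After close(), the file size seen through the path matches the reported position.
            if (Files.size(file.toPath()) != pos) {
                throw new IllegalStateException("file size does not match reported position");
            }
            return pos;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch `close()` provides visibility only, which matches the local file system contract described in the `FileSystem` docs; a replicated file system would additionally have to block in `close()` until the data is acknowledged.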

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d8efcbc..c3828fb 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -52,12 +52,108 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Abstract base class of all file systems used by Flink. This class may be 
extended to implement
  * distributed file systems, or local file systems. The abstraction by this 
file system is very simple,
- * and teh set of allowed operations quite limited, to support the common 
denominator of a wide
+ * and the set of available operations quite limited, to support the common 
denominator of a wide
  * range of file systems. For example, appending to or mutating existing files 
is not supported.
  * 
  * <p>Flink implements and supports some file system types directly (for 
example the default
  * machine-local file system). Other file system types are accessed by an 
implementation that bridges
  * to the suite of file systems supported by Hadoop (such as for example HDFS).
+ * 
+ * <h2>Data Persistence</h2>
+ * 
+ * The FileSystem's {@link FSDataOutputStream output streams} are used to 
persistently store data,
+ * both for results of streaming applications and for fault tolerance and 
recovery. It is therefore
+ * crucial that the persistence semantics of these streams are well defined.
+ * 
+ * <h3>Definition of Persistence Guarantees</h3>
+ * 
+ * Data written to an output stream is considered persistent, if two 
requirements are met:
+ * 
+ * <ol>
+ *     <li><b>Visibility Requirement:</b> It must be guaranteed that all other 
processes, machines,
+ *     virtual machines, containers, etc. that are able to access the file see 
the data consistently
+ *     when given the absolute file path. This requirement is similar to the 
<i>close-to-open</i>
+ *     semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).</li>
+ * 
+ *     <li><b>Durability Requirement:</b> The file system's specific 
durability/persistence requirements
+ *     must be met. These are specific to the particular file system. For 
example the
+ *     {@link LocalFileSystem} does not provide any durability guarantees for 
crashes of both
+ *     hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ *     typically guarantee durability in the presence of at most <i>n</i> 
concurrent node failures,
+ *     where <i>n</i> is the replication factor.</li>
+ * </ol>
+ *
+ * <p>Updates to the file's parent directory (such that the file shows up when
+ * listing the directory contents) are not required to be complete for the 
data in the file stream
+ * to be considered persistent. This relaxation is important for file systems 
where updates to
+ * directory contents are only eventually consistent.
+ * 
+ * <p>The {@link FSDataOutputStream} has to guarantee data persistence for the 
written bytes
+ * once the call to {@link FSDataOutputStream#close()} returns.
+ *
+ * <h3>Examples</h3>
+ *
+ * <ul>
+ *     <li>For <b>fault-tolerant distributed file systems</b>, data is 
considered persistent once 
+ *     it has been received and acknowledged by the file system, typically by 
having been replicated
+ *     to a quorum of machines (<i>durability requirement</i>). In addition 
the absolute file path
+ *     must be visible to all other machines that will potentially access the 
file (<i>visibility
+ *     requirement</i>).
+ *
+ *     <p>Whether data has hit non-volatile storage on the storage nodes 
depends on the specific
+ *     guarantees of the particular file system.
+ *
+ *     <p>The metadata updates to the file's parent directory are not required 
to have reached
+ *     a consistent state. It is permissible that some machines see the file 
when listing the parent
+ *     directory's contents while others do not, as long as access to the file 
by its absolute path
+ *     is possible on all nodes.</li>
+ *
+ *     <li>A <b>local file system</b> must support the POSIX 
<i>close-to-open</i> semantics.
+ *     Because the local file system does not have any fault tolerance 
guarantees, no further
+ *     requirements exist.
+ * 
+ *     <p>The above implies specifically that data may still be in the OS cache when considered
+ *     persistent from the local file system's perspective. Crashes that cause the OS cache to lose
+ *     data are considered fatal to the local machine and are not covered by the local file system's
+ *     guarantees as defined by Flink.
+ * 
+ *     <p>That means that computed results, checkpoints, and savepoints that 
are written only to
+ *     the local filesystem are not guaranteed to be recoverable from the 
local machine's failure,
+ *     making local file systems unsuitable for production setups.</li>
+ * </ul>
+ *
+ * <h2>Updating File Contents</h2>
+ *
+ * Many file systems either do not support overwriting contents of existing 
files at all, or do
+ * not support consistent visibility of the updated contents in that case. For 
that reason,
+ * Flink's FileSystem does not support appending to existing files, or seeking 
within output streams
+ * so that previously written data could be overwritten.
+ *
+ * <h2>Overwriting Files</h2>
+ *
+ * Overwriting files is in general possible. A file is overwritten by deleting 
it and creating
+ * a new file. However, certain filesystems cannot make that change 
synchronously visible
+ * to all parties that have access to the file.
+ * For example <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees only
+ * <i>eventual consistency</i> in the visibility of the file replacement: Some machines may see
+ * the old file, some machines may see the new file.
+ *
+ * <p>To avoid these consistency issues, the implementations of 
failure/recovery mechanisms in
+ * Flink strictly avoid writing to the same file path more than once.
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * Implementations of {@code FileSystem} must be thread-safe: The same 
instance of FileSystem
+ * is frequently shared across multiple threads in Flink and must be able to 
concurrently
+ * create input/output streams and list file metadata.
+ * 
+ * <p>The {@link FSDataInputStream} and {@link FSDataOutputStream} implementations are strictly
+ * <b>not thread-safe</b>. Instances of the streams should also not be passed between threads
+ * in between read or write operations, because there are no guarantees about the visibility of
+ * operations across threads (many operations do not create memory fences).
+ * 
+ * @see FSDataInputStream
+ * @see FSDataOutputStream
  */
 @Public
 public abstract class FileSystem {
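
The thread-safety rules in the class-level docs (one shared file system instance, streams confined to a single thread) can be sketched as follows. The example uses the JDK's `java.nio.file.FileSystem` as a stand-in for a shared, thread-safe file system object; it is not Flink's `FileSystem`, and `StreamConfinementSketch` is a hypothetical name. Each task opens, writes, and closes its own stream and never hands it to another thread.

```java
import java.io.OutputStream;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical illustration: shared file system object, thread-confined streams. */
final class StreamConfinementSketch {

    /** Writes four 10-byte files from four tasks and returns the total number of bytes written. */
    static long demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // The file system object is shared by all tasks (JDK stand-in, assumed thread-safe).
            final FileSystem shared = FileSystems.getDefault();
            final Path dir = Files.createTempDirectory("confinement-sketch");

            List<Future<Long>> results = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                final int id = i;
                results.add(pool.submit(() -> {
                    Path file = shared.getPath(dir.toString(), "part-" + id);
                    // The stream is created, used, and closed inside one task only.
                    try (OutputStream out = Files.newOutputStream(file)) {
                        out.write(new byte[10]);
                    }
                    return Files.size(file);
                }));
            }

            long total = 0;
            for (Future<Long> result : results) {
                total += result.get();
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```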
