Repository: flink Updated Branches: refs/heads/master 663c1e3f7 -> f7af3b016
[FLINK-5788] [docs] Improve documentation of FileSystem and specify the data persistence contract. This closes #3301 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7af3b01 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7af3b01 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7af3b01 Branch: refs/heads/master Commit: f7af3b01681592787db16a555b55d6b11d35f869 Parents: af81beb Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 13 14:29:03 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 14 15:32:43 2017 +0100 ---------------------------------------------------------------------- docs/internals/filesystems.md | 138 +++++++++++++++++++ .../apache/flink/core/fs/FSDataInputStream.java | 11 +- .../flink/core/fs/FSDataOutputStream.java | 81 ++++++++++- .../org/apache/flink/core/fs/FileSystem.java | 98 ++++++++++++- 4 files changed, 323 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/docs/internals/filesystems.md ---------------------------------------------------------------------- diff --git a/docs/internals/filesystems.md b/docs/internals/filesystems.md new file mode 100644 index 0000000..427251a --- /dev/null +++ b/docs/internals/filesystems.md @@ -0,0 +1,138 @@ +--- +title: "File Systems" +nav-parent_id: internals +nav-pos: 10 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* Replaced by the TOC +{:toc} + +Flink has its own file system abstraction via the `org.apache.flink.core.fs.FileSystem` class. +This abstraction provides a common set of operations and minimal guarantees across various types +of file system implementations. + +The `FileSystem`'s set of available operations is quite limited, in order to support a wide +range of file systems. For example, appending to or mutating existing files is not supported. + +File systems are identified by a *file system scheme*, such as `file://`, `hdfs://`, etc. + +# Implementations + +Flink directly implements the file systems with the following schemes: + + - `file`, which represents the machine's local file system. + +Other file system types are accessed by an implementation that bridges to the suite of file systems supported by +[Apache Hadoop](https://hadoop.apache.org/). The following is an incomplete list of examples: + + - `hdfs`: Hadoop Distributed File System + - `s3`, `s3n`, and `s3a`: Amazon S3 file system + - `gcs`: Google Cloud Storage + - `maprfs`: The MapR distributed file system + - ... + +Flink loads Hadoop's file systems transparently if it finds the Hadoop File System classes in the class path and finds a valid +Hadoop configuration. By default, it looks for the Hadoop configuration in the class path. Alternatively, one can specify a +custom location via the configuration entry `fs.hdfs.hadoopconf`.
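The scheme-based routing described above can be pictured with a small, hypothetical sketch that uses only `java.net.URI`. The class `SchemeRouting`, its `Backend` enum, and the rule that scheme-less paths count as local are assumptions made for illustration; this is not Flink's actual loading code, which additionally checks for the Hadoop classes and a valid Hadoop configuration.

```java
import java.net.URI;

/**
 * Hypothetical illustration (not Flink's loader): decides which family of
 * implementation would serve a URI, based only on its scheme.
 */
public class SchemeRouting {

    /** The two implementation families described in the docs. */
    public enum Backend { FLINK_NATIVE, HADOOP_BRIDGE }

    public static Backend backendFor(URI uri) {
        String scheme = uri.getScheme();
        // Assumption of this sketch: a URI without a scheme ("/tmp/data")
        // is treated as a local path.
        if (scheme == null || scheme.equalsIgnoreCase("file")) {
            return Backend.FLINK_NATIVE;
        }
        // Every other scheme (hdfs, s3, s3a, gcs, maprfs, ...) would go through
        // the bridge to Hadoop's file system implementations.
        return Backend.HADOOP_BRIDGE;
    }

    public static void main(String[] args) {
        System.out.println(backendFor(URI.create("file:///tmp/out")));
        System.out.println(backendFor(URI.create("hdfs://namenode:9000/flink/checkpoints")));
        System.out.println(backendFor(URI.create("s3a://my-bucket/results")));
    }
}
```

The host names, bucket, and paths in `main` are placeholders only.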
+ + +# Persistence Guarantees + +The `FileSystem` and its `FSDataOutputStream` instances are used to persistently store data, both for results of applications +and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined. + +## Definition of Persistence Guarantees + +Data written to an output stream is considered persistent if two requirements are met: + + 1. **Visibility Requirement:** It must be guaranteed that all other processes, machines, + virtual machines, containers, etc. that are able to access the file see the data consistently + when given the absolute file path. This requirement is similar to the *close-to-open* + semantics defined by POSIX, but restricted to the file itself (by its absolute path). + + 2. **Durability Requirement:** The file system's specific durability/persistence requirements + must be met. These are specific to the particular file system. For example, the + `LocalFileSystem` does not provide any durability guarantees for crashes of both + hardware and operating system, while replicated distributed file systems (like HDFS) + typically guarantee durability in the presence of at most *n* concurrent node failures, + where *n* is the replication factor. + +Updates to the file's parent directory (such that the file shows up when +listing the directory contents) are not required to be complete for the data in the file stream +to be considered persistent. This relaxation is important for file systems where updates to +directory contents are only eventually consistent. + +The `FSDataOutputStream` has to guarantee data persistence for the written bytes once the call to +`FSDataOutputStream.close()` returns. + +## Examples + + - For **fault-tolerant distributed file systems**, data is considered persistent once + it has been received and acknowledged by the file system, typically by having been replicated + to a quorum of machines (*durability requirement*).
In addition, the absolute file path + must be visible to all other machines that will potentially access the file (*visibility requirement*). + + Whether data has hit non-volatile storage on the storage nodes depends on the specific + guarantees of the particular file system. + + The metadata updates to the file's parent directory are not required to have reached + a consistent state. It is permissible that some machines see the file when listing the parent + directory's contents while others do not, as long as access to the file by its absolute path + is possible on all nodes. + + - A **local file system** must support the POSIX *close-to-open* semantics. + Because the local file system does not have any fault tolerance guarantees, no further + requirements exist. + + The above implies specifically that data may still be in the OS cache when considered + persistent from the local file system's perspective. Crashes that cause the OS cache to lose + data are considered fatal to the local machine and are not covered by the local file system's + guarantees as defined by Flink. + + That means that computed results, checkpoints, and savepoints that are written only to + the local filesystem are not guaranteed to be recoverable from the local machine's failure, + making local file systems unsuitable for production setups. + +# Updating File Contents + +Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the +updated contents in that case. For that reason, Flink's FileSystem does not support appending to existing files, or seeking within +output streams such that previously written data could be changed within the same file. + +# Overwriting Files + +Overwriting files is in general possible. A file is overwritten by deleting it and creating a new file. +However, certain filesystems cannot make that change synchronously visible to all parties that have access to the file.
+For example, [Amazon S3](https://aws.amazon.com/documentation/s3/) guarantees only *eventual consistency* in +the visibility of the file replacement: Some machines may see the old file, some machines may see the new file. + +To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to +the same file path more than once. + +# Thread Safety + +Implementations of `FileSystem` must be thread-safe: The same instance of `FileSystem` is frequently shared across multiple threads +in Flink and must be able to concurrently create input/output streams and list file metadata. + +The `FSDataInputStream` and `FSDataOutputStream` implementations are strictly **not thread-safe**. +Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees +about the visibility of operations across threads (many operations do not create memory fences). + http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java index 6ce1235..44dbcb1 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java @@ -25,6 +25,10 @@ import java.io.InputStream; /** * Interface for a data input stream to a file on a {@link FileSystem}. + * + * <p>This extends the {@link java.io.InputStream} with methods for accessing + * the stream's {@link #getPos() current position} and + * {@link #seek(long) seeking} to a desired position.
*/ @Public public abstract class FSDataInputStream extends InputStream { @@ -35,15 +39,16 @@ public abstract class FSDataInputStream extends InputStream { * * @param desired * the desired offset - * @throws IOException - * thrown if an error occurred while seeking inside the input stream + * @throws IOException Thrown if an error occurred while seeking inside the input stream. */ public abstract void seek(long desired) throws IOException; /** - * Get the current position in the input stream. + * Gets the current position in the input stream. * * @return current position in the input stream + * @throws IOException Thrown if an I/O error occurred in the underlying stream + * implementation while accessing the stream's position. */ public abstract long getPos() throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java index 0318d1f..a8df5c1 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java @@ -24,14 +24,93 @@ import java.io.IOException; import java.io.OutputStream; /** - * Interface for a data output stream to a file on a {@link FileSystem}. + * An output stream to a file that is created via a {@link FileSystem}. + * This class extends the base {@link java.io.OutputStream} with some additional important methods. + * + * <h2>Data Persistence Guarantees</h2> + * + * These streams are used to persistently store data, both for results of streaming applications + * and for fault tolerance and recovery. It is therefore crucial that the persistence semantics + * of these streams are well defined. 
+ * + * <p>Please refer to the class-level docs of {@link FileSystem} for the definition of data persistence + * via Flink's FileSystem abstraction and the {@code FSDataOutputStream}. + * + * <h2>Thread Safety</h2> + * + * Implementations of the {@code FSDataOutputStream} are generally not assumed to be thread safe. + * Instances of {@code FSDataOutputStream} should not be passed between threads, because there + * are no guarantees about the order of visibility of operations across threads. + * + * @see FileSystem + * @see FSDataInputStream */ @Public public abstract class FSDataOutputStream extends OutputStream { + /** + * Gets the position of the stream (non-negative), defined as the number of bytes + * from the beginning of the file to the current writing position. The position + * corresponds to the zero-based index of the next byte that will be written. + * + * <p>This method must accurately report the current position of the stream. + * Various components of the high-availability and recovery logic rely on the accurate reporting of this position. + * + * @return The current position in the stream, defined as the number of bytes + * from the beginning of the file to the current writing position. + * + * @throws IOException Thrown if an I/O error occurs while obtaining the position from + * the stream implementation. + */ public abstract long getPos() throws IOException; + /** + * Flushes the stream, writing any data currently buffered in the stream implementation + * to the proper output stream. After this method has been called, the stream implementation + * must not hold onto any buffered data any more. + * + * <p>A completed flush does not mean that the data is necessarily persistent. Data + * persistence can only be assumed after calls to {@link #close()} or {@link #sync()}. + * + * <p>Implementation note: This overrides the method defined in {@link OutputStream} + * as abstract to force implementations of the {@code FSDataOutputStream} to implement + * this method directly.
+ * + * @throws IOException Thrown if an I/O error occurs while flushing the stream. + */ public abstract void flush() throws IOException; + /** + * Flushes the data all the way to the persistent non-volatile storage (for example disks). + * The method behaves similarly to the <i>fsync</i> function, forcing all data to + * be persistent on the devices. + * + * @throws IOException Thrown if an I/O error occurs. + */ public abstract void sync() throws IOException; + + /** + * Closes the output stream. After this method returns, the implementation must guarantee + * that all data written to the stream is persistent/visible, as defined in the + * {@link FileSystem class-level docs}. + * + * <p>The above implies that the method must block until persistence can be guaranteed. + * For example, for distributed replicated file systems, the method must block until the + * replication quorum has been reached. If the calling thread is interrupted in the + * process, it must fail with an {@code IOException} to indicate that persistence cannot + * be guaranteed. + * + * <p>If this method throws an exception, the data in the stream cannot be assumed to be + * persistent. + * + * <p>Implementation note: This overrides the method defined in {@link OutputStream} + * as abstract to force implementations of the {@code FSDataOutputStream} to implement + * this method directly. + * + * @throws IOException Thrown if an error occurred while closing the stream or guaranteeing + * that the data is persistent.
+ */ + public abstract void close() throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index d8efcbc..c3828fb 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -52,12 +52,108 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Abstract base class of all file systems used by Flink. This class may be extended to implement * distributed file systems, or local file systems. The abstraction by this file system is very simple, - * and teh set of allowed operations quite limited, to support the common denominator of a wide + * and the set of available operations quite limited, to support the common denominator of a wide * range of file systems. For example, appending to or mutating existing files is not supported. * * <p>Flink implements and supports some file system types directly (for example the default * machine-local file system). Other file system types are accessed by an implementation that bridges * to the suite of file systems supported by Hadoop (such as for example HDFS). + * + * <h2>Data Persistence</h2> + * + * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data, + * both for results of streaming applications and for fault tolerance and recovery. It is therefore + * crucial that the persistence semantics of these streams are well defined. 
+ * + * <h3>Definition of Persistence Guarantees</h3> + * + * Data written to an output stream is considered persistent if two requirements are met: + * + * <ol> + * <li><b>Visibility Requirement:</b> It must be guaranteed that all other processes, machines, + * virtual machines, containers, etc. that are able to access the file see the data consistently + * when given the absolute file path. This requirement is similar to the <i>close-to-open</i> + * semantics defined by POSIX, but restricted to the file itself (by its absolute path).</li> + * + * <li><b>Durability Requirement:</b> The file system's specific durability/persistence requirements + * must be met. These are specific to the particular file system. For example, the + * {@link LocalFileSystem} does not provide any durability guarantees for crashes of both + * hardware and operating system, while replicated distributed file systems (like HDFS) + * typically guarantee durability in the presence of at most <i>n</i> concurrent node failures, + * where <i>n</i> is the replication factor.</li> + * </ol> + * + * <p>Updates to the file's parent directory (such that the file shows up when + * listing the directory contents) are not required to be complete for the data in the file stream + * to be considered persistent. This relaxation is important for file systems where updates to + * directory contents are only eventually consistent. + * + * <p>The {@link FSDataOutputStream} has to guarantee data persistence for the written bytes + * once the call to {@link FSDataOutputStream#close()} returns. + * + * <h3>Examples</h3> + * + * <ul> + * <li>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once + * it has been received and acknowledged by the file system, typically by having been replicated + * to a quorum of machines (<i>durability requirement</i>).
In addition, the absolute file path + * must be visible to all other machines that will potentially access the file (<i>visibility + * requirement</i>). + * + * <p>Whether data has hit non-volatile storage on the storage nodes depends on the specific + * guarantees of the particular file system. + * + * <p>The metadata updates to the file's parent directory are not required to have reached + * a consistent state. It is permissible that some machines see the file when listing the parent + * directory's contents while others do not, as long as access to the file by its absolute path + * is possible on all nodes.</li> + * + * <li>A <b>local file system</b> must support the POSIX <i>close-to-open</i> semantics. + * Because the local file system does not have any fault tolerance guarantees, no further + * requirements exist. + * + * <p>The above implies specifically that data may still be in the OS cache when considered + * persistent from the local file system's perspective. Crashes that cause the OS cache to lose + * data are considered fatal to the local machine and are not covered by the local file system's + * guarantees as defined by Flink. + * + * <p>That means that computed results, checkpoints, and savepoints that are written only to + * the local filesystem are not guaranteed to be recoverable from the local machine's failure, + * making local file systems unsuitable for production setups.</li> + * </ul> + * + * <h2>Updating File Contents</h2> + * + * Many file systems either do not support overwriting contents of existing files at all, or do + * not support consistent visibility of the updated contents in that case. For that reason, + * Flink's FileSystem does not support appending to existing files, or seeking within output streams + * so that previously written data could be overwritten. + * + * <h2>Overwriting Files</h2> + * + * Overwriting files is in general possible. A file is overwritten by deleting it and creating + * a new file.
However, certain filesystems cannot make that change synchronously visible + * to all parties that have access to the file. + * For example, <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees only + * <i>eventual consistency</i> in the visibility of the file replacement: Some machines may see + * the old file, some machines may see the new file. + * + * <p>To avoid these consistency issues, the implementations of failure/recovery mechanisms in + * Flink strictly avoid writing to the same file path more than once. + * + * <h2>Thread Safety</h2> + * + * Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem + * is frequently shared across multiple threads in Flink and must be able to concurrently + * create input/output streams and list file metadata. + * + * <p>The {@link FSDataInputStream} and {@link FSDataOutputStream} implementations are strictly + * <b>not thread-safe</b>. Instances of the streams should also not be passed between threads + * in between read or write operations, because there are no guarantees about the visibility of + * operations across threads (many operations do not create memory fences). + * + * @see FSDataInputStream + * @see FSDataOutputStream */ @Public public abstract class FileSystem {
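As a minimal sketch of the output stream contract documented above, the following hypothetical class mirrors the method names of `FSDataOutputStream` (`getPos()`, `flush()`, `sync()`, `close()`) for a local file, using `java.nio.channels.FileChannel`. It deliberately does not extend Flink's class so that it compiles without Flink on the class path. The class name `LocalWriteOnceStream` and the use of `StandardOpenOption.CREATE_NEW` to enforce write-once paths are assumptions for illustration, not how Flink's `LocalFileSystem` is implemented.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical sketch (not Flink code) of a local-file stream following the documented
 * contract: accurate getPos(), flush() keeps no buffered data, sync() forces bytes to
 * the device, and close() is the point after which data counts as persistent.
 * Like the documented streams, an instance is not thread-safe.
 */
public class LocalWriteOnceStream extends OutputStream {

    private final FileChannel channel;
    private long pos; // bytes written so far == zero-based index of the next byte

    public LocalWriteOnceStream(Path path) throws IOException {
        // CREATE_NEW fails if the file already exists, mirroring the rule that
        // recovery code never writes to the same path more than once.
        this.channel = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    public long getPos() {
        return pos;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
        while (buffer.hasRemaining()) {
            channel.write(buffer); // no user-space buffering: bytes go straight to the OS
        }
        pos += len;
    }

    @Override
    public void flush() {
        // Nothing to do: this sketch never holds buffered data.
    }

    public void sync() throws IOException {
        channel.force(true); // similar to fsync: push file content and metadata to the device
    }

    @Override
    public void close() throws IOException {
        // For a local file system, returning from close() gives close-to-open visibility;
        // the data may still sit in the OS cache, as the docs above allow.
        channel.close();
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("fs-contract").resolve("chk-1"); // placeholder name
        try (LocalWriteOnceStream out = new LocalWriteOnceStream(target)) {
            out.write("hello".getBytes(StandardCharsets.UTF_8));
            System.out.println("pos after write: " + out.getPos()); // 5
            out.sync();
        }
        System.out.println("size on disk: " + Files.size(target)); // 5
        try {
            new LocalWriteOnceStream(target).close(); // second attempt on the same path
            System.out.println("unexpected: file was reopened");
        } catch (IOException e) {
            // On the default file system provider this is a FileAlreadyExistsException.
            System.out.println("second create refused: " + e.getClass().getSimpleName());
        }
    }
}
```

Refusing to reopen an existing path is one simple way to honor the "never write the same path twice" rule on a local disk; on eventually consistent stores such as S3, the same effect is usually achieved by choosing a fresh, unique path for every write rather than by relying on an existence check.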