GitHub user tdas opened a pull request:

    [SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common 
CheckpointFileManager interface

    ## What changes were proposed in this pull request?
    Checkpoint files (offset log files, state store files) in Structured 
Streaming must be written atomically so that no partial files are ever visible 
(partial files would break fault-tolerance guarantees). Currently, there are 3 
locations that each try to do this on their own, and in some cases, incorrectly.
    1. HDFSMetadataLog - This uses a `FileManager` interface that can wrap any 
implementation of the `FileSystem` or `FileContext` APIs. It prefers the 
`FileContext` implementation because the `FileContext` of HDFS has atomic renames.
    1. HDFSBackedStateStore (aka in-memory state store)
      - Writing a file - This uses only `FileSystem` APIs to 
perform a rename. This is incorrect, as rename is not atomic in the HDFS `FileSystem`.
      - Writing a snapshot file - Same as above.
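
    For illustration, the write-to-temp-file-and-rename pattern that these 
locations attempt can be sketched as follows. This is a minimal sketch against 
the local file system using `java.nio`, not the Hadoop `FileSystem` or 
`FileContext` APIs; the names `AtomicWriteSketch` and `writeAtomically` are 
hypothetical and do not appear in Spark.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper for illustration only; not part of Spark or Hadoop.
object AtomicWriteSketch {

  /** Writes `data` to `target` so that readers see either the whole file or no file. */
  def writeAtomically(target: Path, data: Array[Byte]): Unit = {
    // Assumes `target` has a parent directory. The temp file lives in the same
    // directory so that the rename never crosses a file system boundary.
    val temp = Files.createTempFile(target.getParent, "." + target.getFileName, ".tmp")
    try {
      Files.write(temp, data)
      // ATOMIC_MOVE fails with AtomicMoveNotSupportedException instead of
      // silently falling back to a non-atomic copy-and-delete.
      Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE)
    } finally {
      // Cleans up after a failure; a no-op once the move has succeeded.
      Files.deleteIfExists(temp)
    }
  }
}
```

    The pattern is only as safe as the rename itself, which is why a 
non-atomic rename makes the whole write incorrect.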
    #### Current problems:
    1. State Store behavior is incorrect - As noted above, it relies on a 
`FileSystem` rename, which is not atomic in HDFS.
    1. Inflexible - Some file systems provide mechanisms other than 
write-to-temp-file-and-rename for writing atomically and more efficiently. For 
example, with S3 you can write directly to the final file and it will be made 
visible only when the entire file is written and closed correctly. Any failure 
can be made to terminate the writing without making any partial files visible 
in S3. The current code does not abstract this mechanism well enough for it to 
be customized. 
    #### Solution:
    1. Introduce a common interface that all 3 cases above can use to write 
checkpoint files atomically. 
    2. This interface must provide the hooks needed to customize the 
write-and-rename mechanism.
    This PR does that by introducing the interface `CheckpointFileManager` and 
modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use it. 
Similar to the earlier `FileManager`, there are implementations based on the 
`FileSystem` and `FileContext` APIs, and the latter implementation is preferred 
so that it works correctly with HDFS.
    The key method this interface has is `createAtomic(path, overwrite)` which 
returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All 
users of this method need to either call `close()` to successfully write the 
file, or `cancel()` in case of an error.
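
    The `close()`/`cancel()` contract described above can be sketched roughly 
as follows. This is a simplified local-file-system stand-in built on 
`java.nio`, not the code in this PR: `CancellableOutputStream`, 
`LocalCheckpointFiles` and `CheckpointWriteExample` are hypothetical names, and 
only the shape of `createAtomic(path, overwrite)` is taken from the description.

```scala
import java.io.{FilterOutputStream, OutputStream}
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardCopyOption}

// Hypothetical stand-in for the cancellable stream described above; not Spark's class.
abstract class CancellableOutputStream(out: OutputStream) extends FilterOutputStream(out) {
  // Delegate bulk writes directly instead of FilterOutputStream's byte-by-byte default.
  override def write(b: Array[Byte], off: Int, len: Int): Unit = out.write(b, off, len)

  /** Abandons the write; the final file must never become visible. */
  def cancel(): Unit
}

// Hypothetical temp-file-and-rename implementation for a local file system.
object LocalCheckpointFiles {
  def createAtomic(path: Path, overwrite: Boolean): CancellableOutputStream = {
    // Simplification: this check is not race-free against concurrent writers.
    if (!overwrite && Files.exists(path)) throw new FileAlreadyExistsException(path.toString)
    val temp = Files.createTempFile(path.getParent, "." + path.getFileName, ".tmp")
    new CancellableOutputStream(Files.newOutputStream(temp)) {
      private var done = false

      // close() commits: flush the temp file, then rename it to the final path.
      override def close(): Unit = if (!done) {
        done = true
        try {
          super.close()
          Files.move(temp, path, StandardCopyOption.ATOMIC_MOVE)
        } finally Files.deleteIfExists(temp) // no-op after a successful move
      }

      // cancel() aborts: close the temp file and delete it without renaming.
      override def cancel(): Unit = if (!done) {
        done = true
        try super.close() finally Files.deleteIfExists(temp)
      }
    }
  }
}

// Caller-side pattern: close() on success, cancel() on any error.
object CheckpointWriteExample {
  def writeFile(path: Path, bytes: Array[Byte]): Unit = {
    val out = LocalCheckpointFiles.createAtomic(path, overwrite = false)
    try {
      out.write(bytes)
      out.close()
    } catch {
      case e: Throwable =>
        out.cancel()
        throw e
    }
  }
}
```

    Because the commit step is hidden behind `close()`, an implementation for 
a store that publishes a file only on a successful close could skip the temp 
file entirely without changing any caller.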
    ## How was this patch tested?
    New tests in `CheckpointFileManagerSuite` and slightly modified existing 

You can merge this pull request into a Git repository by running:

    $ git pull SPARK-23966

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21048
commit df7b339d73097b8501fe0937f770b8b2ded1b63e
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-04-11T04:21:14Z



