GitHub user tdas opened a pull request:
[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common interface
## What changes were proposed in this pull request?
Checkpoint files (offset log files, state store files) in Structured
Streaming must be written atomically so that no partial files are generated
(which would break fault-tolerance guarantees). Currently, there are 3 locations
that try to do this individually, and in some cases, incorrectly.
1. HDFSOffsetMetadataLog - This uses a FileManager interface that can use any
implementation of the `FileSystem` or `FileContext` APIs. It prefers the
`FileContext` implementation, since `FileContext` on HDFS has atomic renames.
1. HDFSBackedStateStore (aka in-memory state store)
   - Writing a version.delta file - This uses only the `FileSystem` API to
     perform the rename. This is incorrect, as rename is not atomic in the
     HDFS `FileSystem` implementation.
   - Writing a snapshot file - Same as above.
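The write-to-temp-file-and-rename pattern at issue can be sketched as follows. This is a minimal illustration in plain Java NIO, not the actual Spark code (which goes through Hadoop's `FileSystem`/`FileContext` APIs); the class and method names here are hypothetical. The key point is that the rename step must be atomic, which `FileContext` on HDFS guarantees and the HDFS `FileSystem` API does not.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicWriteSketch {
    // Write the full contents to a temporary file first, then rename it
    // into place. ATOMIC_MOVE guarantees readers see either no file or
    // the complete file, never a partial one.
    public static void writeAtomically(Path target, String contents) throws Exception {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(temp, contents.getBytes(StandardCharsets.UTF_8));
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```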
#### Current problems:
1. State Store behavior is incorrect - rename through the `FileSystem` API is
not atomic, so a failure can leave partial delta/snapshot files visible.
1. Inflexible - Some file systems provide mechanisms other than
write-to-temp-file-and-rename for writing atomically and more efficiently. For
example, with S3 you can write directly to the final file, and it will be made
visible only when the entire file is written and closed correctly. Any failure
can be made to terminate the writing without making any partial files visible
in S3. The current code does not abstract out this mechanism enough for it to
be customized.
#### Solution:
1. Introduce a common interface that all 3 cases above can use to write
checkpoint files atomically.
2. This interface must provide the hooks necessary to allow customization of
the write-and-rename mechanism.
This PR does that by introducing the interface `CheckpointFileManager` and
modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use the interface.
As with the earlier `FileManager`, there are implementations based on the
`FileSystem` and `FileContext` APIs, and the latter implementation is preferred
so that it works correctly with HDFS.
The key method this interface has is `createAtomic(path, overwrite)` which
returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All
users of this method need to either call `close()` to successfully write the
file, or `cancel()` in case of an error.
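A minimal sketch of that close-or-cancel contract is below. The class name and the commit callback are hypothetical, introduced only to illustrate the contract; the real `CancellableFSDataOutputStream` in the PR wraps Hadoop output streams rather than an in-memory buffer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Consumer;

// Data is buffered and only "committed" (made visible) on close();
// cancel() discards everything, so no partial file is ever exposed.
public class CancellableStreamSketch extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final Consumer<byte[]> commit;
    private boolean cancelled = false;
    private boolean closed = false;

    public CancellableStreamSketch(Consumer<byte[]> commit) {
        this.commit = commit;
    }

    @Override
    public void write(int b) {
        buffer.write(b);
    }

    // Successful close(): the complete contents become visible at once.
    @Override
    public void close() throws IOException {
        if (!cancelled && !closed) {
            closed = true;
            commit.accept(buffer.toByteArray());
        }
    }

    // cancel(): abort the write; nothing is committed.
    public void cancel() {
        cancelled = true;
        buffer.reset();
    }
}
```

Callers follow the pattern described above: write, then `close()` on success, or `cancel()` in a catch block on error before rethrowing.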
## How was this patch tested?
New tests in `CheckpointFileManagerSuite`, and slightly modified existing tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark SPARK-23966
Alternatively you can review and apply these changes as the patch at:
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21048
Author: Tathagata Das <tathagata.das1565@...>