Thanks Yun for highlighting this, it's very helpful! I'll give it a go with that in mind.

We have already begun using checkpoints for recovery. Having these improvements would still be immensely helpful to reduce downtime for savepoint recovery.
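To make this concrete, the rough shape I have in mind for the full-restore path is below, following the SSTable generation and ingestion approach described in my earlier mail (quoted underneath). It is only a sketch against RocksDB's Java API; the class and method names are placeholders rather than anything that exists in Flink today.

    import java.util.Arrays;
    import org.rocksdb.EnvOptions;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    public class SstRestoreSketch {

        // Writes one run of already-sorted entries into an SSTable on disk, then
        // asks RocksDB to ingest the file directly instead of replaying Puts
        // through the memtable.
        static void restoreSortedRun(RocksDB db, Iterable<byte[][]> sortedEntries,
                                     String sstPath) throws RocksDBException {
            try (EnvOptions envOptions = new EnvOptions();
                 Options options = new Options();
                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                writer.open(sstPath);
                for (byte[][] kv : sortedEntries) {
                    // SstFileWriter requires keys in strictly increasing order.
                    writer.put(kv[0], kv[1]);
                }
                writer.finish();
            }

            try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
                // The file is scratch output, so let RocksDB move it rather than copy it.
                ingestOptions.setMoveFiles(true);
                db.ingestExternalFile(Arrays.asList(sstPath), ingestOptions);
            }
        }
    }

The main constraint is the one you pointed out: the entries have to reach SstFileWriter already sorted by their serialized byte order, which the per-column-family, per-key-group layout of the full snapshot should give us.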
On Mon, May 18, 2020 at 3:14 PM Yun Tang <myas...@live.com> wrote:

> Hi Joey
>
> Previously, I also looked at the mechanism for creating on-disk SSTables, as I planned to use RocksDB's benchmark to mock a scenario in Flink. However, I found the main challenge is ensuring that keys are inserted in strictly increasing order: the key order in Java can differ from the byte order in RocksDB. In your case, I think it could be much easier, as RocksFullSnapshotStrategy writes data per column family per key group, which should be in strictly increasing order [1].
>
> FLINK-17288 <https://issues.apache.org/jira/browse/FLINK-17288> could mitigate the performance issue, and your solution could improve performance much further (and could integrate with the state-processor-api story).
>
> On the other hand, as an out-of-the-box option for production in your scenario, how about recovering from a checkpoint? Checkpoints also support rescaling and normal recovery.
>
> [1] https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>
> Best
> Yun Tang
> ------------------------------
> *From:* Joey Pereira <j...@stripe.com>
> *Sent:* Tuesday, May 19, 2020 2:27
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Cc:* Mike Mintz <mikemi...@stripe.com>; Shahid Chohan <cho...@stripe.com>; Aaron Levin <aaronle...@stripe.com>
> *Subject:* RocksDB savepoint recovery performance improvements
>
> Hey,
>
> While running a Flink application with large state, savepoint recovery has been a painful part of operating the application because recovery can take several hours. During some profiling that chohan (cc'd) had done, a red flag stood out: savepoint recovery consisted mostly of RocksDB Get and Put operations.
>
> When Flink is bootstrapping state for RocksDB instances, this is not what I would have expected, as RocksDB supports direct ingestion of its on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.
>
> From what I understood of the current implementation:
>
> * The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.
>
> * RocksDBWriteBatchWrapper uses RocksDB's WriteBatch operator. This provides atomicity within a batch as well as the performance benefits of batching, compared to individual Puts, but it still goes through RocksDB's insert paths, which can involve expensive operations[0].
>
> Instead, by creating SSTable files and instructing RocksDB to ingest them, writes can be batched even further and RocksDB's expensive insert-path work can be avoided. This is commonly used by other systems for restoration or import processes, such as CockroachDB[1], TiDB[2], and Percona[3]. There are some restrictions on generating SSTables, as well as requirements for ingestion to be performant. Unfortunately, none of it is very well documented:
>
> 1. When generating an SSTable, keys need to be inserted in order.
>
> 2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.
>
> To generate SSTables with non-overlapping key-ranges, and to create them with keys in order, the savepoint data would need to be processed in key order. I'm unsure whether that holds for how Flink's savepoints are stored.
>
> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (e.g. for incremental checkpoints or something else). I did notice it iterates over a temporary RocksDB instance and inserts into a "final" instance. These writes could be optimized in a similar manner. Alternatively, it might be possible to ingest the temporary instance's SSTables directly and prune unwanted data with RocksDB's DeleteRange.
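>
> As a rough sketch of that alternative (just to illustrate; ingestExternalFile and deleteRange are the actual RocksDB Java APIs, but the surrounding names are placeholders I made up):
>
>     import java.util.List;
>     import org.rocksdb.IngestExternalFileOptions;
>     import org.rocksdb.RocksDB;
>     import org.rocksdb.RocksDBException;
>
>     class IngestAndPruneSketch {
>         // Ingest the temporary instance's SSTables wholesale, then delete the
>         // key ranges this subtask does not own, instead of copying row by row.
>         static void ingestAndPrune(RocksDB db, List<String> tempSstFiles,
>                                    byte[] ownedStart, byte[] ownedEnd,
>                                    byte[] keySpaceEnd) throws RocksDBException {
>             try (IngestExternalFileOptions opts = new IngestExternalFileOptions()) {
>                 db.ingestExternalFile(tempSstFiles, opts);
>             }
>             // Prune everything outside [ownedStart, ownedEnd).
>             db.deleteRange(new byte[0], ownedStart);
>             db.deleteRange(ownedEnd, keySpaceEnd);
>         }
>     }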
>
> To get started with prototyping, I was thinking of taking a simple approach: making an interface for RocksDBWriteBatchWrapper and swapping in an implementation that does SSTable generation and ingestion. I reckon that will be an easy way to sanity-check whether it works at all.
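>
> Roughly, the interface might look something like the following (a sketch only; the name and shape are placeholders, not anything that exists in Flink today):
>
>     import org.rocksdb.ColumnFamilyHandle;
>     import org.rocksdb.RocksDBException;
>
>     // Hypothetical seam: one implementation delegates to the existing
>     // WriteBatch-based wrapper, while another buffers sorted entries into
>     // SSTables and ingests them when flushed or closed.
>     interface RestoreWriteBuffer extends AutoCloseable {
>
>         void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException;
>
>         // Push any pending batch or SSTable into the database.
>         void flush() throws RocksDBException;
>
>         @Override
>         void close() throws RocksDBException;
>     }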
>
> I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarks for ingestion.
>
> Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.
>
> Joey
>
> ---
>
> [0]: I don't have any specific sources on this. At a high level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable, and doing merging and/or compaction. In general, these add write amplification and overall overhead to bulk insertion. They can largely be avoided by giving RocksDB SSTables, especially ones with non-overlapping key-ranges. "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.
>
> [1]: CockroachDB is a database that uses RocksDB as its on-disk storage. Its implementation consolidates bulk ingestion into an AddSSTable command. Primarily, its choices of options for SSTable generation and ingestion are of interest:
>
> * SSTable generation: https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L929-L966
>
> * SSTable ingestion: https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L842-L917
>
> [2]: TiDB, and TiKV (the KV layer of the database), use RocksDB as their on-disk storage. Their implementation of bulk ingestion is contained within: https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs
>
> Other useful references:
> - https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.
>
> [3]: Percona is a SQL database that supports a RocksDB backend. Its implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815
>
> [4]: Again, there are not many official resources on this. Notable references I found include:
>
> * https://github.com/facebook/rocksdb/issues/2473, which describes at a high level how re-insertions work.
>
> * https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs of ingesting overlapping SSTables, with specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564
>
> * https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks against the LSM.
>
> * https://github.com/facebook/rocksdb/issues/5133, which indicates that re-ingesting the same SSTable (due to restarts in an import process) can cause issues for a particular set of options.
>
> * https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, which indicates that compaction occurs more (or only) when overlapping SSTables are ingested. The thinking here is that non-overlapping SSTable ingestion means very few operations (compaction, merging, etc.) occur afterward, with the right tuning for generation and ingestion.
>
> * https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues with high CPU overhead during ingestion.