This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ddedeb0357eb89bef445ccb2d084c940f2c1c52 Author: 马越 <mayue.fi...@bytedance.com> AuthorDate: Tue Aug 8 14:56:17 2023 +0200 [FLINK-31238] Use IngestDB to speed up Rocksdb rescaling recovery (part 1) --- .../flink-statebackend-rocksdb/pom.xml | 23 +++-- .../state/EmbeddedRocksDBStateBackend.java | 20 +++- .../state/RocksDBConfigurableOptions.java | 7 ++ .../state/RocksDBIncrementalCheckpointUtils.java | 56 +++++++++++ .../state/RocksDBKeyedStateBackendBuilder.java | 10 +- .../streaming/state/RocksDBOperationUtils.java | 41 +++++++- .../streaming/state/restore/RocksDBHandle.java | 31 ++++++ .../RocksDBIncrementalRestoreOperation.java | 107 ++++++++++++++++++++- .../RocksIncrementalCheckpointRescalingTest.java | 19 +++- 9 files changed, 298 insertions(+), 16 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml index 98aa501b1aa..592d29df673 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml +++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml @@ -19,8 +19,8 @@ under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -61,9 +61,9 @@ under the License. </dependency> <dependency> - <groupId>com.ververica</groupId> + <groupId>io.github.fredia</groupId> <artifactId>frocksdbjni</artifactId> - <version>6.20.3-ververica-2.0</version> + <version>8.6.7-ververica-test-1.0</version> </dependency> <!-- test dependencies --> @@ -102,11 +102,18 @@ under the License. 
</goals> <configuration> <includes> - <include>**/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*</include> - <include>**/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest*</include> + <include> + **/org/apache/flink/contrib/streaming/state/RocksDBTestUtils* + </include> + <include> + **/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest* + </include> <!-- Exporting RocksDBStateBackendConfigTest$TestOptionsFactory for pyflink tests --> - <include>**/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest*</include> - <include>**/org/apache/flink/contrib/streaming/state/benchmark/*</include> + <include> + **/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest* + </include> + <include>**/org/apache/flink/contrib/streaming/state/benchmark/* + </include> <include>META-INF/LICENSE</include> <include>META-INF/NOTICE</include> </includes> diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 6d15c53c1a2..5fa5c68acf1 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -71,6 +71,7 @@ import java.util.function.Supplier; import static org.apache.flink.configuration.description.TextElement.text; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM; import static org.apache.flink.util.Preconditions.checkArgument; @@ -107,6 +108,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1; + private static final boolean UNDEFINED_USE_INGEST_DB_RESTORE_MODE = false; + // ------------------------------------------------------------------------ // -- configuration values, set in the application / configuration @@ -170,6 +173,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke */ private double overlapFractionThreshold; + private boolean useIngestDbRestoreMode; + /** Factory for Write Buffer Manager and Block Cache. */ private RocksDBMemoryFactory rocksDBMemoryFactory; // ------------------------------------------------------------------------ @@ -202,6 +207,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD; this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT; this.priorityQueueConfig = new RocksDBPriorityQueueConfig(); + this.useIngestDbRestoreMode = UNDEFINED_USE_INGEST_DB_RESTORE_MODE; } /** @@ -296,6 +302,11 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1, "Overlap fraction threshold of restoring should be between 0 and 1"); + useIngestDbRestoreMode = + original.useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE + ? 
config.get(USE_INGEST_DB_RESTORE_MODE) + : original.useIngestDbRestoreMode; + this.rocksDBMemoryFactory = original.rocksDBMemoryFactory; } @@ -466,7 +477,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke .setNativeMetricOptions( resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)) .setWriteBatchSize(getWriteBatchSize()) - .setOverlapFractionThreshold(getOverlapFractionThreshold()); + .setOverlapFractionThreshold(getOverlapFractionThreshold()) + .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode()); return builder.build(); } @@ -805,6 +817,12 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke : overlapFractionThreshold; } + boolean getUseIngestDbRestoreMode() { + return useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE + ? USE_INGEST_DB_RESTORE_MODE.defaultValue() + : useIngestDbRestoreMode; + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 5795e72909d..255182f3502 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -309,6 +309,13 @@ public class RocksDBConfigurableOptions implements Serializable { + "has a chance to be an initial handle. " + "The default value is 0.0, there is always a handle will be selected for initialization. 
"); + public static final ConfigOption<Boolean> USE_INGEST_DB_RESTORE_MODE = + key("state.backend.rocksdb.use-ingest-db-restore-mode") + .booleanType() + .defaultValue(false) + .withDescription( + "A recovery mode that directly clips and ingests multiple DBs during state recovery. "); + static final ConfigOption<?>[] CANDIDATE_CONFIGS = new ConfigOption<?>[] { // configurable DBOptions diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index a835d10c481..4fed7e72e4d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -20,8 +20,12 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ExportImportFilesMetaData; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -29,9 +33,13 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.nio.file.Path; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.UUID; /** Utils for RocksDB Incremental Checkpoint. 
*/ public class RocksDBIncrementalCheckpointUtils { @@ -154,6 +162,54 @@ public class RocksDBIncrementalCheckpointUtils { } } + /** + * Clip the entries in the CF according to the range [begin_key, end_key). Any entries outside + * this range will be completely deleted (including tombstones). + * + * @param db the target DB whose column families need to be clipped. + * @param columnFamilyHandles the column families that need to be clipped. + * @param beginKeyBytes the begin key bytes (inclusive) + * @param endKeyBytes the end key bytes (exclusive) + */ + public static void clipColumnFamilies( + RocksDB db, + List<ColumnFamilyHandle> columnFamilyHandles, + byte[] beginKeyBytes, + byte[] endKeyBytes) + throws RocksDBException { + + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + db.clipColumnFamily(columnFamilyHandle, beginKeyBytes, endKeyBytes); + } + } + + public static Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exportColumnFamilies( + RocksDB db, + List<ColumnFamilyHandle> columnFamilyHandles, + List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, + Path exportBasePath) + throws RocksDBException { + + HashMap<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> cfMetaInfoAndData = + new HashMap<>(); + try (final Checkpoint checkpoint = Checkpoint.create(db)) { + for (int i = 0; i < columnFamilyHandles.size(); i++) { + StateMetaInfoSnapshot metaInfoSnapshot = stateMetaInfoSnapshots.get(i); + + RegisteredStateMetaInfoBase stateMetaInfo = + RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(metaInfoSnapshot); + + ExportImportFilesMetaData cfMetaData = + checkpoint.exportColumnFamily( + columnFamilyHandles.get(i), + exportBasePath.resolve(UUID.randomUUID().toString()).toString()); + cfMetaInfoAndData.put(stateMetaInfo, cfMetaData); + } + } + + return cfMetaInfoAndData; + } + /** check whether the bytes is before prefixBytes in the character order.
*/ public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) { final int prefixLength = prefixBytes.length; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 3f67a8b5f85..9bfb8238b85 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -77,6 +77,7 @@ import java.util.UUID; import java.util.function.Function; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -125,6 +126,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken private RocksDB injectedTestDB; // for testing private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue(); + private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue(); private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing private RocksDBStateUploader injectRocksDBStateUploader; // for testing @@ -269,6 +271,11 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken return this; } + RocksDBKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) { + this.useIngestDbRestoreMode = useIngestDbRestoreMode; + return this; + } + public static File getInstanceRocksDBPath(File instanceBasePath) { return new 
File(instanceBasePath, DB_INSTANCE_DIR_STRING); } @@ -482,7 +489,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken ttlCompactFiltersManager, writeBatchSize, optionsContainer.getWriteBufferManagerCapacity(), - overlapFractionThreshold); + overlapFractionThreshold, + useIngestDbRestoreMode); } else if (priorityQueueConfig.getPriorityQueueStateType() == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) { return new RocksDBHeapTimersFullRestoreOperation<>( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 32f64022a16..ccb435dffc7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -31,6 +31,8 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ExportImportFilesMetaData; +import org.rocksdb.ImportColumnFamilyOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -123,7 +125,8 @@ public class RocksDBOperationUtils { RocksDB db, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, - @Nullable Long writeBufferManagerCapacity) { + @Nullable Long writeBufferManagerCapacity, + List<ExportImportFilesMetaData> cfMetaDataList) { ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor( @@ -131,8 +134,27 @@ public class RocksDBOperationUtils { columnFamilyOptionsFactory, ttlCompactFiltersManager, 
writeBufferManagerCapacity); - return new RocksDBKeyedStateBackend.RocksDbKvStateInfo( - createColumnFamily(columnFamilyDescriptor, db), metaInfoBase); + + ColumnFamilyHandle columnFamilyHandle = + cfMetaDataList == null + ? createColumnFamily(columnFamilyDescriptor, db) + : createColumnFamilyWithImport(columnFamilyDescriptor, db, cfMetaDataList); + return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase); + } + + public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo( + RegisteredStateMetaInfoBase metaInfoBase, + RocksDB db, + Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, + @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, + @Nullable Long writeBufferManagerCapacity) { + return createStateInfo( + metaInfoBase, + db, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + writeBufferManagerCapacity, + null); } /** @@ -230,6 +252,19 @@ public class RocksDBOperationUtils { } } + private static ColumnFamilyHandle createColumnFamilyWithImport( + ColumnFamilyDescriptor columnDescriptor, + RocksDB db, + List<ExportImportFilesMetaData> metaDataList) { + try { + return db.createColumnFamilyWithImport( + columnDescriptor, new ImportColumnFamilyOptions(), metaDataList); + } catch (RocksDBException e) { + IOUtils.closeQuietly(columnDescriptor.getOptions()); + throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e); + } + } + public static void addColumnFamilyOptionsToCloseLater( List<ColumnFamilyOptions> columnFamilyOptions, ColumnFamilyHandle columnFamilyHandle) { try { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java index 6c1d4625fb6..b5d3c7efc74 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java @@ -28,11 +28,13 @@ import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ExportImportFilesMetaData; import org.rocksdb.RocksDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -185,6 +187,35 @@ class RocksDBHandle implements AutoCloseable { return registeredStateMetaInfoEntry; } + RocksDbKvStateInfo registerStateColumnFamilyHandleWithImport( + RegisteredStateMetaInfoBase stateMetaInfo, + List<ExportImportFilesMetaData> cfMetaDataList) { + + RocksDbKvStateInfo registeredStateMetaInfoEntry = + kvStateInformation.get(stateMetaInfo.getName()); + if (registeredStateMetaInfoEntry != null) { + System.out.println("test"); + } + Preconditions.checkState(registeredStateMetaInfoEntry == null); + + registeredStateMetaInfoEntry = + RocksDBOperationUtils.createStateInfo( + stateMetaInfo, + db, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + writeBufferManagerCapacity, + cfMetaDataList); + + RocksDBOperationUtils.registerKvStateInformation( + kvStateInformation, + nativeMetricMonitor, + stateMetaInfo.getName(), + registeredStateMetaInfoEntry); + + return registeredStateMetaInfoEntry; + } + /** * This recreates the new working directory of the recovered RocksDB instance and links/copies * the contents from a local state. 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index faac6bee9bf..01559ea55d8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -51,11 +51,13 @@ import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; +import org.apache.flink.util.function.ThrowingConsumer; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ExportImportFilesMetaData; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -68,10 +70,12 @@ import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -107,6 +111,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private boolean isKeySerializerCompatibilityChecked; + private ThrowingConsumer<Collection<KeyedStateHandle>, Exception> rescalingRestoreOperation; + public RocksDBIncrementalRestoreOperation( String operatorIdentifier, KeyGroupRange keyGroupRange, @@ -127,7 +133,8 @@ public class 
RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nonnegative long writeBatchSize, Long writeBufferManagerCapacity, - double overlapFractionThreshold) { + double overlapFractionThreshold, + boolean useIngestDbRestoreMode) { this.rocksHandle = new RocksDBHandle( kvStateInformation, @@ -153,6 +160,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.keySerializerProvider = keySerializerProvider; this.userCodeClassLoader = userCodeClassLoader; + this.rescalingRestoreOperation = + useIngestDbRestoreMode ? this::restoreWithIngestDbMode : this::restoreWithRescaling; } /** Root method that branches for different implementations of {@link KeyedStateHandle}. */ @@ -170,7 +179,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper || !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange)); if (isRescaling) { - restoreWithRescaling(restoreStateHandles); + rescalingRestoreOperation.accept(restoreStateHandles); } else { restoreWithoutRescaling(theFirstStateHandle); } @@ -405,6 +414,100 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper } } + /** + * Recovery from multiple incremental state handles with rescaling, using export/import. For + * each handle, a temporary RocksDB instance is restored and clipped to the target key-group + * range; its column families are then exported and imported into the base DB.
+ */ + private void restoreWithIngestDbMode(Collection<KeyedStateHandle> restoreStateHandles) + throws Exception { + + Preconditions.checkArgument(restoreStateHandles != null && !restoreStateHandles.isEmpty()); + + Map<StateHandleID, StateHandleDownloadSpec> allDownloadSpecs = + CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size()); + + final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); + final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs"); + Files.createDirectories(exportCfBasePath); + + // Open base db as Empty DB + this.rocksHandle.openDB(); + + // Prepare and collect all the download request to pull remote state to a local directory + for (KeyedStateHandle stateHandle : restoreStateHandles) { + if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) { + throw unexpectedStateHandleException( + IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass()); + } + StateHandleDownloadSpec downloadRequest = + new StateHandleDownloadSpec( + (IncrementalRemoteKeyedStateHandle) stateHandle, + absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); + allDownloadSpecs.put(stateHandle.getStateHandleId(), downloadRequest); + } + + // Process all state downloads + transferRemoteStateToLocalDirectory(allDownloadSpecs.values()); + + // Transfer remaining key-groups from temporary instance into base DB + byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); + + byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); + + HashMap<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> cfMetaDataToImport = + new HashMap(); + + // Insert all remaining state through creating temporary RocksDB instances + for (StateHandleDownloadSpec 
downloadRequest : allDownloadSpecs.values()) { + logger.info( + "Starting to restore from state handle: {} with rescaling.", + downloadRequest.getStateHandle()); + + try (RestoredDBInstance tmpRestoreDBInfo = + restoreTempDBInstanceFromDownloadedState(downloadRequest)) { + + List<ColumnFamilyHandle> tmpColumnFamilyHandles = + tmpRestoreDBInfo.columnFamilyHandles; + + // Clip all tmp db to Range [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes) + RocksDBIncrementalCheckpointUtils.clipColumnFamilies( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandles, + startKeyGroupPrefixBytes, + stopKeyGroupPrefixBytes); + + // Export all the Column Families + Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exportedCFAndMetaData = + RocksDBIncrementalCheckpointUtils.exportColumnFamilies( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandles, + tmpRestoreDBInfo.stateMetaInfoSnapshots, + exportCfBasePath); + + exportedCFAndMetaData.forEach( + (stateMeta, cfMetaData) -> { + if (!cfMetaData.files().isEmpty()) { + cfMetaDataToImport.putIfAbsent(stateMeta, new ArrayList<>()); + cfMetaDataToImport.get(stateMeta).add(cfMetaData); + } + }); + } finally { + cleanUpPathQuietly(downloadRequest.getDownloadDestination()); + } + } + + try { + cfMetaDataToImport.forEach(this.rocksHandle::registerStateColumnFamilyHandleWithImport); + } finally { + cleanUpPathQuietly(exportCfBasePath); + } + } + private void initBaseDBForRescaling(IncrementalLocalKeyedStateHandle stateHandle) throws Exception { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java index bdc85ef2e5d..7e253cd2cc6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java +++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java @@ -41,14 +41,26 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; import java.util.List; /** Tests to guard rescaling from checkpoint. */ +@RunWith(Parameterized.class) public class RocksIncrementalCheckpointRescalingTest extends TestLogger { @Rule public TemporaryFolder rootFolder = new TemporaryFolder(); + @Parameterized.Parameters(name = "useIngestDbRestoreMode: {0}") + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); + } + + @Parameterized.Parameter public boolean useIngestDbRestoreMode; + private final int maxParallelism = 10; private KeySelector<String, String> keySelector = new TestKeySelector(); @@ -419,7 +431,12 @@ public class RocksIncrementalCheckpointRescalingTest extends TestLogger { } private StateBackend getStateBackend() throws Exception { - return new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true); + RocksDBStateBackend rocksDBStateBackend = + new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true); + Configuration configuration = new Configuration(); + configuration.setBoolean( + RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode); + return rocksDBStateBackend.configure(configuration, getClass().getClassLoader()); } /** A simple keyed function for tests. */