This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ddedeb0357eb89bef445ccb2d084c940f2c1c52
Author: 马越 <mayue.fi...@bytedance.com>
AuthorDate: Tue Aug 8 14:56:17 2023 +0200

    [FLINK-31238] Use IngestDB to speed up Rocksdb rescaling recovery (part 1)
---
 .../flink-statebackend-rocksdb/pom.xml             |  23 +++--
 .../state/EmbeddedRocksDBStateBackend.java         |  20 +++-
 .../state/RocksDBConfigurableOptions.java          |   7 ++
 .../state/RocksDBIncrementalCheckpointUtils.java   |  56 +++++++++++
 .../state/RocksDBKeyedStateBackendBuilder.java     |  10 +-
 .../streaming/state/RocksDBOperationUtils.java     |  41 +++++++-
 .../streaming/state/restore/RocksDBHandle.java     |  31 ++++++
 .../RocksDBIncrementalRestoreOperation.java        | 107 ++++++++++++++++++++-
 .../RocksIncrementalCheckpointRescalingTest.java   |  19 +++-
 9 files changed, 298 insertions(+), 16 deletions(-)
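
Overview: when rescaling from an incremental RocksDB checkpoint, the new code path
clips each downloaded temporary DB to the task's key-group range, exports the
clipped column families, and ingests them into the base DB, instead of copying
key-value pairs one by one. The feature is off by default, gated by a new config
option. A minimal sketch of enabling it programmatically, mirroring the test
change at the end of this patch (the option key comes from this commit; the
surrounding setup is standard Flink API, shown for illustration only):

    // Hypothetical usage sketch -- not part of this patch.
    Configuration config = new Configuration();
    config.setBoolean(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
    EmbeddedRocksDBStateBackend backend =
            new EmbeddedRocksDBStateBackend().configure(config, getClass().getClassLoader());

The same switch should also be reachable via
state.backend.rocksdb.use-ingest-db-restore-mode in the cluster configuration,
since EmbeddedRocksDBStateBackend reads the option in configure() below.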

diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index 98aa501b1aa..592d29df673 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -19,8 +19,8 @@ under the License.
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
        <modelVersion>4.0.0</modelVersion>
 
@@ -61,9 +61,9 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>com.ververica</groupId>
+                       <groupId>io.github.fredia</groupId>
                        <artifactId>frocksdbjni</artifactId>
-                       <version>6.20.3-ververica-2.0</version>
+                       <version>8.6.7-ververica-test-1.0</version>
                </dependency>
 
                <!-- test dependencies -->
@@ -102,11 +102,18 @@ under the License.
                                                </goals>
                                                <configuration>
                                                        <includes>
-                                                               <include>**/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*</include>
-                                                               <include>**/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest*</include>
+                                                               <include>
+                                                                       **/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*
+                                                               </include>
+                                                               <include>
+                                                                       **/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest*
+                                                               </include>
                                                                <!-- Exporting RocksDBStateBackendConfigTest$TestOptionsFactory for pyflink tests -->
-                                                               <include>**/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest*</include>
-                                                               <include>**/org/apache/flink/contrib/streaming/state/benchmark/*</include>
+                                                               <include>
+                                                                       **/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest*
+                                                               </include>
+                                                               <include>**/org/apache/flink/contrib/streaming/state/benchmark/*
+                                                               </include>
                                                                <include>META-INF/LICENSE</include>
                                                                <include>META-INF/NOTICE</include>
                                                        </includes>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index 6d15c53c1a2..5fa5c68acf1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -71,6 +71,7 @@ import java.util.function.Supplier;
 
 import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
+import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
 import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -107,6 +108,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
 
     private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1;
 
+    private static final boolean UNDEFINED_USE_INGEST_DB_RESTORE_MODE = false;
+
     // ------------------------------------------------------------------------
 
     // -- configuration values, set in the application / configuration
@@ -170,6 +173,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
      */
     private double overlapFractionThreshold;
 
+    private boolean useIngestDbRestoreMode;
+
     /** Factory for Write Buffer Manager and Block Cache. */
     private RocksDBMemoryFactory rocksDBMemoryFactory;
     // ------------------------------------------------------------------------
@@ -202,6 +207,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
         this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
         this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT;
         this.priorityQueueConfig = new RocksDBPriorityQueueConfig();
+        this.useIngestDbRestoreMode = UNDEFINED_USE_INGEST_DB_RESTORE_MODE;
     }
 
     /**
@@ -296,6 +302,11 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                 overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1,
                 "Overlap fraction threshold of restoring should be between 0 and 1");
 
+        useIngestDbRestoreMode =
+                original.useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE
+                        ? config.get(USE_INGEST_DB_RESTORE_MODE)
+                        : original.useIngestDbRestoreMode;
+
         this.rocksDBMemoryFactory = original.rocksDBMemoryFactory;
     }
 
@@ -466,7 +477,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                         .setNativeMetricOptions(
                                 resourceContainer.getMemoryWatcherOptions(nativeMetricOptions))
                         .setWriteBatchSize(getWriteBatchSize())
-                        .setOverlapFractionThreshold(getOverlapFractionThreshold());
+                        .setOverlapFractionThreshold(getOverlapFractionThreshold())
+                        .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode());
         return builder.build();
     }
 
@@ -805,6 +817,12 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                 : overlapFractionThreshold;
     }
 
+    boolean getUseIngestDbRestoreMode() {
+        return useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE
+                ? USE_INGEST_DB_RESTORE_MODE.defaultValue()
+                : useIngestDbRestoreMode;
+    }
+
     // ------------------------------------------------------------------------
     //  utilities
     // ------------------------------------------------------------------------
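
Note on the resolution pattern above: the field starts at
UNDEFINED_USE_INGEST_DB_RESTORE_MODE (= false) and getUseIngestDbRestoreMode()
falls back to the ConfigOption default while the field still holds the sentinel.
Since a boolean sentinel can only be false, an explicitly configured false is
indistinguishable from "unset"; this stays harmless only while the option's
default is also false. A condensed sketch of the precedence (illustrative only;
the helper name is hypothetical):

    // Mirrors the logic spread across the constructor, configure() and
    // getUseIngestDbRestoreMode() in the hunks above.
    static boolean resolveUseIngestDb(boolean fieldValue, ReadableConfig config) {
        return fieldValue == UNDEFINED_USE_INGEST_DB_RESTORE_MODE
                ? config.get(USE_INGEST_DB_RESTORE_MODE) // config value or option default
                : fieldValue;                            // explicitly set on the backend
    }
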
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 5795e72909d..255182f3502 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -309,6 +309,13 @@ public class RocksDBConfigurableOptions implements Serializable {
                                     + "has a chance to be an initial handle. "
                                     + "The default value is 0.0, there is always a handle will be selected for initialization. ");
 
+    public static final ConfigOption<Boolean> USE_INGEST_DB_RESTORE_MODE =
+            key("state.backend.rocksdb.use-ingest-db-restore-mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A recovery mode that directly clips and ingests multiple DBs during state recovery. ");
+
     static final ConfigOption<?>[] CANDIDATE_CONFIGS =
             new ConfigOption<?>[] {
                 // configurable DBOptions
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index a835d10c481..4fed7e72e4d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -20,8 +20,12 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 
+import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -29,9 +33,13 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 /** Utils for RocksDB Incremental Checkpoint. */
 public class RocksDBIncrementalCheckpointUtils {
@@ -154,6 +162,54 @@ public class RocksDBIncrementalCheckpointUtils {
         }
     }
 
+    /**
+     * Clip the entries in the CF according to the range [begin_key, end_key). Any entries outside
+     * this range will be completely deleted (including tombstones).
+     *
+     * @param db the target DB to be clipped.
+     * @param columnFamilyHandles the column families to be clipped.
+     * @param beginKeyBytes the begin key bytes
+     * @param endKeyBytes the end key bytes
+     */
+    public static void clipColumnFamilies(
+            RocksDB db,
+            List<ColumnFamilyHandle> columnFamilyHandles,
+            byte[] beginKeyBytes,
+            byte[] endKeyBytes)
+            throws RocksDBException {
+
+        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+            db.clipColumnFamily(columnFamilyHandle, beginKeyBytes, endKeyBytes);
+        }
+    }
+
+    public static Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exportColumnFamilies(
+            RocksDB db,
+            List<ColumnFamilyHandle> columnFamilyHandles,
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            Path exportBasePath)
+            throws RocksDBException {
+
+        HashMap<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> cfMetaInfoAndData =
+                new HashMap<>();
+        try (final Checkpoint checkpoint = Checkpoint.create(db)) {
+            for (int i = 0; i < columnFamilyHandles.size(); i++) {
+                StateMetaInfoSnapshot metaInfoSnapshot = stateMetaInfoSnapshots.get(i);
+
+                RegisteredStateMetaInfoBase stateMetaInfo =
+                        RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(metaInfoSnapshot);
+
+                ExportImportFilesMetaData cfMetaData =
+                        checkpoint.exportColumnFamily(
+                                columnFamilyHandles.get(i),
+                                exportBasePath.resolve(UUID.randomUUID().toString()).toString());
+                cfMetaInfoAndData.put(stateMetaInfo, cfMetaData);
+            }
+        }
+
+        return cfMetaInfoAndData;
+    }
+
     /** check whether the bytes is before prefixBytes in the character order. */
     public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) {
         final int prefixLength = prefixBytes.length;
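
These two utilities are the first half of the ingest-based restore:
clipColumnFamilies deletes everything outside [begin_key, end_key) in a
temporary DB, and exportColumnFamilies snapshots each clipped column family into
its own export directory via a RocksDB Checkpoint. A compact sketch of how they
compose (variable names are hypothetical; the real sequence lives in
RocksDBIncrementalRestoreOperation#restoreWithIngestDbMode further down):

    // Sketch under assumed variables tmpDb, tmpHandles, snapshots, exportDir.
    // 1) Drop all entries outside this task's key-group range.
    RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
            tmpDb, tmpHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
    // 2) Export the clipped column families as SST file sets plus metadata.
    Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exported =
            RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
                    tmpDb, tmpHandles, snapshots, exportDir);
    // 3) The exported metadata is later passed to db.createColumnFamilyWithImport(...)
    //    (see RocksDBOperationUtils below) to ingest the files into the base DB.
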
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 3f67a8b5f85..9bfb8238b85 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -77,6 +77,7 @@ import java.util.UUID;
 import java.util.function.Function;
 
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
+import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -125,6 +126,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 
     private RocksDB injectedTestDB; // for testing
     private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue();
+    private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue();
     private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing
     private RocksDBStateUploader injectRocksDBStateUploader; // for testing
 
@@ -269,6 +271,11 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         return this;
     }
 
+    RocksDBKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) {
+        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
+        return this;
+    }
+
     public static File getInstanceRocksDBPath(File instanceBasePath) {
         return new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
     }
@@ -482,7 +489,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     ttlCompactFiltersManager,
                     writeBatchSize,
                     optionsContainer.getWriteBufferManagerCapacity(),
-                    overlapFractionThreshold);
+                    overlapFractionThreshold,
+                    useIngestDbRestoreMode);
         } else if (priorityQueueConfig.getPriorityQueueStateType()
                 == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) {
             return new RocksDBHeapTimersFullRestoreOperation<>(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 32f64022a16..ccb435dffc7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -31,6 +31,8 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
+import org.rocksdb.ImportColumnFamilyOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -123,7 +125,8 @@ public class RocksDBOperationUtils {
             RocksDB db,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
-            @Nullable Long writeBufferManagerCapacity) {
+            @Nullable Long writeBufferManagerCapacity,
+            List<ExportImportFilesMetaData> cfMetaDataList) {
 
         ColumnFamilyDescriptor columnFamilyDescriptor =
                 createColumnFamilyDescriptor(
@@ -131,8 +134,27 @@ public class RocksDBOperationUtils {
                         columnFamilyOptionsFactory,
                         ttlCompactFiltersManager,
                         writeBufferManagerCapacity);
-        return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(
-                createColumnFamily(columnFamilyDescriptor, db), metaInfoBase);
+
+        ColumnFamilyHandle columnFamilyHandle =
+                cfMetaDataList == null
+                        ? createColumnFamily(columnFamilyDescriptor, db)
+                        : createColumnFamilyWithImport(columnFamilyDescriptor, db, cfMetaDataList);
+        return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase);
+    }
+
+    public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
+            RegisteredStateMetaInfoBase metaInfoBase,
+            RocksDB db,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
+            @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+            @Nullable Long writeBufferManagerCapacity) {
+        return createStateInfo(
+                metaInfoBase,
+                db,
+                columnFamilyOptionsFactory,
+                ttlCompactFiltersManager,
+                writeBufferManagerCapacity,
+                null);
     }
 
     /**
@@ -230,6 +252,19 @@ public class RocksDBOperationUtils {
         }
     }
 
+    private static ColumnFamilyHandle createColumnFamilyWithImport(
+            ColumnFamilyDescriptor columnDescriptor,
+            RocksDB db,
+            List<ExportImportFilesMetaData> metaDataList) {
+        try {
+            return db.createColumnFamilyWithImport(
+                    columnDescriptor, new ImportColumnFamilyOptions(), metaDataList);
+        } catch (RocksDBException e) {
+            IOUtils.closeQuietly(columnDescriptor.getOptions());
+            throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e);
+        }
+    }
+
     public static void addColumnFamilyOptionsToCloseLater(
             List<ColumnFamilyOptions> columnFamilyOptions, ColumnFamilyHandle columnFamilyHandle) {
         try {
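
A design point worth calling out: one logical state may have been exported from
several temporary DBs (one per restored state handle), so the import accepts a
List<ExportImportFilesMetaData> and materializes a single new column family from
all of them. A minimal sketch of the call as used above (names hypothetical,
options left at their defaults as in the patch):

    // Illustrative only. metaDataList aggregates the exports of the SAME
    // logical state from each clipped temporary DB.
    ColumnFamilyHandle imported =
            db.createColumnFamilyWithImport(
                    new ColumnFamilyDescriptor(nameBytes, columnFamilyOptions),
                    new ImportColumnFamilyOptions(),
                    metaDataList);
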
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index 6c1d4625fb6..b5d3c7efc74 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -28,11 +28,13 @@ import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -185,6 +187,35 @@ class RocksDBHandle implements AutoCloseable {
         return registeredStateMetaInfoEntry;
     }
 
+    RocksDbKvStateInfo registerStateColumnFamilyHandleWithImport(
+            RegisteredStateMetaInfoBase stateMetaInfo,
+            List<ExportImportFilesMetaData> cfMetaDataList) {
+
+        RocksDbKvStateInfo registeredStateMetaInfoEntry =
+                kvStateInformation.get(stateMetaInfo.getName());
+        Preconditions.checkState(registeredStateMetaInfoEntry == null);
+
+        registeredStateMetaInfoEntry =
+                RocksDBOperationUtils.createStateInfo(
+                        stateMetaInfo,
+                        db,
+                        columnFamilyOptionsFactory,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity,
+                        cfMetaDataList);
+
+        RocksDBOperationUtils.registerKvStateInformation(
+                kvStateInformation,
+                nativeMetricMonitor,
+                stateMetaInfo.getName(),
+                registeredStateMetaInfoEntry);
+
+        return registeredStateMetaInfoEntry;
+    }
+
     /**
     * This recreates the new working directory of the recovered RocksDB instance and links/copies
      * the contents from a local state.
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index faac6bee9bf..01559ea55d8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,11 +51,13 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -68,10 +70,12 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -107,6 +111,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
     private boolean isKeySerializerCompatibilityChecked;
 
+    private ThrowingConsumer<Collection<KeyedStateHandle>, Exception> rescalingRestoreOperation;
+
     public RocksDBIncrementalRestoreOperation(
             String operatorIdentifier,
             KeyGroupRange keyGroupRange,
@@ -127,7 +133,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
             Long writeBufferManagerCapacity,
-            double overlapFractionThreshold) {
+            double overlapFractionThreshold,
+            boolean useIngestDbRestoreMode) {
         this.rocksHandle =
                 new RocksDBHandle(
                         kvStateInformation,
@@ -153,6 +160,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.keySerializerProvider = keySerializerProvider;
         this.userCodeClassLoader = userCodeClassLoader;
+        this.rescalingRestoreOperation =
+                useIngestDbRestoreMode ? this::restoreWithIngestDbMode : this::restoreWithRescaling;
     }
 
     /** Root method that branches for different implementations of {@link KeyedStateHandle}. */
@@ -170,7 +179,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
                         || !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));
 
         if (isRescaling) {
-            restoreWithRescaling(restoreStateHandles);
+            rescalingRestoreOperation.accept(restoreStateHandles);
         } else {
             restoreWithoutRescaling(theFirstStateHandle);
         }
@@ -405,6 +414,100 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         }
     }
 
+    /**
+     * Recovery from multiple incremental states with rescaling. This method creates a temporary
+     * RocksDB instance per key-groups shard, clips it to the target key-group range, exports its
+     * column families, and ingests the exported data into the base restore instance before the
+     * temporary instance is discarded.
+     */
+    private void restoreWithIngestDbMode(Collection<KeyedStateHandle> restoreStateHandles)
+            throws Exception {
+
+        Preconditions.checkArgument(restoreStateHandles != null && !restoreStateHandles.isEmpty());
+
+        Map<StateHandleID, StateHandleDownloadSpec> allDownloadSpecs =
+                CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size());
+
+        final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
+        final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs");
+        Files.createDirectories(exportCfBasePath);
+
+        // Open base db as Empty DB
+        this.rocksHandle.openDB();
+
+        // Prepare and collect all the download requests to pull remote state to a local directory
+        for (KeyedStateHandle stateHandle : restoreStateHandles) {
+            if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
+                throw unexpectedStateHandleException(
+                        IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass());
+            }
+            StateHandleDownloadSpec downloadRequest =
+                    new StateHandleDownloadSpec(
+                            (IncrementalRemoteKeyedStateHandle) stateHandle,
+                            absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
+            allDownloadSpecs.put(stateHandle.getStateHandleId(), downloadRequest);
+        }
+
+        // Process all state downloads
+        transferRemoteStateToLocalDirectory(allDownloadSpecs.values());
+
+        // Compute the byte prefixes bounding this task's key-group range [start, stop)
+        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+
+        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
+
+        HashMap<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> cfMetaDataToImport =
+                new HashMap<>();
+
+        // Insert all remaining state by creating temporary RocksDB instances
+        for (StateHandleDownloadSpec downloadRequest : allDownloadSpecs.values()) {
+            logger.info(
+                    "Starting to restore from state handle: {} with rescaling.",
+                    downloadRequest.getStateHandle());
+
+            try (RestoredDBInstance tmpRestoreDBInfo =
+                    restoreTempDBInstanceFromDownloadedState(downloadRequest)) {
+
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // Clip the tmp DB to the range [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes)
+                RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
+                        tmpRestoreDBInfo.db,
+                        tmpColumnFamilyHandles,
+                        startKeyGroupPrefixBytes,
+                        stopKeyGroupPrefixBytes);
+
+                // Export all the Column Families
+                Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exportedCFAndMetaData =
+                        RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
+                                tmpRestoreDBInfo.db,
+                                tmpColumnFamilyHandles,
+                                tmpRestoreDBInfo.stateMetaInfoSnapshots,
+                                exportCfBasePath);
+
+                exportedCFAndMetaData.forEach(
+                        (stateMeta, cfMetaData) -> {
+                            if (!cfMetaData.files().isEmpty()) {
+                                cfMetaDataToImport.putIfAbsent(stateMeta, new ArrayList<>());
+                                cfMetaDataToImport.get(stateMeta).add(cfMetaData);
+                            }
+                        });
+            } finally {
+                cleanUpPathQuietly(downloadRequest.getDownloadDestination());
+            }
+        }
+
+        try {
+            cfMetaDataToImport.forEach(this.rocksHandle::registerStateColumnFamilyHandleWithImport);
+        } finally {
+            cleanUpPathQuietly(exportCfBasePath);
+        }
+    }
+
     private void initBaseDBForRescaling(IncrementalLocalKeyedStateHandle stateHandle)
             throws Exception {
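
For intuition on the clip range computed in restoreWithIngestDbMode above:
serializeKeyGroup writes the key-group id big-endian into a fixed-size prefix,
and the stop prefix uses getEndKeyGroup() + 1, so the range is half-open. A
worked example with assumed values (keyGroupPrefixBytes = 2, task owns
key-groups 8..11):

    // Hypothetical values for illustration.
    byte[] start = new byte[2];
    CompositeKeySerializationUtils.serializeKeyGroup(8, start);  // start = {0x00, 0x08}
    byte[] stop = new byte[2];
    CompositeKeySerializationUtils.serializeKeyGroup(12, stop);  // stop = {0x00, 0x0C}
    // clipColumnFamilies(db, handles, start, stop) then keeps exactly groups 8..11.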
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
index bdc85ef2e5d..7e253cd2cc6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
@@ -41,14 +41,26 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 /** Tests to guard rescaling from checkpoint. */
+@RunWith(Parameterized.class)
 public class RocksIncrementalCheckpointRescalingTest extends TestLogger {
 
     @Rule public TemporaryFolder rootFolder = new TemporaryFolder();
 
+    @Parameterized.Parameters(name = "useIngestDbRestoreMode: {0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @Parameterized.Parameter public boolean useIngestDbRestoreMode;
+
     private final int maxParallelism = 10;
 
     private KeySelector<String, String> keySelector = new TestKeySelector();
@@ -419,7 +431,12 @@ public class RocksIncrementalCheckpointRescalingTest extends TestLogger {
     }
 
     private StateBackend getStateBackend() throws Exception {
-        return new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true);
+        RocksDBStateBackend rocksDBStateBackend =
+                new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true);
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(
+                RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode);
+        return rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
     }
 
     /** A simple keyed function for tests. */
