[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250477821

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java ##

```diff
@@ -36,43 +36,47 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+	public RocksDBStateDownloader(int restoringThreadNum) {
+		super(restoringThreadNum);
+	}
-	static void transferAllStateDataToDirectory(
+	/**
+	 * Transfer all state data to the target directory using specified number of threads.
+	 *
+	 * @param restoreStateHandle Handles used to retrieve the state data.
+	 * @param dest The target directory which the state data will be stored.
+	 * @param closeableRegistry Which all the inputStream/outputStream will be registered and unregistered.
+	 *
+	 * @throws Exception Thrown if can not transfer all the state data.
+	 */
+	public void transferAllStateDataToDirectory(
 		IncrementalKeyedStateHandle restoreStateHandle,
 		Path dest,
-		int restoringThreadNum,
 		CloseableRegistry closeableRegistry) throws Exception {
 		final Map sstFiles = restoreStateHandle.getSharedState();
 		final Map miscFiles = restoreStateHandle.getPrivateState();
-		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
-		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry);
```

Review comment:
@azagrebin We cannot share the `closeableRegistry` because we need to support parallel snapshots. If parallel snapshots share the `closeableRegistry`, the snapshot that completes later fails with `IOException("Cannot register Closeable, registry is already closed. Closing argument.")` when it registers its input/output streams, because the registry has already been closed in `AsyncSnapshotCallable#closeSnapshotIO`. See the [Travis log](https://travis-ci.org/apache/flink/jobs/483728783).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
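To make the failure mode concrete, here is a minimal, self-contained sketch. `SimpleCloseableRegistry` is a hypothetical stand-in and not Flink's `CloseableRegistry`; it only mimics the behaviour quoted from the log (registering into a closed registry closes the argument and throws `IOException`). The demo compares one registry shared by two snapshots against one registry per snapshot.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a closeable registry; NOT Flink's CloseableRegistry.
class SimpleCloseableRegistry implements Closeable {
    private final List<Closeable> registered = new ArrayList<>();
    private boolean closed;

    synchronized void register(Closeable closeable) throws IOException {
        if (closed) {
            // Mirrors the quoted behaviour: close the argument, then fail.
            closeable.close();
            throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
        }
        registered.add(closeable);
    }

    @Override
    public synchronized void close() throws IOException {
        closed = true;
        for (Closeable closeable : registered) {
            closeable.close();
        }
        registered.clear();
    }
}

class SharedRegistryDemo {
    // Returns true if a still-running second snapshot can register a new stream.
    static boolean secondSnapshotCanRegister(boolean shareRegistry) {
        SimpleCloseableRegistry first = new SimpleCloseableRegistry();
        SimpleCloseableRegistry second = shareRegistry ? first : new SimpleCloseableRegistry();
        try {
            // Snapshot 1 registers a stream, finishes, and closes its registry.
            first.register(() -> { });
            first.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            // Snapshot 2 registers a stream after snapshot 1 has finished.
            second.register(() -> { });
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("shared registry:       " + secondSnapshotCanRegister(true));
        System.out.println("per-snapshot registry: " + secondSnapshotCanRegister(false));
    }
}
```

With a shared registry the second registration is rejected; with one registry per snapshot it succeeds, which is the reason given above for not sharing it.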
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250456369

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java ##

```diff
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Help class for uploading RocksDB state.
+ */
+public class RocksDBStateUploader extends RocksDBStateDownloader {
```

Review comment:
My mistake: I picked the wrong superclass from auto-complete.
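The diff on `RocksDBStateDownloader` in this thread shows it extending `RocksDBStateDataTransfer` and passing its thread count to `super(...)`, so the uploader is presumably meant to be a sibling of the downloader rather than a subclass of it. A minimal sketch of that shape, assuming the shared base class owns a fixed-size thread pool; the `*Sketch` classes and the pool handling are hypothetical, not the PR's code.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical base class: owns the executor used by both transfer directions.
abstract class StateDataTransferSketch implements Closeable {
    protected final ExecutorService executorService;

    StateDataTransferSketch(int threadNum) {
        if (threadNum < 1) {
            throw new IllegalArgumentException("threadNum must be >= 1, was " + threadNum);
        }
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    @Override
    public void close() {
        executorService.shutdownNow();
    }
}

// Downloader and uploader are siblings: both extend the base, neither extends the other.
class StateDownloaderSketch extends StateDataTransferSketch {
    StateDownloaderSketch(int restoringThreadNum) {
        super(restoringThreadNum);
    }
}

class StateUploaderSketch extends StateDataTransferSketch {
    StateUploaderSketch(int snapshottingThreadNum) {
        super(snapshottingThreadNum);
    }
}
```

Keeping the executor in the base class means each subclass only adds its own transfer direction, and an uploader never inherits download methods it does not need.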
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r24807

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##

```diff
@@ -218,7 +218,10 @@ private final boolean enableIncrementalCheckpointing;
 	/** Thread number used to download from DFS when restore. */
-	private final int restoringThreadNum;
+	private final int numberOfRestoringThreads;
+
+	/** Thread number used to upload to DFS when snapshot. */
+	private final int numberOfSnapshottingThreads;
```

Review comment:
@StefanRRichter My first thought was only about flexibility. It makes sense to me to merge the two parameters into one, because both are for incremental snapshot/restore. In my company the snapshot procedure is not a bottleneck, so we only enabled multithreaded restoring.
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r24807

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##

```diff
@@ -218,7 +218,10 @@ private final boolean enableIncrementalCheckpointing;
 	/** Thread number used to download from DFS when restore. */
-	private final int restoringThreadNum;
+	private final int numberOfRestoringThreads;
+
+	/** Thread number used to upload to DFS when snapshot. */
+	private final int numberOfSnapshottingThreads;
```

Review comment:
@StefanRRichter In my opinion, there may be scenarios where we use multiple threads to restore from a checkpoint but do not need multiple threads to upload files for incremental checkpoints. In my company we only enabled multithreaded restoring: we use only RocksDB incremental checkpoints to snapshot, and we restore from the latest completed checkpoint (we do not restore jobs from savepoints, and we do not use the HeapStateBackend in production).
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r24807

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##

```diff
@@ -218,7 +218,10 @@ private final boolean enableIncrementalCheckpointing;
 	/** Thread number used to download from DFS when restore. */
-	private final int restoringThreadNum;
+	private final int numberOfRestoringThreads;
+
+	/** Thread number used to upload to DFS when snapshot. */
+	private final int numberOfSnapshottingThreads;
```

Review comment:
@StefanRRichter In my opinion, there may be scenarios where we use multiple threads to restore from a savepoint but do not need multiple threads to upload files for incremental checkpoints. In my company we only enabled multithreaded restoring: we use only RocksDB incremental checkpoints to snapshot, and we restore from the latest completed checkpoint (we do not restore jobs from savepoints, and we do not use the HeapStateBackend in production).
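This discussion weighs one shared thread-count setting against two separate ones. A minimal sketch of both shapes; `TransferThreadSettings` and its factory methods are hypothetical and say nothing about how Flink exposes the option, and only the two field names come from the diff. `separate(8, 1)` models the scenario described: multithreaded restore while snapshot uploads stay single-threaded.

```java
// Hypothetical sketch contrasting the two configuration shapes discussed above.
final class TransferThreadSettings {
    final int numberOfRestoringThreads;
    final int numberOfSnapshottingThreads;

    private TransferThreadSettings(int restoring, int snapshotting) {
        if (restoring < 1 || snapshotting < 1) {
            throw new IllegalArgumentException("thread counts must be >= 1");
        }
        this.numberOfRestoringThreads = restoring;
        this.numberOfSnapshottingThreads = snapshotting;
    }

    // Unified: one value drives both incremental restore and incremental snapshot.
    static TransferThreadSettings unified(int numberOfTransferThreads) {
        return new TransferThreadSettings(numberOfTransferThreads, numberOfTransferThreads);
    }

    // Separate: each direction is tuned on its own.
    static TransferThreadSettings separate(int numberOfRestoringThreads, int numberOfSnapshottingThreads) {
        return new TransferThreadSettings(numberOfRestoringThreads, numberOfSnapshottingThreads);
    }

    public static void main(String[] args) {
        TransferThreadSettings u = unified(4);
        TransferThreadSettings s = separate(8, 1);
        System.out.println(u.numberOfRestoringThreads + "/" + u.numberOfSnapshottingThreads); // 4/4
        System.out.println(s.numberOfRestoringThreads + "/" + s.numberOfSnapshottingThreads); // 8/1
    }
}
```

The unified form is one knob less to document and configure; the separate form keeps the restore-only setup possible without changing snapshot behaviour.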
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246367385

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ##

```diff
@@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws Exception {
 		}
 	}
+	/**
+	 * Test that the exception arose in the thread pool will rethrow to the main thread.
+	 */
+	@Test
+	public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
+		SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states.");
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {
+			@Nullable
+			@Override
+			public StreamStateHandle closeAndGetHandle() {
+				return new ByteStreamStateHandle("testHandle", "testHandle".getBytes());
+			}
+
+			@Override
+			public void close() {
+			}
+
+			@Override
+			public long getPos() {
+				return 0;
+			}
+
+			@Override
+			public void flush() {
+			}
+
+			@Override
+			public void sync() {
+			}
+
+			@Override
+			public void write(int b) throws IOException {
+				throw expectedException;
+			}
+		};
+		CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream;
+
+		File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+		generateRandomFileContent(file.getPath(), 20);
+
+		Map filePaths = new HashMap<>(1);
+		filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
+		try {
+			RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+				filePaths,
+				5,
+				checkpointStreamFactory,
+				new CloseableRegistry(),
+				new HashMap<>());
+			fail();
+		} catch (Exception e) {
+			assertEquals(expectedException, e);
+		}
+	}
+
+	/**
+	 * Test that upload files with multi-thread correctly.
+	 */
+	@Test
+	public void testMultiThreadUploadCorrectly() throws Exception {
+
+		File checkpointPrivateFolder = temporaryFolder.newFolder("private");
+		Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath());
+
+		File checkpointSharedFolder = temporaryFolder.newFolder("shared");
+		Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath());
+
+		FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
+		int fileStateSizeThreshold = 1024;
+		FsCheckpointStreamFactory checkpointStreamFactory =
+			new FsCheckpointStreamFactory(fileSystem, checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+		String localFolder = "local";
+		temporaryFolder.newFolder(localFolder);
+
+		int sstFileCount = 6;
+		Map sstFilePaths = new HashMap<>(sstFileCount);
+		generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold, sstFilePaths);
+
+		int miscFileCount = 3;
+		Map miscFilePaths = new HashMap<>(miscFileCount);
+		ThreadLocalRandom random = ThreadLocalRandom.current();
+		File currentFile = temporaryFolder.newFile(String.format("%s/CURRENT", localFolder));
+		generateRandomFileContent(currentFile.getPath(), random.nextInt(fileStateSizeThreshold) + 1);
+		miscFilePaths.put(new StateHandleID("CURRENT"), Path.fromLocalFile(currentFile));
+
+		File manifest = temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder));
+		generateRandomFileContent(manifest.getPath(), random.nextInt(fileStateSizeThreshold) + 1);
+		miscFilePaths.put(new StateHandleID("MANIFEST"), Path.fromLocalFile(manifest));
+
+		File options = temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder));
+
```
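`testMultiThreadUploadThreadPoolExceptionRethrow` expects an exception thrown inside a pool thread to reach the caller unchanged (`assertEquals(expectedException, e)`), not wrapped in a `CompletionException` or `ExecutionException`. A minimal sketch of one way to get that behaviour with plain JDK 8 APIs; `ParallelUploadSketch` and `IOTask` are hypothetical names, and this is not the PR's implementation.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelUploadSketch {

    // Hypothetical task that may throw a checked exception, e.g. "upload one file".
    interface IOTask<T> {
        T call() throws IOException;
    }

    // Runs all tasks on numThreads workers and returns their results in task order.
    // If a task fails, its original exception is rethrown on the calling thread.
    static <T> List<T> runAll(List<IOTask<T>> tasks, int numThreads) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<CompletableFuture<T>> futures = new ArrayList<>();
            for (IOTask<T> task : tasks) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        return task.call();
                    } catch (IOException e) {
                        // Suppliers cannot throw checked exceptions: wrap here, unwrap below.
                        throw new CompletionException(e);
                    }
                }, pool));
            }
            // Wait for every task; join() reports a failure as CompletionException.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            List<T> results = new ArrayList<>(futures.size());
            for (CompletableFuture<T> future : futures) {
                results.add(future.join());
            }
            return results;
        } catch (CompletionException e) {
            // Unwrap so the caller sees exactly the exception the task threw.
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Unwrapping at the boundary keeps the public method's contract simple: callers only ever see the task's own `IOException` (or unchecked exception), which is what an identity-based assertion like the one in the test requires.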
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246358163

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ##

```diff
@@ -61,6 +69,88 @@ static void transferAllStateDataToDirectory(
 		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
 	}
+	public static void uploadFilesToCheckpointFs(
+		@Nonnull Map files,
+		int numberOfSnapshottingThreads,
+		CheckpointStreamFactory checkpointStreamFactory,
+		CloseableRegistry closeableRegistry,
+		Map hanldes) throws Exception {
```

Review comment:
@azagrebin The implementation here returns `handles` implicitly (by filling a map passed in by the caller) because I thought the caller of `uploadFilesToCheckpointFs` might know the size of `handles` and could initialize the map with `new HashMap(size)`. I agree that the map will not be very large here because we are in incremental mode, so I'll change it to return the map explicitly. In any case, I'll add Javadoc for these two public functions.
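The two styles discussed above can be compared side by side. A minimal sketch, assuming `String` keys and values stand in for `StateHandleID`, `Path` and `StreamStateHandle`; `HandleReturnStyles` and `fakeUpload` are hypothetical. Note that in the explicit style the method already knows `files.size()`, so it can still pre-size the result map itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: String keys/values stand in for StateHandleID, Path and StreamStateHandle.
final class HandleReturnStyles {

    // Implicit style: the caller allocates (and may pre-size) the map; the method fills it.
    static void uploadInto(Map<String, String> files, Map<String, String> handles) {
        for (Map.Entry<String, String> entry : files.entrySet()) {
            handles.put(entry.getKey(), fakeUpload(entry.getValue()));
        }
    }

    // Explicit style: the method allocates and returns the result; the initial
    // capacity comes from files.size(), which is known here anyway.
    static Map<String, String> upload(Map<String, String> files) {
        Map<String, String> handles = new HashMap<>(files.size());
        for (Map.Entry<String, String> entry : files.entrySet()) {
            handles.put(entry.getKey(), fakeUpload(entry.getValue()));
        }
        return handles;
    }

    // Hypothetical stand-in for uploading one file and receiving a handle.
    private static String fakeUpload(String path) {
        return "handle:" + path;
    }
}
```

The explicit version makes the data flow visible in the signature and removes the risk of a caller passing a shared or non-empty map.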
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246350022

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ##

```diff
@@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws Exception {
 		}
 	}
+	/**
+	 * Test that the exception arose in the thread pool will rethrow to the main thread.
+	 */
+	@Test
+	public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
+		SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states.");
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {
+			@Nullable
+			@Override
+			public StreamStateHandle closeAndGetHandle() {
+				return new ByteStreamStateHandle("testHandle", "testHandle".getBytes());
+			}
+
+			@Override
+			public void close() {
+			}
+
+			@Override
+			public long getPos() {
+				return 0;
+			}
+
+			@Override
+			public void flush() {
+			}
+
+			@Override
+			public void sync() {
+			}
+
+			@Override
+			public void write(int b) throws IOException {
+				throw expectedException;
+			}
+		};
+		CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream;
+
+		File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+		generateRandomFileContent(file.getPath(), 20);
+
+		Map filePaths = new HashMap<>(1);
+		filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
+		try {
+			RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+				filePaths,
+				5,
+				checkpointStreamFactory,
+				new CloseableRegistry(),
+				new HashMap<>());
+			fail();
+		} catch (Exception e) {
+			assertEquals(expectedException, e);
+		}
+	}
+
+	/**
+	 * Test that upload files with multi-thread correctly.
+	 */
+	@Test
+	public void testMultiThreadUploadCorrectly() throws Exception {
+
+		File checkpointPrivateFolder = temporaryFolder.newFolder("private");
+		Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath());
+
+		File checkpointSharedFolder = temporaryFolder.newFolder("shared");
+		Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath());
+
+		FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
+		int fileStateSizeThreshold = 1024;
+		FsCheckpointStreamFactory checkpointStreamFactory =
+			new FsCheckpointStreamFactory(fileSystem, checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+		String localFolder = "local";
+		temporaryFolder.newFolder(localFolder);
+
+		int sstFileCount = 6;
+		Map sstFilePaths = new HashMap<>(sstFileCount);
+		generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold, sstFilePaths);
+
+		int miscFileCount = 3;
+		Map miscFilePaths = new HashMap<>(miscFileCount);
+		ThreadLocalRandom random = ThreadLocalRandom.current();
+		File currentFile = temporaryFolder.newFile(String.format("%s/CURRENT", localFolder));
+		generateRandomFileContent(currentFile.getPath(), random.nextInt(fileStateSizeThreshold) + 1);
+		miscFilePaths.put(new StateHandleID("CURRENT"), Path.fromLocalFile(currentFile));
+
+		File manifest = temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder));
+		generateRandomFileContent(manifest.getPath(), random.nextInt(fileStateSizeThreshold) + 1);
+		miscFilePaths.put(new StateHandleID("MANIFEST"), Path.fromLocalFile(manifest));
+
+		File options = temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder));
+
```
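`testMultiThreadUploadCorrectly` generates random local files, uploads them with several threads, and presumably compares the uploaded content with the local files in the part of the test not shown here. A simplified, in-memory sketch of that kind of check, assuming an "upload" is just a byte copy; `UploadCorrectnessSketch` and its methods are hypothetical, and `randomFiles` is only similar in spirit to the test's `generateRandomFileContent`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

final class UploadCorrectnessSketch {

    // "Uploads" every local file (id -> bytes) with numThreads workers; returns id -> uploaded copy.
    static Map<String, byte[]> uploadAll(Map<String, byte[]> localFiles, int numThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<String> ids = new ArrayList<>(localFiles.keySet());
            List<Callable<byte[]>> uploads = new ArrayList<>();
            for (String id : ids) {
                byte[] content = localFiles.get(id);
                uploads.add(() -> Arrays.copyOf(content, content.length));
            }
            // invokeAll returns futures in the same order as the submitted tasks.
            List<Future<byte[]>> futures = pool.invokeAll(uploads);
            Map<String, byte[]> uploaded = new HashMap<>();
            for (int i = 0; i < ids.size(); i++) {
                uploaded.put(ids.get(i), futures.get(i).get());
            }
            return uploaded;
        } finally {
            pool.shutdownNow();
        }
    }

    // True if both maps have the same ids and byte-identical content for each id.
    static boolean sameContent(Map<String, byte[]> expected, Map<String, byte[]> actual) {
        if (!expected.keySet().equals(actual.keySet())) {
            return false;
        }
        for (Map.Entry<String, byte[]> entry : expected.entrySet()) {
            if (!Arrays.equals(entry.getValue(), actual.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Random files with sizes in [1, maxSize].
    static Map<String, byte[]> randomFiles(int count, int maxSize) {
        Map<String, byte[]> files = new HashMap<>();
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < count; i++) {
            byte[] content = new byte[random.nextInt(maxSize) + 1];
            random.nextBytes(content);
            files.put(i + ".sst", content);
        }
        return files;
    }
}
```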
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244333053

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ##

```diff
@@ -61,6 +74,144 @@ static void transferAllStateDataToDirectory(
 		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
 	}
+	public static void uploadStateFiles(
+		CheckpointStreamFactory checkpointStreamFactory,
+		SnapshotDirectory localBackupDirectory,
+		Set baseSstFiles,
+		int uploadingThreadNum,
+		CloseableRegistry snapshotCloseableRegistry,
+		@Nonnull ConcurrentHashMap sstFiles,
+		@Nonnull ConcurrentHashMap miscFiles) throws Exception {
+
+		Preconditions.checkState(localBackupDirectory.exists());
+
+		FileStatus[] fileStatuses = localBackupDirectory.listStatus();
+		if (fileStatuses != null) {
+			ExecutorService executorService = createExecutorService(uploadingThreadNum);
+
+			try {
+				List runnables = createUploadRunnables(
```

Review comment:
@azagrebin Thank you for your reply; I'll update the code according to your suggestions.
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244318378

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ##

```diff
@@ -61,6 +74,144 @@ static void transferAllStateDataToDirectory(
 		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
 	}
+	public static void uploadStateFiles(
+		CheckpointStreamFactory checkpointStreamFactory,
+		SnapshotDirectory localBackupDirectory,
+		Set baseSstFiles,
+		int uploadingThreadNum,
+		CloseableRegistry snapshotCloseableRegistry,
+		@Nonnull ConcurrentHashMap sstFiles,
+		@Nonnull ConcurrentHashMap miscFiles) throws Exception {
+
+		Preconditions.checkState(localBackupDirectory.exists());
+
+		FileStatus[] fileStatuses = localBackupDirectory.listStatus();
+		if (fileStatuses != null) {
+			ExecutorService executorService = createExecutorService(uploadingThreadNum);
+
+			try {
+				List runnables = createUploadRunnables(
```

Review comment:
Hi @azagrebin, when I tried to implement this with `Callable`, I could not find a `java.util.concurrent.CompletableFuture.callAsync` function (JDK 8). Searching the Flink project, I only found `org.apache.flink.runtime.rpc.MainThreadExecutable#callAsync`. Did I miss something? In other words, if I use `Callable` instead of `Runnable`, should I implement a `ThrowingCallable`, just like `ThrowingRunnable`?

If I use `Runnable`, how about creating a `CompletableFuture` instance and passing it to the called function `uploadLocalFileToCheckpointFs(Path, CompletableFuture)`, which then completes the passed-in `CompletableFuture`? For example:

```
Map> futures = new HashMap();
// futures will be updated in createUploadRunnables
List runnables = createUploadRunnables(sstFilePaths, miscFilePaths, futures);
for (Runnable runnable : runnables) {
	CompletableFuture.runAsync(runnable, executorService);
}
FutureUtils.waitForAll(futures.values()).get();

private static StreamStateHandle uploadLocalFileToCheckpointFs(Path path, CompletableFuture future) {
	try {
		...
		future.complete(...);
	} finally {
	}
}
```
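For reference, JDK 8 has no `CompletableFuture.callAsync`; the closest counterpart is `CompletableFuture.supplyAsync(Supplier, Executor)`, where the returned future carries the result. Because `Supplier` cannot throw checked exceptions, a small adapter is needed. A minimal sketch comparing that with the `runAsync` plus hand-completed future approach proposed above; `AsyncUploadStyles`, `IOSupplier`, `unchecked` and `upload` are hypothetical names, not Flink APIs.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

final class AsyncUploadStyles {

    // Hypothetical analogue of a Callable that may throw IOException.
    @FunctionalInterface
    interface IOSupplier<T> {
        T get() throws IOException;
    }

    // Adapts an IOSupplier to java.util.function.Supplier by wrapping checked exceptions.
    static <T> Supplier<T> unchecked(IOSupplier<T> supplier) {
        return () -> {
            try {
                return supplier.get();
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        };
    }

    // Option A: supplyAsync; the future returned by the JDK carries the result or failure.
    static CompletableFuture<String> uploadWithSupplyAsync(String path, ExecutorService executor) {
        return CompletableFuture.supplyAsync(unchecked(() -> upload(path)), executor);
    }

    // Option B: runAsync plus a future that the task completes by hand.
    static CompletableFuture<String> uploadWithRunAsync(String path, ExecutorService executor) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                result.complete(upload(path));
            } catch (Throwable t) {
                // Without this, a failure would never reach whoever waits on result.
                result.completeExceptionally(t);
            }
        }, executor);
        return result;
    }

    // Hypothetical stand-in for uploading one local file and returning its handle.
    static String upload(String path) throws IOException {
        if (path.isEmpty()) {
            throw new IOException("empty path");
        }
        return "handle:" + path;
    }
}
```

Both variants surface a failure as a `CompletionException` wrapping the `IOException` on `join()`. Option A needs less code and cannot forget to complete the future; option B has to handle the failure path explicitly.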
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r243766116

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ##

```diff
@@ -20,31 +20,44 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
 import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 /**
  * Data transfer utils for {@link RocksDBKeyedStateBackend}.
  */
-class RocksDbStateDataTransfer {
+public class RocksDbStateDataTransfer {
+	private static final int READ_BUFFER_SIZE = 16 * 1024;
 	static void transferAllStateDataToDirectory(
```

Review comment:
Hi @gyfora, thank you for the review. By "consistency", do you mean that the method and the class should have the same access modifier? And could you please tell me where the method will be integrated?
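The diff adds `READ_BUFFER_SIZE = 16 * 1024`, but the excerpt does not show where it is used. Assuming it sizes the buffer for copying a local state file into a checkpoint output stream, a typical chunked copy looks like this; `BufferedCopySketch` is hypothetical and uses only `java.io`, not the PR's code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class BufferedCopySketch {
    // Same value as the constant added in the diff.
    static final int READ_BUFFER_SIZE = 16 * 1024;

    // Copies in to out in READ_BUFFER_SIZE chunks; returns the number of bytes copied.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[READ_BUFFER_SIZE];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }
}
```

A fixed buffer keeps memory per upload thread bounded regardless of file size, which matters once several files are copied in parallel.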