[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250625564

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java ##
@@ -36,43 +36,47 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;

 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+    public RocksDBStateDownloader(int restoringThreadNum) {
+        super(restoringThreadNum);
+    }

-    static void transferAllStateDataToDirectory(
+    /**
+     * Transfer all state data to the target directory using specified number of threads.
+     *
+     * @param restoreStateHandle Handles used to retrieve the state data.
+     * @param dest The target directory which the state data will be stored.
+     * @param closeableRegistry Which all the inputStream/outputStream will be registered and unregistered.
+     *
+     * @throws Exception Thrown if can not transfer all the state data.
+     */
+    public void transferAllStateDataToDirectory(
         IncrementalKeyedStateHandle restoreStateHandle,
         Path dest,
-        int restoringThreadNum,
         CloseableRegistry closeableRegistry) throws Exception {

         final Map sstFiles = restoreStateHandle.getSharedState();
         final Map miscFiles = restoreStateHandle.getPrivateState();

-        downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
-        downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+        downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry);

Review comment:
OK, I see the problem. Thanks for pointing it out.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250607382

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java ##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Help class for uploading RocksDB state files.
+ */
+public class RocksDBStateUploader extends RocksDBStateDataTransfer {
+    private final int readBufferSize = 16 * 1024;

Review comment:
can be static
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250325106

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java ##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Help class for uploading RocksDB state.
+ */
+public class RocksDBStateUploader extends RocksDBStateDownloader {

Review comment:
Should this be `extends RocksDBStateDataTransfer` rather than `RocksDBStateDownloader`?
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250317750

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java ##
@@ -97,6 +97,10 @@ protected void cancel() {
         }
     }

+    protected CloseableRegistry getSnapshotCloseableRegistry() {
+        return snapshotCloseableRegistry;

Review comment:
Could we just make `snapshotCloseableRegistry` protected and use it directly in `RocksIncrementalSnapshotStrategy`? I think we would then not need the `(un)registerCloseableForCancellation` methods either, and could replace calls to them with direct calls to `snapshotCloseableRegistry.(un)registerCloseable`.
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250323327

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##
@@ -701,17 +702,22 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
     }

     /**
-     * Gets the thread number will used for downloading files from DFS when restore.
+     * Gets the number of threads used to transfer files while snapshotting/restoring.
      */
-    public int getNumberOfRestoringThreads() {
-        return numberOfRestoringThreads == UNDEFINED_NUMBER_OF_RESTORING_THREADS ?
-            RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : numberOfRestoringThreads;
+    public int getNumberOfTransferingThreads() {
+        return numberOfTransferingThreads == UNDEFINED_NUMBER_OF_TRANSFERING_THREADS ?
+            CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue() : numberOfTransferingThreads;
     }

-    public void setNumberOfRestoringThreads(int numberOfRestoringThreads) {
-        Preconditions.checkArgument(numberOfRestoringThreads > 0,
-            "The number of threads used to download files from DFS in RocksDBStateBackend should > 0.");
-        this.numberOfRestoringThreads = numberOfRestoringThreads;
+    /**
+     * Sets the number of threads used to transfer files while snapshotting/restoring.
+     *
+     * @param numberOfTransferingThreads The number of threads used to download files from DFS while restoring.

Review comment:
`used to download files from DFS while restoring` -> `to transfer files while snapshotting/restoring`
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250322353

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java ##
@@ -47,10 +47,10 @@
             HEAP.name(),
             ROCKSDB.name()));

     /**
-     * The number of threads used to download files from DFS in RocksDBStateBackend.
+     * The number of threads used to transfer(download and upload) files in RocksDBStateBackend.

Review comment:
Please add a space before `(` in `transfer(download`, here and in similar places below.
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250324277

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java ##
@@ -36,43 +36,47 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;

 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+    public RocksDBStateDownloader(int restoringThreadNum) {
+        super(restoringThreadNum);
+    }

-    static void transferAllStateDataToDirectory(
+    /**
+     * Transfer all state data to the target directory using specified number of threads.
+     *
+     * @param restoreStateHandle Handles used to retrieve the state data.
+     * @param dest The target directory which the state data will be stored.
+     * @param closeableRegistry Which all the inputStream/outputStream will be registered and unregistered.
+     *
+     * @throws Exception Thrown if can not transfer all the state data.
+     */
+    public void transferAllStateDataToDirectory(
         IncrementalKeyedStateHandle restoreStateHandle,
         Path dest,
-        int restoringThreadNum,
         CloseableRegistry closeableRegistry) throws Exception {

         final Map sstFiles = restoreStateHandle.getSharedState();
         final Map miscFiles = restoreStateHandle.getPrivateState();

-        downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
-        downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+        downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry);

Review comment:
Could `closeableRegistry` be shared the same way as `executorService` in `RocksDBStateDataTransfer`? Then we would not need to pass it to all the class methods.
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250319960

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -517,6 +516,10 @@ public void restore(Collection restoreState) throws Exception

         LOG.info("Initializing RocksDB keyed state backend.");

+        if (LOG.isDebugEnabled()) {

Review comment:
This log seems to duplicate the one that follows in the try/if/else.
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r248348274

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ##
@@ -61,6 +80,116 @@ static void transferAllStateDataToDirectory(
         downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
     }

+    /**
+     * Upload all the files to checkpoint fileSystem using specified number of threads.
+     *
+     * @param files The files will be uploaded to checkpoint filesystem.
+     * @param numberOfSnapshottingThreads The number of threads used to upload the files.
+     * @param checkpointStreamFactory The checkpoint streamFactory used to create outputstream.
+     * @param closeableRegistry
+     *
+     * @throws Exception Thrown if can not upload all the files.
+     */
+    public static Map uploadFilesToCheckpointFs(
+        @Nonnull Map files,
+        int numberOfSnapshottingThreads,
+        CheckpointStreamFactory checkpointStreamFactory,
+        CloseableRegistry closeableRegistry) throws Exception {
+
+        Map handles = new HashMap<>();
+
+        ExecutorService executorService = createExecutorService(numberOfSnapshottingThreads);

Review comment:
@klion26 The upload and download parts are quite independent. The base class could contain the executor, the number of threads, the closeable registry and a close method (to shut down the executor once instead of shutting it down every time). The upload and download classes can extend it and share the executor. The downloader could then be registered with the closeable registry. For upload, we can of course shut it down immediately after restore is done. I agree that it would make sense to keep the threads up, and for code reuse as well.
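One way to read the suggestion above is as a small class hierarchy in which the base class owns the thread pool, and the uploader and downloader only add their own transfer logic. The following is a minimal sketch under that assumption, not the code from the pull request: `TransferBaseSketch`, `UploaderSketch`, `DownloaderSketch` and the string-based "files" are hypothetical stand-ins so that the example runs without Flink on the classpath, and the closeable registry is left out for brevity.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical base class: owns the executor and shuts it down once, in close().
abstract class TransferBaseSketch implements Closeable {
    protected final ExecutorService executorService;

    TransferBaseSketch(int threadNum) {
        if (threadNum < 1) {
            throw new IllegalArgumentException("threadNum must be > 0");
        }
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    boolean isShutdown() {
        return executorService.isShutdown();
    }

    @Override
    public void close() {
        executorService.shutdownNow();
    }
}

// Hypothetical uploader: every call reuses the inherited executor.
final class UploaderSketch extends TransferBaseSketch {
    UploaderSketch(int threadNum) {
        super(threadNum);
    }

    // Stand-in for a real upload: each task only reports the length of the file name.
    int uploadAll(List<String> fileNames) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (String name : fileNames) {
            futures.add(CompletableFuture.supplyAsync(() -> name.length(), executorService));
        }
        int total = 0;
        for (CompletableFuture<Integer> future : futures) {
            // join() waits for the task; a failure surfaces as a CompletionException.
            total += future.join();
        }
        return total;
    }
}

// Hypothetical downloader: same base class, different transfer logic.
final class DownloaderSketch extends TransferBaseSketch {
    DownloaderSketch(int threadNum) {
        super(threadNum);
    }

    // Stand-in for a real download: each task upper-cases the file name.
    List<String> downloadAll(List<String> fileNames) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String name : fileNames) {
            futures.add(CompletableFuture.supplyAsync(() -> name.toUpperCase(), executorService));
        }
        List<String> result = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            result.add(future.join());
        }
        return result;
    }
}

final class TransferSketchDemo {
    public static void main(String[] args) {
        // try-with-resources closes the uploader, which shuts the pool down.
        try (UploaderSketch uploader = new UploaderSketch(2)) {
            System.out.println(uploader.uploadAll(Arrays.asList("000001.sst", "MANIFEST")));
        }
    }
}
```

In this layout the pool is created once per transfer object and released in `close()`, rather than being created and shut down on every call, which is the thread reuse the comment argues for.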
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246444142

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ##
@@ -138,6 +149,133 @@ public void testMultiThreadRestoreCorrectly() throws Exception {
         }
     }

+    /**
+     * Test that the exception arose in the thread pool will rethrow to the main thread.
+     */
+    @Test
+    public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
+        SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states.");
+
+        CheckpointStateOutputStream outputStream = createFailingCheckpointStateOutputStream(expectedException);
+        CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream;
+
+        File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+        generateRandomFileContent(file.getPath(), 20);
+
+        Map filePaths = new HashMap<>(1);
+        filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
+        try {
+            RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+                filePaths,
+                5,
+                checkpointStreamFactory,
+                new CloseableRegistry());
+            fail();
+        } catch (Exception e) {
+            assertEquals(expectedException, e);
+        }
+    }
+
+    /**
+     * Test that upload files with multi-thread correctly.
+     */
+    @Test
+    public void testMultiThreadUploadCorrectly() throws Exception {
+
+        File checkpointPrivateFolder = temporaryFolder.newFolder("private");
+        Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath());
+
+        File checkpointSharedFolder = temporaryFolder.newFolder("shared");
+        Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath());
+
+        FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
+        int fileStateSizeThreshold = 1024;
+        FsCheckpointStreamFactory checkpointStreamFactory =
+            new FsCheckpointStreamFactory(fileSystem, checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+        String localFolder = "local";
+        temporaryFolder.newFolder(localFolder);
+
+        int sstFileCount = 6;
+        Map sstFilePaths = generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
+
+        Map sstFiles = new HashMap<>(sstFileCount);
+
+        sstFiles.putAll(RocksDbStateDataTransfer.uploadFilesToCheckpointFs(

Review comment:
I think it can be directly assigned to `Map sstFiles = uploadFilesToCheckpointFs(..)`
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r246443750 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java ## @@ -410,74 +412,51 @@ private void uploadSstFiles( // write state data Preconditions.checkState(localBackupDirectory.exists()); + Map sstFilePaths = new HashMap<>(); + Map miscFilePaths = new HashMap<>(); + FileStatus[] fileStatuses = localBackupDirectory.listStatus(); if (fileStatuses != null) { - for (FileStatus fileStatus : fileStatuses) { - final Path filePath = fileStatus.getPath(); - final String fileName = filePath.getName(); - final StateHandleID stateHandleID = new StateHandleID(fileName); - - if (fileName.endsWith(SST_FILE_SUFFIX)) { - final boolean existsAlready = - baseSstFiles != null && baseSstFiles.contains(stateHandleID); - - if (existsAlready) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - sstFiles.put( - stateHandleID, - new PlaceholderStreamStateHandle()); - } else { - sstFiles.put(stateHandleID, uploadLocalFileToCheckpointFs(filePath)); - } - } else { - StreamStateHandle fileHandle = uploadLocalFileToCheckpointFs(filePath); - miscFiles.put(stateHandleID, fileHandle); - } - } + createUploadFilePaths(fileStatuses, sstFiles, sstFilePaths, miscFilePaths); + + sstFiles.putAll(uploadFilesToCheckpointFs( + sstFilePaths, + numberOfRestoringThreads, + checkpointStreamFactory, + getSnapshotCloseableRegistry())); + miscFiles.putAll(uploadFilesToCheckpointFs( + miscFilePaths, + numberOfRestoringThreads, + checkpointStreamFactory, + getSnapshotCloseableRegistry())); } } - private StreamStateHandle uploadLocalFileToCheckpointFs(Path filePath) throws Exception { - 
FSDataInputStream inputStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; - - try { - final byte[] buffer = new byte[READ_BUFFER_SIZE]; - - FileSystem backupFileSystem = localBackupDirectory.getFileSystem(); - inputStream = backupFileSystem.open(filePath); - registerCloseableForCancellation(inputStream); - - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED); - registerCloseableForCancellation(outputStream); - - while (true) { - int numBytes = inputStream.read(buffer); - - if (numBytes == -1) { - break; + private void createUploadFilePaths( + FileStatus[] fileStatuses, + Map sstFiles, + Map sstFilePaths, + Map miscFilePaths) { + for (FileStatus fileStatus : fileStatuses) { + final Path filePath = fileStatus.getPath(); +
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246443897

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ##
@@ -138,6 +149,133 @@ public void testMultiThreadRestoreCorrectly() throws Exception {
         }
     }

+    /**
+     * Test that the exception arose in the thread pool will rethrow to the main thread.
+     */
+    @Test
+    public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
+        SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states.");
+
+        CheckpointStateOutputStream outputStream = createFailingCheckpointStateOutputStream(expectedException);
+        CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream;
+
+        File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+        generateRandomFileContent(file.getPath(), 20);
+
+        Map filePaths = new HashMap<>(1);
+        filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
+        try {
+            RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+                filePaths,
+                5,
+                checkpointStreamFactory,
+                new CloseableRegistry());
+            fail();
+        } catch (Exception e) {
+            assertEquals(expectedException, e);
+        }
+    }
+
+    /**
+     * Test that upload files with multi-thread correctly.
+     */
+    @Test
+    public void testMultiThreadUploadCorrectly() throws Exception {
+

Review comment:
Could you remove this blank line?
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r246358514 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ## @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws Exception { } } + /** +* Test that the exception arose in the thread pool will rethrow to the main thread. +*/ + @Test + public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException { + SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states."); + + CheckpointStreamFactory.CheckpointStateOutputStream outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() { + @Nullable + @Override + public StreamStateHandle closeAndGetHandle() { + return new ByteStreamStateHandle("testHandle", "testHandle".getBytes()); + } + + @Override + public void close() { + } + + @Override + public long getPos() { + return 0; + } + + @Override + public void flush() { + } + + @Override + public void sync() { + } + + @Override + public void write(int b) throws IOException { + throw expectedException; + } + }; + CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream; + + File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID())); + generateRandomFileContent(file.getPath(), 20); + + Map filePaths = new HashMap<>(1); + filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath())); + try { + RocksDbStateDataTransfer.uploadFilesToCheckpointFs( + filePaths, + 5, + checkpointStreamFactory, + new CloseableRegistry(), + new HashMap<>()); + fail(); + } catch (Exception e) { + assertEquals(expectedException, e); + } + } + + /** +* Test that upload files with multi-thread correctly. 
+*/ + @Test + public void testMultiThreadUploadCorrectly() throws Exception { + + File checkpointPrivateFolder = temporaryFolder.newFolder("private"); + Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath()); + + File checkpointSharedFolder = temporaryFolder.newFolder("shared"); + Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath()); + + FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem(); + int fileStateSizeThreshold = 1024; + FsCheckpointStreamFactory checkpointStreamFactory = + new FsCheckpointStreamFactory(fileSystem, checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold); + + String localFolder = "local"; + temporaryFolder.newFolder(localFolder); + + int sstFileCount = 6; + Map sstFilePaths = new HashMap<>(sstFileCount); + generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold, sstFilePaths); + + int miscFileCount = 3; + Map miscFilePaths = new HashMap<>(miscFileCount); + ThreadLocalRandom random = ThreadLocalRandom.current(); + File currentFile = temporaryFolder.newFile(String.format("%s/CURRENT", localFolder)); + generateRandomFileContent(currentFile.getPath(), random.nextInt(fileStateSizeThreshold) + 1); + miscFilePaths.put(new StateHandleID("CURRENT"), Path.fromLocalFile(currentFile)); + + File manifest = temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder)); + generateRandomFileContent(manifest.getPath(), random.nextInt(fileStateSizeThreshold) + 1); + miscFilePaths.put(new StateHandleID("MANIFEST"), Path.fromLocalFile(manifest)); + + File options = temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder)); +
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246337062

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ##
@@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws Exception {
         }
     }

+    /**
+     * Test that the exception arose in the thread pool will rethrow to the main thread.
+     */
+    @Test
+    public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
+        SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states.");
+
+        CheckpointStreamFactory.CheckpointStateOutputStream outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {

Review comment:
Maybe move the creation code for `outputStream` into a separate `createFailingCheckpointStateOutputStream(failureException)` function?
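The extraction suggested above might look roughly like the following. This is only a sketch: it uses plain `java.io.OutputStream` instead of Flink's `CheckpointStateOutputStream` so that it runs standalone, and `FailingStreamSketch` and `createFailingOutputStream` are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;

final class FailingStreamSketch {

    // Hypothetical analogue of createFailingCheckpointStateOutputStream(failureException):
    // returns a stream whose every write fails with the given exception.
    static OutputStream createFailingOutputStream(IOException failureException) {
        return new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw failureException;
            }
        };
    }

    public static void main(String[] args) {
        IOException expected = new IOException("throw exception while uploading");
        OutputStream outputStream = createFailingOutputStream(expected);
        try {
            // write(byte[]) delegates to write(int), so it fails as well.
            outputStream.write(new byte[] {1, 2, 3});
            throw new AssertionError("write should have failed");
        } catch (IOException e) {
            // The very same exception instance reaches the caller.
            System.out.println(e == expected);
        }
    }
}
```

With the anonymous class hidden behind a factory method, the test body reduces to creating the expected exception, building the stream, and asserting on what is rethrown.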
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r246341548 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ## @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws Exception { } } + /** +* Test that the exception arose in the thread pool will rethrow to the main thread. +*/ + @Test + public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException { + SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states."); + + CheckpointStreamFactory.CheckpointStateOutputStream outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() { + @Nullable + @Override + public StreamStateHandle closeAndGetHandle() { + return new ByteStreamStateHandle("testHandle", "testHandle".getBytes()); + } + + @Override + public void close() { + } + + @Override + public long getPos() { + return 0; + } + + @Override + public void flush() { + } + + @Override + public void sync() { + } + + @Override + public void write(int b) throws IOException { + throw expectedException; + } + }; + CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream; + + File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID())); + generateRandomFileContent(file.getPath(), 20); + + Map filePaths = new HashMap<>(1); + filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath())); + try { + RocksDbStateDataTransfer.uploadFilesToCheckpointFs( + filePaths, + 5, + checkpointStreamFactory, + new CloseableRegistry(), + new HashMap<>()); + fail(); + } catch (Exception e) { + assertEquals(expectedException, e); + } + } + + /** +* Test that upload files with multi-thread correctly. 
+*/ + @Test + public void testMultiThreadUploadCorrectly() throws Exception { + + File checkpointPrivateFolder = temporaryFolder.newFolder("private"); + Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath()); + + File checkpointSharedFolder = temporaryFolder.newFolder("shared"); + Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath()); + + FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem(); + int fileStateSizeThreshold = 1024; + FsCheckpointStreamFactory checkpointStreamFactory = + new FsCheckpointStreamFactory(fileSystem, checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold); + + String localFolder = "local"; + temporaryFolder.newFolder(localFolder); + + int sstFileCount = 6; + Map sstFilePaths = new HashMap<>(sstFileCount); + generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold, sstFilePaths); + + int miscFileCount = 3; + Map miscFilePaths = new HashMap<>(miscFileCount); + ThreadLocalRandom random = ThreadLocalRandom.current(); + File currentFile = temporaryFolder.newFile(String.format("%s/CURRENT", localFolder)); + generateRandomFileContent(currentFile.getPath(), random.nextInt(fileStateSizeThreshold) + 1); + miscFilePaths.put(new StateHandleID("CURRENT"), Path.fromLocalFile(currentFile)); + + File manifest = temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder)); + generateRandomFileContent(manifest.getPath(), random.nextInt(fileStateSizeThreshold) + 1); + miscFilePaths.put(new StateHandleID("MANIFEST"), Path.fromLocalFile(manifest)); + + File options = temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder)); +
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r24695 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -61,6 +69,88 @@ static void transferAllStateDataToDirectory( downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); } + public static void uploadFilesToCheckpointFs( + @Nonnull Map files, + int numberOfSnapshottingThreads, + CheckpointStreamFactory checkpointStreamFactory, + CloseableRegistry closeableRegistry, + Map hanldes) throws Exception { + + ExecutorService executorService = createExecutorService(numberOfSnapshottingThreads); + + Map> futures = new HashMap<>(files.size()); + + for (Map.Entry entry : files.entrySet()) { Review comment: What do you think about moving the first part, which creates the futures, into its own function `createUploadFutures`? Also, `() -> uploadLocalFileToCheckpointFs(..)` could be assigned to a separate variable to shorten the `futures.put(..)` line.
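The refactoring suggested in this comment can be sketched roughly as follows. This is only an illustration under stated assumptions: `String` stands in for `StateHandleID` and `StreamStateHandle`, `java.nio.file.Path` stands in for Flink's `Path`, and the body of `uploadLocalFileToCheckpointFs` is a hypothetical placeholder rather than the real upload.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class UploadFuturesSketch {

    // Hypothetical stand-in for the real upload: returns a fake handle name.
    static String uploadLocalFileToCheckpointFs(Path path) {
        return "handle-for-" + path.getFileName();
    }

    // The extracted first part: it only creates the futures and does not wait for them.
    static Map<String, CompletableFuture<String>> createUploadFutures(
            Map<String, Path> files, ExecutorService executorService) {
        Map<String, CompletableFuture<String>> futures = new HashMap<>(files.size());
        for (Map.Entry<String, Path> entry : files.entrySet()) {
            final Path path = entry.getValue();
            // Naming the supplier keeps the futures.put(..) line short.
            final Supplier<String> upload = () -> uploadLocalFileToCheckpointFs(path);
            futures.put(entry.getKey(), CompletableFuture.supplyAsync(upload, executorService));
        }
        return futures;
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            Map<String, Path> files = new HashMap<>();
            files.put("000001.sst", Paths.get("local", "000001.sst"));
            files.put("CURRENT", Paths.get("local", "CURRENT"));
            createUploadFutures(files, executorService)
                .forEach((id, future) -> System.out.println(id + " -> " + future.join()));
        } finally {
            executorService.shutdown();
        }
    }
}
```

Keeping future creation in its own function leaves the caller with two clearly separated steps: submit all uploads, then wait for them.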
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r246334980 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java ## @@ -428,57 +433,25 @@ private void uploadSstFiles( stateHandleID, new PlaceholderStreamStateHandle()); } else { - sstFiles.put(stateHandleID, uploadLocalFileToCheckpointFs(filePath)); + sstFilePaths.put(stateHandleID, filePath); Review comment: The same applies to this loop, which creates `sstFilePaths`/`miscFilePaths`: it could be moved into another function.
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r246336347 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ## @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws Exception { } } + /** +* Test that the exception arose in the thread pool will rethrow to the main thread. +*/ + @Test + public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException { + SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states."); + + CheckpointStreamFactory.CheckpointStateOutputStream outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() { + @Nullable + @Override + public StreamStateHandle closeAndGetHandle() { + return new ByteStreamStateHandle("testHandle", "testHandle".getBytes()); + } + + @Override + public void close() { + } + + @Override + public long getPos() { + return 0; + } + + @Override + public void flush() { + } + + @Override + public void sync() { + } + + @Override + public void write(int b) throws IOException { + throw expectedException; + } + }; + CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream; + + File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID())); + generateRandomFileContent(file.getPath(), 20); + + Map filePaths = new HashMap<>(1); + filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath())); + try { + RocksDbStateDataTransfer.uploadFilesToCheckpointFs( + filePaths, + 5, + checkpointStreamFactory, + new CloseableRegistry(), + new HashMap<>()); + fail(); + } catch (Exception e) { + assertEquals(expectedException, e); + } + } + + /** +* Test that upload files with multi-thread correctly. 
+*/ + @Test + public void testMultiThreadUploadCorrectly() throws Exception { + + File checkpointPrivateFolder = temporaryFolder.newFolder("private"); + Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath()); + + File checkpointSharedFolder = temporaryFolder.newFolder("shared"); + Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath()); + + FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem(); + int fileStateSizeThreshold = 1024; + FsCheckpointStreamFactory checkpointStreamFactory = + new FsCheckpointStreamFactory(fileSystem, checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold); + + String localFolder = "local"; + temporaryFolder.newFolder(localFolder); + + int sstFileCount = 6; + Map sstFilePaths = new HashMap<>(sstFileCount); + generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold, sstFilePaths); + + int miscFileCount = 3; + Map miscFilePaths = new HashMap<>(miscFileCount); + ThreadLocalRandom random = ThreadLocalRandom.current(); + File currentFile = temporaryFolder.newFile(String.format("%s/CURRENT", localFolder)); + generateRandomFileContent(currentFile.getPath(), random.nextInt(fileStateSizeThreshold) + 1); + miscFilePaths.put(new StateHandleID("CURRENT"), Path.fromLocalFile(currentFile)); + + File manifest = temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder)); + generateRandomFileContent(manifest.getPath(), random.nextInt(fileStateSizeThreshold) + 1); + miscFilePaths.put(new StateHandleID("MANIFEST"), Path.fromLocalFile(manifest)); + + File options = temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder)); +
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r246340345 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -61,6 +69,88 @@ static void transferAllStateDataToDirectory( downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); } + public static void uploadFilesToCheckpointFs( + @Nonnull Map files, + int numberOfSnapshottingThreads, + CheckpointStreamFactory checkpointStreamFactory, + CloseableRegistry closeableRegistry, + Map hanldes) throws Exception { Review comment: Not sure here. In general, I think it is not obvious that a function implicitly returns something through its arguments (here `hanldes`; typo, by the way, it should be `handles`). At the very least I would mention this in the function's doc comment. `transferAllStateDataToDirectory` is now public and should also have a doc comment. On the other hand, if the function returns `handles` explicitly, we have to iterate over them again where the result is added to the final result map (e.g. `sstFiles.putAll(uploadFilesToCheckpointFs(..))`). Though I would not expect the map to be large enough for this extra iteration to be performance critical.
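The trade-off described in this comment can be illustrated with a small sketch. It is not the PR's code: `String` stands in for `StateHandleID`, `Path` and `StreamStateHandle`, and `uploadInto` and `upload` are hypothetical names for the two signature styles.

```java
import java.util.HashMap;
import java.util.Map;

final class ReturnHandlesSketch {

    // Variant A: results are written into the caller's map (an out-parameter).
    // The signature alone does not reveal that 'handles' is modified.
    static void uploadInto(Map<String, String> files, Map<String, String> handles) {
        files.forEach((id, path) -> handles.put(id, "uploaded:" + path));
    }

    // Variant B: results are returned explicitly, so the data flow is visible at the call site.
    static Map<String, String> upload(Map<String, String> files) {
        Map<String, String> handles = new HashMap<>(files.size());
        files.forEach((id, path) -> handles.put(id, "uploaded:" + path));
        return handles;
    }

    public static void main(String[] args) {
        Map<String, String> sstFilePaths = new HashMap<>();
        sstFilePaths.put("000001.sst", "/local/000001.sst");

        // The final map may already hold entries, for example placeholders.
        Map<String, String> sstFiles = new HashMap<>();
        sstFiles.put("000000.sst", "placeholder");

        // The extra iteration mentioned in the comment is this putAll(..).
        sstFiles.putAll(upload(sstFilePaths));
        System.out.println(sstFiles);
    }
}
```

With variant B the cost is one additional pass over a map that holds one entry per uploaded file.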
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r244325175 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -61,6 +74,144 @@ static void transferAllStateDataToDirectory( downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); } + public static void uploadStateFiles( + CheckpointStreamFactory checkpointStreamFactory, + SnapshotDirectory localBackupDirectory, + Set baseSstFiles, + int uploadingThreadNum, + CloseableRegistry snapshotCloseableRegistry, + @Nonnull ConcurrentHashMap sstFiles, + @Nonnull ConcurrentHashMap miscFiles) throws Exception { + + Preconditions.checkState(localBackupDirectory.exists()); + + FileStatus[] fileStatuses = localBackupDirectory.listStatus(); + if (fileStatuses != null) { + ExecutorService executorService = createExecutorService(uploadingThreadNum); + + try { + List runnables = createUploadRunnables( Review comment: Hi @klion26, I think this could work: ``` futures.put(stateHandleID, CompletableFuture.supplyAsync(CheckedSupplier.unchecked( () -> uploadLocalFileToCheckpointFs(path)), executorService)) ``` `CompletableFuture.supplyAsync` already seems to do internally what you suggest. I would also run the parallel upload separately for `sstFilePaths` and `miscFilePaths`, one after another, so that `RocksDbStateDataTransfer` does not need to know about the file structure.
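The snippet in this comment relies on adapting a supplier that throws checked exceptions to `java.util.function.Supplier`. A minimal sketch of that idea is shown below. `ThrowingSupplier`, this local `unchecked`, `failingUpload` and `failureOf` are hypothetical stand-ins; the wrapper here uses `CompletionException` as an assumption and may wrap differently from Flink's own `CheckedSupplier.unchecked`.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class UncheckedSupplierSketch {

    // Hypothetical stand-in for a supplier that may throw a checked exception.
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    // Adapts a throwing supplier to java.util.function.Supplier.
    // Checked exceptions are wrapped (assumption: in CompletionException).
    static <T> Supplier<T> unchecked(ThrowingSupplier<T> supplier) {
        return () -> {
            try {
                return supplier.get();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
    }

    // Hypothetical upload that always fails with a checked exception.
    static String failingUpload() throws IOException {
        throw new IOException("upload failed");
    }

    // Waits for the future; returns the cause of a failure, or null on success.
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> upload = CompletableFuture.supplyAsync(
                unchecked(UncheckedSupplierSketch::failingUpload), executorService);
            System.out.println("cause: " + failureOf(upload));
        } finally {
            executorService.shutdown();
        }
    }
}
```

Recovering the cause on the waiting thread is what lets an exception raised in the pool be rethrown to the caller, which is the behaviour the PR's `testMultiThreadUploadThreadPoolExceptionRethrow` test checks for.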
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r244004203 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -714,6 +729,20 @@ public void setNumberOfRestoringThreads(int numberOfRestoringThreads) { this.numberOfRestoringThreads = numberOfRestoringThreads; } + /** +* Gets the thread number will used for uploading files to DFS when snapshot. +*/ + public int getNumberOfUploadingThreads() { + return numberOfUploadingThreads == UNDEFINED_NUMBER_OF_UPLOADING_THREADS ? + CHECKPOINT_SNAPSHOT_THREAD_NUM.defaultValue() : numberOfUploadingThreads; + } + + public void setNumberOfUploadingThreads(int numberOfUploadingThreads) { + Preconditions.checkArgument(numberOfUploadingThreads > 0, + "The number of threads used to upload files to DFS in RocksDBStateBackend should > 0."); Review comment: `should be greater than zero` (also in `setNumberOfRestoringThreads`)
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r244010700 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -61,6 +74,144 @@ static void transferAllStateDataToDirectory( downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); } + public static void uploadStateFiles( + CheckpointStreamFactory checkpointStreamFactory, + SnapshotDirectory localBackupDirectory, + Set baseSstFiles, + int uploadingThreadNum, + CloseableRegistry snapshotCloseableRegistry, + @Nonnull ConcurrentHashMap sstFiles, + @Nonnull ConcurrentHashMap miscFiles) throws Exception { + + Preconditions.checkState(localBackupDirectory.exists()); + + FileStatus[] fileStatuses = localBackupDirectory.listStatus(); + if (fileStatuses != null) { + ExecutorService executorService = createExecutorService(uploadingThreadNum); + + try { + List runnables = createUploadRunnables( Review comment: I think using `Callable` (path -> StreamStateHandle, as it was in `uploadLocalFileToCheckpointFs`) is simpler and stateless, compared to using `Runnable` and implicitly adding results into the concurrent map. What if we implement `createUploadCallables` and use `CompletableFuture.supplyAsync` to create `Map<StateHandleID, CompletableFuture<StreamStateHandle>>`? Then the futures (`map.values()`) could be waited for and the map could be converted into the result `Map<StateHandleID, StreamStateHandle>`.
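The last step described in this comment, waiting for the futures and converting the map of futures into a map of results, might look roughly like the sketch below. The generic helper `waitForAll` is a hypothetical name, and plain strings stand in for `StateHandleID` and `StreamStateHandle`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class CollectHandlesSketch {

    // Waits for all futures, then converts Map<ID, Future<Handle>> into Map<ID, Handle>.
    static <K, V> Map<K, V> waitForAll(Map<K, CompletableFuture<V>> futures) {
        // allOf(..) completes once every future has completed; join() throws a
        // CompletionException carrying the cause if any of them failed.
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0])).join();

        Map<K, V> results = new HashMap<>(futures.size());
        // Every future is already complete here, so join() returns immediately.
        futures.forEach((id, future) -> results.put(id, future.join()));
        return results;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> futures = new HashMap<>();
        futures.put("000001.sst", CompletableFuture.supplyAsync(() -> "handle-1"));
        futures.put("MANIFEST", CompletableFuture.supplyAsync(() -> "handle-2"));
        System.out.println(waitForAll(futures));
    }
}
```

Because each task returns its own result, no shared concurrent map is needed; the result map is only touched by the waiting thread.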
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r244004144 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -714,6 +729,20 @@ public void setNumberOfRestoringThreads(int numberOfRestoringThreads) { this.numberOfRestoringThreads = numberOfRestoringThreads; } + /** +* Gets the thread number will used for uploading files to DFS when snapshot. Review comment: `The number of threads used to upload files to DFS while snapshotting` Can you please change the comment the same way for `getNumberOfRestoringThreads` and add similar comments for the corresponding setter methods?
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r244002977 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -220,6 +220,9 @@ /** Thread number used to download from DFS when restore. */ private final int restoringThreadNum; + /** Thread number used to upload to DFS when snapshot. */ + private final int uploadingThreadNum; Review comment: I would align it with `restoringThreadNum` and name it `snapshottingThreadNum`. I think it would be even better to name them both `numberOfRestoringThreads` and `numberOfSnapshottingThreads`.
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r244009608 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java ## @@ -403,85 +409,6 @@ private void cleanupIncompleteSnapshot(@Nonnull List statesToDiscar } } - private void uploadSstFiles( - @Nonnull Map sstFiles, - @Nonnull Map miscFiles) throws Exception { - - // write state data - Preconditions.checkState(localBackupDirectory.exists()); - - FileStatus[] fileStatuses = localBackupDirectory.listStatus(); - if (fileStatuses != null) { - for (FileStatus fileStatus : fileStatuses) { - final Path filePath = fileStatus.getPath(); - final String fileName = filePath.getName(); - final StateHandleID stateHandleID = new StateHandleID(fileName); - - if (fileName.endsWith(SST_FILE_SUFFIX)) { - final boolean existsAlready = - baseSstFiles != null && baseSstFiles.contains(stateHandleID); - - if (existsAlready) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - sstFiles.put( - stateHandleID, - new PlaceholderStreamStateHandle()); - } else { - sstFiles.put(stateHandleID, uploadLocalFileToCheckpointFs(filePath)); Review comment: I think the logic that separates sst, placeholder and misc files belongs here. `RocksDbStateDataTransfer` should be responsible only for the upload. What if, instead of calling `uploadLocalFileToCheckpointFs`, we fill `sstFilesToUpload` and `miscFilesToUpload` (each a `Map<StateHandleID, Path>`) in this loop and then call `RocksDbStateDataTransfer.uploadStateFiles` for each map? `uploadStateFiles` could then return a `Map<StateHandleID, StreamStateHandle>`, which could be put into the final `sstFiles`/`miscFiles`. I think `ConcurrentHashMap` is not needed then.
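The separation proposed in this comment, classifying files in the snapshot strategy and leaving only the upload to the transfer class, could be sketched as below. This is an illustration under assumptions: file names and paths are plain strings, the `Partition` holder and the `partition` method are hypothetical, and the value `".sst"` for `SST_FILE_SUFFIX` is assumed rather than taken from the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class FilePartitionSketch {

    // Assumed value; the diff only shows the constant's name.
    static final String SST_FILE_SUFFIX = ".sst";

    // Result of the classification: what is already shared and what still needs an upload.
    static final class Partition {
        final Set<String> alreadyShared = new HashSet<>(); // would become placeholder handles
        final Map<String, String> sstFilesToUpload = new HashMap<>();
        final Map<String, String> miscFilesToUpload = new HashMap<>();
    }

    // Classifies local files (name -> path) without uploading anything.
    static Partition partition(Map<String, String> localFiles, Set<String> baseSstFiles) {
        Partition result = new Partition();
        for (Map.Entry<String, String> file : localFiles.entrySet()) {
            String id = file.getKey();
            if (!id.endsWith(SST_FILE_SUFFIX)) {
                result.miscFilesToUpload.put(id, file.getValue());
            } else if (baseSstFiles != null && baseSstFiles.contains(id)) {
                // Known from a previous checkpoint: no upload needed.
                result.alreadyShared.add(id);
            } else {
                result.sstFilesToUpload.put(id, file.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> localFiles = new HashMap<>();
        localFiles.put("000001.sst", "/local/000001.sst");
        localFiles.put("000002.sst", "/local/000002.sst");
        localFiles.put("MANIFEST", "/local/MANIFEST");

        Set<String> baseSstFiles = new HashSet<>();
        baseSstFiles.add("000001.sst");

        Partition p = partition(localFiles, baseSstFiles);
        System.out.println("shared: " + p.alreadyShared);
        System.out.println("sst to upload: " + p.sstFilesToUpload.keySet());
        System.out.println("misc to upload: " + p.miscFilesToUpload.keySet());
    }
}
```

Each of the two upload maps can then be handed to the uploader separately, so the uploader never needs to know which files are sst files.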