[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-24 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250625564
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 ##
 @@ -36,43 +36,47 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static 
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+   public RocksDBStateDownloader(int restoringThreadNum) {
+   super(restoringThreadNum);
+   }
 
-   static void transferAllStateDataToDirectory(
+   /**
+* Transfer all state data to the target directory using specified 
number of threads.
+*
+* @param restoreStateHandle Handles used to retrieve the state data.
+* @param dest The target directory which the state data will be stored.
+* @param closeableRegistry Which all the inputStream/outputStream will 
be registered and unregistered.
+*
+* @throws Exception Thrown if can not transfer all the state data.
+*/
+   public void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest,
-   int restoringThreadNum,
CloseableRegistry closeableRegistry) throws Exception {
 
final Map sstFiles =
restoreStateHandle.getSharedState();
final Map miscFiles =
restoreStateHandle.getPrivateState();
 
-   downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
-   downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(sstFiles, dest, 
closeableRegistry);
 
 Review comment:
   OK, I see the problem. Thanks for pointing it out.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-24 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250607382
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
 ##
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Help class for uploading RocksDB state files.
+ */
+public class RocksDBStateUploader extends RocksDBStateDataTransfer {
+   private final int readBufferSize = 16 * 1024;
 
 Review comment:
   `readBufferSize` can be static.
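
A minimal sketch of this suggestion, assuming a hypothetical `UploaderSketch` class in place of the real `RocksDBStateUploader`: because the buffer size is identical for every instance, it can be declared once as a `static final` constant.

```java
/** Hypothetical stand-in for the uploader; not Flink's actual class. */
class UploaderSketch {
    // One shared constant instead of a per-instance field.
    private static final int READ_BUFFER_SIZE = 16 * 1024;

    /** Allocates a read buffer of the shared size. */
    byte[] newReadBuffer() {
        return new byte[READ_BUFFER_SIZE];
    }
}
```

The upper-case name follows the usual Java convention for constants; the rename is an assumption here, not something the review asked for.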




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250325106
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Help class for uploading RocksDB state.
+ */
+public class RocksDBStateUploader extends RocksDBStateDownloader {
 
 Review comment:
   `extends RocksDBStateDataTransfer` not `RocksDBStateDownloader`?




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250317750
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java
 ##
 @@ -97,6 +97,10 @@ protected void cancel() {
}
}
 
+   protected CloseableRegistry getSnapshotCloseableRegistry() {
+   return snapshotCloseableRegistry;
 
 Review comment:
   Could we just make `snapshotCloseableRegistry` protected and use it directly 
in `RocksIncrementalSnapshotStrategy`? I think we would then no longer need the 
`(un)registerCloseableForCancellation` methods either, and could replace 
calls to them with direct calls to 
`snapshotCloseableRegistry.(un)registerCloseable`.
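
A rough sketch of what the protected-field approach could look like. All class names here (`RegistrySketch`, `SnapshotCallableSketch`, `IncrementalSnapshotSketch`) are hypothetical stand-ins, not Flink's `CloseableRegistry` or `AsyncSnapshotCallable`; only the shape of the access pattern is the point.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical, simplified stand-in for a closeable registry. */
class RegistrySketch implements Closeable {
    private final Set<Closeable> closeables = new LinkedHashSet<>();

    synchronized void registerCloseable(Closeable c) {
        closeables.add(c);
    }

    synchronized boolean unregisterCloseable(Closeable c) {
        return closeables.remove(c);
    }

    synchronized int size() {
        return closeables.size();
    }

    @Override
    public synchronized void close() throws IOException {
        for (Closeable c : closeables) {
            c.close();
        }
        closeables.clear();
    }
}

/** Hypothetical base class: the registry is a protected field, so no getter is needed. */
abstract class SnapshotCallableSketch {
    protected final RegistrySketch snapshotCloseableRegistry = new RegistrySketch();
}

/** Hypothetical subclass that talks to the inherited registry directly. */
class IncrementalSnapshotSketch extends SnapshotCallableSketch {
    void track(Closeable stream) {
        snapshotCloseableRegistry.registerCloseable(stream);
    }

    boolean untrack(Closeable stream) {
        return snapshotCloseableRegistry.unregisterCloseable(stream);
    }
}
```

The trade-off is the usual one: a protected field removes the wrapper methods but exposes the registry to every subclass.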




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250323327
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -701,17 +702,22 @@ public RocksDBNativeMetricOptions 
getMemoryWatcherOptions() {
}
 
/**
-* Gets the thread number will used for downloading files from DFS when 
restore.
+* Gets the number of threads used to transfer files while 
snapshotting/restoring.
 */
-   public int getNumberOfRestoringThreads() {
-   return numberOfRestoringThreads == 
UNDEFINED_NUMBER_OF_RESTORING_THREADS ?
-   
RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : 
numberOfRestoringThreads;
+   public int getNumberOfTransferingThreads() {
+   return numberOfTransferingThreads == 
UNDEFINED_NUMBER_OF_TRANSFERING_THREADS ?
+   CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue() : 
numberOfTransferingThreads;
}
 
-   public void setNumberOfRestoringThreads(int numberOfRestoringThreads) {
-   Preconditions.checkArgument(numberOfRestoringThreads > 0,
-   "The number of threads used to download files from DFS 
in RocksDBStateBackend should > 0.");
-   this.numberOfRestoringThreads = numberOfRestoringThreads;
+   /**
+* Sets the number of threads used to transfer files while 
snapshotting/restoring.
+*
+* @param numberOfTransferingThreads The number of threads used to 
download files from DFS while restoring.
 
 Review comment:
   `used to download files from DFS while restoring` -> `to transfer files 
while snapshotting/restoring`




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250322353
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
 ##
 @@ -47,10 +47,10 @@
HEAP.name(), ROCKSDB.name()));
 
/**
-* The number of threads used to download files from DFS in 
RocksDBStateBackend.
+* The number of threads used to transfer(download and upload) files in 
RocksDBStateBackend.
 
 Review comment:
   Add a space before `(` in `transfer(download`; the same applies to similar places below.




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250324277
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 ##
 @@ -36,43 +36,47 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static 
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+   public RocksDBStateDownloader(int restoringThreadNum) {
+   super(restoringThreadNum);
+   }
 
-   static void transferAllStateDataToDirectory(
+   /**
+* Transfer all state data to the target directory using specified 
number of threads.
+*
+* @param restoreStateHandle Handles used to retrieve the state data.
+* @param dest The target directory which the state data will be stored.
+* @param closeableRegistry Which all the inputStream/outputStream will 
be registered and unregistered.
+*
+* @throws Exception Thrown if can not transfer all the state data.
+*/
+   public void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest,
-   int restoringThreadNum,
CloseableRegistry closeableRegistry) throws Exception {
 
final Map sstFiles =
restoreStateHandle.getSharedState();
final Map miscFiles =
restoreStateHandle.getPrivateState();
 
-   downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
-   downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(sstFiles, dest, 
closeableRegistry);
 
 Review comment:
   Could `closeableRegistry` be shared the same way as `executorService` in 
`RocksDBStateDataTransfer`? Then we would not need to pass it to every method of the class.
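
One way to picture this, as a sketch only: the registry is handed to the constructor once and stored in the base class, so individual methods no longer take it as a parameter. `TransferBaseSketch` and `DownloaderSketch` are hypothetical names, and a plain `List<Closeable>` stands in for the real registry type.

```java
import java.io.Closeable;
import java.util.List;

/** Hypothetical base class holding state shared by all transfer methods. */
class TransferBaseSketch {
    // Stand-in for a closeable registry: a list of tracked streams.
    protected final List<Closeable> closeableRegistry;

    TransferBaseSketch(List<Closeable> closeableRegistry) {
        this.closeableRegistry = closeableRegistry;
    }
}

/** Hypothetical downloader whose methods use the inherited registry. */
class DownloaderSketch extends TransferBaseSketch {
    DownloaderSketch(List<Closeable> closeableRegistry) {
        super(closeableRegistry);
    }

    // No registry parameter: the method reads the field set at construction.
    void download(Closeable stream) {
        closeableRegistry.add(stream);
    }
}
```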




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250319960
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -517,6 +516,10 @@ public void restore(Collection 
restoreState) throws Exception
 
LOG.info("Initializing RocksDB keyed state backend.");
 
+   if (LOG.isDebugEnabled()) {
 
 Review comment:
   This log statement seems to duplicate the following one in the try/if/else.




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-16 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r248348274
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +80,116 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+   /**
+* Upload all the files to checkpoint fileSystem using specified number 
of threads.
+*
+* @param files The files will be uploaded to checkpoint filesystem.
+* @param numberOfSnapshottingThreads The number of threads used to 
upload the files.
+* @param checkpointStreamFactory The checkpoint streamFactory used to 
create outputstream.
+* @param closeableRegistry
+*
+* @throws Exception Thrown if can not upload all the files.
+*/
+   public static Map 
uploadFilesToCheckpointFs(
+   @Nonnull Map files,
+   int numberOfSnapshottingThreads,
+   CheckpointStreamFactory checkpointStreamFactory,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   Map handles = new HashMap<>();
+
+   ExecutorService executorService = 
createExecutorService(numberOfSnapshottingThreads);
 
 Review comment:
   @klion26 
   The upload and download parts are quite independent. The base class could 
contain the executor, the number of threads, the closeable registry and a close method (to 
shut down the executor once instead of shutting it down every time). The upload and 
download classes can extend it and share the executor. The downloader could 
then be registered with the closeable registry. For upload, we can of course 
shut it down immediately after restore is done. I agree that it would make sense 
to keep the threads up, and for code reuse as well.
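
The structure described here might look roughly like the following sketch. It is not the code from the pull request: `DataTransferSketch` and `UploadSketch` are hypothetical names, the "upload" just builds a string, and the executor is a plain fixed thread pool.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical base class that owns the executor and shuts it down once in close(). */
abstract class DataTransferSketch implements Closeable {
    protected final ExecutorService executorService;

    DataTransferSketch(int threadNum) {
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    @Override
    public void close() {
        executorService.shutdownNow();
    }
}

/** Hypothetical uploader that reuses the inherited executor for every call. */
class UploadSketch extends DataTransferSketch {
    UploadSketch(int threadNum) {
        super(threadNum);
    }

    /** Runs one task per file on the shared pool and waits for all of them. */
    List<String> uploadAll(List<String> files) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String file : files) {
            futures.add(CompletableFuture.supplyAsync(() -> "uploaded:" + file, executorService));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join());
        }
        return results;
    }
}
```

A downloader would extend the same base class in the same way. Because the pool lives as long as the object, repeated calls do not pay the cost of creating and tearing down threads each time.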




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246444142
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +149,133 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStateOutputStream outputStream = 
createFailingCheckpointStateOutputStream(expectedException);
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that upload files with multi-thread correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
+   File checkpointPrivateFolder = 
temporaryFolder.newFolder("private");
+   Path checkpointPrivateDirectory = new 
Path(checkpointPrivateFolder.getPath());
+
+   File checkpointSharedFolder = 
temporaryFolder.newFolder("shared");
+   Path checkpointSharedDirectory = new 
Path(checkpointSharedFolder.getPath());
+
+   FileSystem fileSystem = 
checkpointPrivateDirectory.getFileSystem();
+   int fileStateSizeThreshold = 1024;
+   FsCheckpointStreamFactory checkpointStreamFactory =
+   new FsCheckpointStreamFactory(fileSystem, 
checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+   String localFolder = "local";
+   temporaryFolder.newFolder(localFolder);
+
+   int sstFileCount = 6;
+   Map sstFilePaths = 
generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
+
+   Map sstFiles = new 
HashMap<>(sstFileCount);
+
+   
sstFiles.putAll(RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
 
 Review comment:
   I think the result can be assigned directly: 
   `Map sstFiles = uploadFilesToCheckpointFs(..)`
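
To illustrate the difference, a small sketch with a hypothetical `upload` method standing in for `uploadFilesToCheckpointFs` (string keys and values are used instead of the real handle types): since the method already returns a fresh map, the caller can assign it directly rather than creating an empty map and calling `putAll`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; the types are simplified to strings for illustration. */
class AssignSketch {
    /** Stand-in for the upload call: returns a newly created map of handles. */
    static Map<String, String> upload(Map<String, String> paths) {
        Map<String, String> handles = new HashMap<>();
        paths.forEach((id, path) -> handles.put(id, "handle-for-" + path));
        return handles;
    }

    static Map<String, String> collect(Map<String, String> paths) {
        // Direct assignment; no intermediate empty map followed by putAll.
        Map<String, String> sstFiles = upload(paths);
        return sstFiles;
    }
}
```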




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246443750
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 ##
 @@ -410,74 +412,51 @@ private void uploadSstFiles(
// write state data
Preconditions.checkState(localBackupDirectory.exists());
 
+   Map sstFilePaths = new HashMap<>();
+   Map miscFilePaths = new 
HashMap<>();
+
FileStatus[] fileStatuses = 
localBackupDirectory.listStatus();
if (fileStatuses != null) {
-   for (FileStatus fileStatus : fileStatuses) {
-   final Path filePath = 
fileStatus.getPath();
-   final String fileName = 
filePath.getName();
-   final StateHandleID stateHandleID = new 
StateHandleID(fileName);
-
-   if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
-   final boolean existsAlready =
-   baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
-
-   if (existsAlready) {
-   // we introduce a 
placeholder state handle, that is replaced with the
-   // original from the 
shared state registry (created from a previous checkpoint)
-   sstFiles.put(
-   stateHandleID,
-   new 
PlaceholderStreamStateHandle());
-   } else {
-   
sstFiles.put(stateHandleID, uploadLocalFileToCheckpointFs(filePath));
-   }
-   } else {
-   StreamStateHandle fileHandle = 
uploadLocalFileToCheckpointFs(filePath);
-   miscFiles.put(stateHandleID, 
fileHandle);
-   }
-   }
+   createUploadFilePaths(fileStatuses, sstFiles, 
sstFilePaths, miscFilePaths);
+
+   sstFiles.putAll(uploadFilesToCheckpointFs(
+   sstFilePaths,
+   numberOfRestoringThreads,
+   checkpointStreamFactory,
+   getSnapshotCloseableRegistry()));
+   miscFiles.putAll(uploadFilesToCheckpointFs(
+   miscFilePaths,
+   numberOfRestoringThreads,
+   checkpointStreamFactory,
+   getSnapshotCloseableRegistry()));
}
}
 
-   private StreamStateHandle uploadLocalFileToCheckpointFs(Path 
filePath) throws Exception {
-   FSDataInputStream inputStream = null;
-   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
-
-   try {
-   final byte[] buffer = new 
byte[READ_BUFFER_SIZE];
-
-   FileSystem backupFileSystem = 
localBackupDirectory.getFileSystem();
-   inputStream = backupFileSystem.open(filePath);
-   registerCloseableForCancellation(inputStream);
-
-   outputStream = checkpointStreamFactory
-   
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
-   registerCloseableForCancellation(outputStream);
-
-   while (true) {
-   int numBytes = inputStream.read(buffer);
-
-   if (numBytes == -1) {
-   break;
+   private void createUploadFilePaths(
+   FileStatus[] fileStatuses,
+   Map sstFiles,
+   Map sstFilePaths,
+   Map miscFilePaths) {
+   for (FileStatus fileStatus : fileStatuses) {
+   final Path filePath = fileStatus.getPath();
+

[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246443897
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +149,133 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStateOutputStream outputStream = 
createFailingCheckpointStateOutputStream(expectedException);
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that upload files with multi-thread correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
 
 Review comment:
   Could you remove this blank line?




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246358514
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {
+   @Nullable
+   @Override
+   public StreamStateHandle closeAndGetHandle() {
+   return new ByteStreamStateHandle("testHandle", 
"testHandle".getBytes());
+   }
+
+   @Override
+   public void close() {
+   }
+
+   @Override
+   public long getPos() {
+   return 0;
+   }
+
+   @Override
+   public void flush() {
+   }
+
+   @Override
+   public void sync() {
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+   throw expectedException;
+   }
+   };
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry(),
+   new HashMap<>());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that upload files with multi-thread correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
+   File checkpointPrivateFolder = 
temporaryFolder.newFolder("private");
+   Path checkpointPrivateDirectory = new 
Path(checkpointPrivateFolder.getPath());
+
+   File checkpointSharedFolder = 
temporaryFolder.newFolder("shared");
+   Path checkpointSharedDirectory = new 
Path(checkpointSharedFolder.getPath());
+
+   FileSystem fileSystem = 
checkpointPrivateDirectory.getFileSystem();
+   int fileStateSizeThreshold = 1024;
+   FsCheckpointStreamFactory checkpointStreamFactory =
+   new FsCheckpointStreamFactory(fileSystem, 
checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+   String localFolder = "local";
+   temporaryFolder.newFolder(localFolder);
+
+   int sstFileCount = 6;
+   Map sstFilePaths = new 
HashMap<>(sstFileCount);
+   generateRandomSstFiles(localFolder, sstFileCount, 
fileStateSizeThreshold, sstFilePaths);
+
+   int miscFileCount = 3;
+   Map miscFilePaths = new 
HashMap<>(miscFileCount);
+   ThreadLocalRandom random = ThreadLocalRandom.current();
+   File currentFile = 
temporaryFolder.newFile(String.format("%s/CURRENT", localFolder));
+   generateRandomFileContent(currentFile.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("CURRENT"), 
Path.fromLocalFile(currentFile));
+
+   File manifest = 
temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder));
+   generateRandomFileContent(manifest.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("MANIFEST"), 
Path.fromLocalFile(manifest));
+
+   File options = 
temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder));
+   

[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246337062
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {
 
 Review comment:
   Maybe move the creation code for `outputStream` into a separate 
`createFailingCheckpointStateOutputStream(failureException)` function?
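
A minimal sketch of what such a helper might look like. Only the method name comes from the comment above. `java.io.OutputStream` is used as a stand-in for `CheckpointStreamFactory.CheckpointStateOutputStream` so that the example compiles without Flink, and the class name `FailingStreamSketch` is hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;

class FailingStreamSketch {

    /**
     * Creates a stream whose write() always throws the given exception.
     * OutputStream is only a stand-in for CheckpointStateOutputStream here.
     */
    static OutputStream createFailingCheckpointStateOutputStream(IOException failureException) {
        return new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw failureException;
            }
        };
    }

    public static void main(String[] args) {
        IOException expected = new IOException("throw exception while multi thread upload states.");
        try (OutputStream out = createFailingCheckpointStateOutputStream(expected)) {
            out.write(1);
            throw new AssertionError("write should have failed");
        } catch (IOException e) {
            // The very same exception instance reaches the caller.
            System.out.println(e == expected);
        }
    }
}
```

With a helper like this, the test body shrinks to the stream factory lambda, the upload call and the assertion.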


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246341548
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {
+   @Nullable
+   @Override
+   public StreamStateHandle closeAndGetHandle() {
+   return new ByteStreamStateHandle("testHandle", 
"testHandle".getBytes());
+   }
+
+   @Override
+   public void close() {
+   }
+
+   @Override
+   public long getPos() {
+   return 0;
+   }
+
+   @Override
+   public void flush() {
+   }
+
+   @Override
+   public void sync() {
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+   throw expectedException;
+   }
+   };
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry(),
+   new HashMap<>());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that upload files with multi-thread correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
+   File checkpointPrivateFolder = 
temporaryFolder.newFolder("private");
+   Path checkpointPrivateDirectory = new 
Path(checkpointPrivateFolder.getPath());
+
+   File checkpointSharedFolder = 
temporaryFolder.newFolder("shared");
+   Path checkpointSharedDirectory = new 
Path(checkpointSharedFolder.getPath());
+
+   FileSystem fileSystem = 
checkpointPrivateDirectory.getFileSystem();
+   int fileStateSizeThreshold = 1024;
+   FsCheckpointStreamFactory checkpointStreamFactory =
+   new FsCheckpointStreamFactory(fileSystem, 
checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+   String localFolder = "local";
+   temporaryFolder.newFolder(localFolder);
+
+   int sstFileCount = 6;
+   Map sstFilePaths = new 
HashMap<>(sstFileCount);
+   generateRandomSstFiles(localFolder, sstFileCount, 
fileStateSizeThreshold, sstFilePaths);
+
+   int miscFileCount = 3;
+   Map miscFilePaths = new 
HashMap<>(miscFileCount);
+   ThreadLocalRandom random = ThreadLocalRandom.current();
+   File currentFile = 
temporaryFolder.newFile(String.format("%s/CURRENT", localFolder));
+   generateRandomFileContent(currentFile.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("CURRENT"), 
Path.fromLocalFile(currentFile));
+
+   File manifest = 
temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder));
+   generateRandomFileContent(manifest.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("MANIFEST"), 
Path.fromLocalFile(manifest));
+
+   File options = 
temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder));
+   

[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r24695
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +69,88 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+   public static void uploadFilesToCheckpointFs(
+   @Nonnull Map files,
+   int numberOfSnapshottingThreads,
+   CheckpointStreamFactory checkpointStreamFactory,
+   CloseableRegistry closeableRegistry,
+   Map hanldes) throws Exception 
{
+
+   ExecutorService executorService = 
createExecutorService(numberOfSnapshottingThreads);
+
+   Map> 
futures = new HashMap<>(files.size());
+
+   for (Map.Entry entry : files.entrySet()) {
 
 Review comment:
   What do you think about moving the first part, which creates the futures, 
into its own function `createUploadFutures`?
   Also, `() -> uploadLocalFileToCheckpointFs(..)` could be assigned to a 
separate variable to shorten the `futures.put(..)` line.
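
The two suggestions above could be sketched roughly as follows. This is an illustration only: plain `String`s stand in for `StateHandleID`, `Path` and `StreamStateHandle`, and `uploadLocalFile` is a hypothetical placeholder for `uploadLocalFileToCheckpointFs(..)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class UploadFuturesSketch {

    /** Hypothetical stand-in for uploadLocalFileToCheckpointFs(..): returns a fake handle. */
    static String uploadLocalFile(String path) {
        return "handle:" + path;
    }

    /** First part of the upload: only creates the futures, one per file. */
    static Map<String, CompletableFuture<String>> createUploadFutures(
            Map<String, String> files, ExecutorService executorService) {
        Map<String, CompletableFuture<String>> futures = new HashMap<>(files.size());
        for (Map.Entry<String, String> entry : files.entrySet()) {
            // Naming the supplier keeps the futures.put(..) line short.
            Supplier<String> upload = () -> uploadLocalFile(entry.getValue());
            futures.put(entry.getKey(), CompletableFuture.supplyAsync(upload, executorService));
        }
        return futures;
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            Map<String, String> files = new HashMap<>();
            files.put("000001.sst", "/tmp/local/000001.sst");
            files.put("CURRENT", "/tmp/local/CURRENT");
            createUploadFutures(files, executorService)
                .forEach((id, future) -> System.out.println(id + " -> " + future.join()));
        } finally {
            executorService.shutdown();
        }
    }
}
```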




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246334980
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 ##
 @@ -428,57 +433,25 @@ private void uploadSstFiles(
stateHandleID,
new 
PlaceholderStreamStateHandle());
} else {
-   
sstFiles.put(stateHandleID, uploadLocalFileToCheckpointFs(filePath));
+   
sstFilePaths.put(stateHandleID, filePath);
 
 Review comment:
   The same goes for this loop, which creates `sstFilePaths`/`miscFilePaths`: 
it could be moved into another function.




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246336347
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {
+   @Nullable
+   @Override
+   public StreamStateHandle closeAndGetHandle() {
+   return new ByteStreamStateHandle("testHandle", 
"testHandle".getBytes());
+   }
+
+   @Override
+   public void close() {
+   }
+
+   @Override
+   public long getPos() {
+   return 0;
+   }
+
+   @Override
+   public void flush() {
+   }
+
+   @Override
+   public void sync() {
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+   throw expectedException;
+   }
+   };
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry(),
+   new HashMap<>());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that upload files with multi-thread correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
+   File checkpointPrivateFolder = 
temporaryFolder.newFolder("private");
+   Path checkpointPrivateDirectory = new 
Path(checkpointPrivateFolder.getPath());
+
+   File checkpointSharedFolder = 
temporaryFolder.newFolder("shared");
+   Path checkpointSharedDirectory = new 
Path(checkpointSharedFolder.getPath());
+
+   FileSystem fileSystem = 
checkpointPrivateDirectory.getFileSystem();
+   int fileStateSizeThreshold = 1024;
+   FsCheckpointStreamFactory checkpointStreamFactory =
+   new FsCheckpointStreamFactory(fileSystem, 
checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+   String localFolder = "local";
+   temporaryFolder.newFolder(localFolder);
+
+   int sstFileCount = 6;
+   Map sstFilePaths = new 
HashMap<>(sstFileCount);
+   generateRandomSstFiles(localFolder, sstFileCount, 
fileStateSizeThreshold, sstFilePaths);
+
+   int miscFileCount = 3;
+   Map miscFilePaths = new 
HashMap<>(miscFileCount);
+   ThreadLocalRandom random = ThreadLocalRandom.current();
+   File currentFile = 
temporaryFolder.newFile(String.format("%s/CURRENT", localFolder));
+   generateRandomFileContent(currentFile.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("CURRENT"), 
Path.fromLocalFile(currentFile));
+
+   File manifest = 
temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder));
+   generateRandomFileContent(manifest.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("MANIFEST"), 
Path.fromLocalFile(manifest));
+
+   File options = 
temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder));
+   

[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246340345
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +69,88 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+   public static void uploadFilesToCheckpointFs(
+   @Nonnull Map files,
+   int numberOfSnapshottingThreads,
+   CheckpointStreamFactory checkpointStreamFactory,
+   CloseableRegistry closeableRegistry,
+   Map hanldes) throws Exception 
{
 
 Review comment:
   Not sure here. In general, I think it is not obvious that a function 
implicitly returns something through its arguments (here `hanldes`; typo, by 
the way, it should be `handles`). At the very least, I would mention this in 
the function's doc comment. `transferAllStateDataToDirectory` is now public and 
should also have a doc comment.
   
   On the other hand, if the function returns `handles` explicitly, we have to 
iterate over them again later, where the result is added to the final result 
map (e.g. `sstFiles.putAll(uploadFilesToCheckpointFs(..))`). Though I would not 
expect the map to be large enough for this extra iteration to be performance 
critical.
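
To make the trade-off concrete, here is a small sketch of both variants. The names `uploadInto` and `upload` are hypothetical, `String`s stand in for the Flink types, and the "upload" only builds a fake handle string.

```java
import java.util.HashMap;
import java.util.Map;

class ReturnHandlesSketch {

    /** Variant A: results are written into the caller's map (easy to miss without a doc comment). */
    static void uploadInto(Map<String, String> files, Map<String, String> handles) {
        files.forEach((id, path) -> handles.put(id, "handle:" + path));
    }

    /** Variant B: results are returned explicitly and the caller merges them. */
    static Map<String, String> upload(Map<String, String> files) {
        Map<String, String> handles = new HashMap<>(files.size());
        files.forEach((id, path) -> handles.put(id, "handle:" + path));
        return handles;
    }

    public static void main(String[] args) {
        Map<String, String> files = new HashMap<>();
        files.put("000001.sst", "/tmp/local/000001.sst");

        Map<String, String> sstFiles = new HashMap<>();
        sstFiles.put("000000.sst", "<placeholder>");

        // Variant B costs one extra pass over the returned map in putAll.
        sstFiles.putAll(upload(files));
        System.out.println(sstFiles);
    }
}
```

Variant B pays for the extra `putAll` pass, but the data flow is visible at the call site without reading the doc comment.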




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-28 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244325175
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +74,144 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+   public static void uploadStateFiles(
+   CheckpointStreamFactory checkpointStreamFactory,
+   SnapshotDirectory localBackupDirectory,
+   Set baseSstFiles,
+   int uploadingThreadNum,
+   CloseableRegistry snapshotCloseableRegistry,
+   @Nonnull ConcurrentHashMap 
sstFiles,
+   @Nonnull ConcurrentHashMap 
miscFiles) throws Exception {
+
+   Preconditions.checkState(localBackupDirectory.exists());
+
+   FileStatus[] fileStatuses = localBackupDirectory.listStatus();
+   if (fileStatuses != null) {
+   ExecutorService executorService = 
createExecutorService(uploadingThreadNum);
+
+   try {
+   List runnables = 
createUploadRunnables(
 
 Review comment:
   Hi @klion26, I think this could work:
   ```
   futures.put(stateHandleID,
     CompletableFuture.supplyAsync(
       CheckedSupplier.unchecked(() -> uploadLocalFileToCheckpointFs(path)),
       executorService));
   ```
   `CompletableFuture.supplyAsync` already seems to do internally what you 
suggest.
   Also, I would run the parallel upload separately for `sstFilePaths` and 
`miscFilePaths`, one after another, so that `RocksDbStateDataTransfer` does not 
need to know about the file structure.
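
A self-contained sketch of the same pattern without Flink on the classpath. `ThrowingSupplier` and `unchecked` are hypothetical stand-ins written for this example and are not claimed to match Flink's `CheckedSupplier`. `uploadLocalFile` is likewise a placeholder that fails for an empty path. The point is only to show how a checked exception thrown on the pool thread surfaces on the calling thread.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class SupplyAsyncSketch {

    /** Hypothetical helper type: a supplier that may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    /** Wraps checked exceptions so the lambda fits java.util.function.Supplier. */
    static <T> Supplier<T> unchecked(ThrowingSupplier<T> supplier) {
        return () -> {
            try {
                return supplier.get();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    /** Placeholder for the real upload: fails for an empty path. */
    static String uploadLocalFile(String path) throws IOException {
        if (path.isEmpty()) {
            throw new IOException("nothing to upload");
        }
        return "handle:" + path;
    }

    /** Runs one upload on the pool; returns the handle, or a description of the root failure. */
    static String uploadAndDescribe(String path, ExecutorService executorService) {
        Supplier<String> upload = unchecked(() -> uploadLocalFile(path));
        CompletableFuture<String> future = CompletableFuture.supplyAsync(upload, executorService);
        try {
            return future.join();
        } catch (CompletionException e) {
            // Walk down to the original exception thrown on the pool thread.
            Throwable cause = e;
            while (cause.getCause() != null) {
                cause = cause.getCause();
            }
            return "failed: " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            System.out.println(uploadAndDescribe("/tmp/local/CURRENT", executorService));
            System.out.println(uploadAndDescribe("", executorService));
        } finally {
            executorService.shutdown();
        }
    }
}
```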




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-26 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244004203
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -714,6 +729,20 @@ public void setNumberOfRestoringThreads(int 
numberOfRestoringThreads) {
this.numberOfRestoringThreads = numberOfRestoringThreads;
}
 
+   /**
+* Gets the thread number will used for uploading files to DFS when 
snapshot.
+*/
+   public int getNumberOfUploadingThreads() {
+   return numberOfUploadingThreads == 
UNDEFINED_NUMBER_OF_UPLOADING_THREADS ?
+   CHECKPOINT_SNAPSHOT_THREAD_NUM.defaultValue() : 
numberOfUploadingThreads;
+   }
+
+   public void setNumberOfUploadingThreads(int numberOfUploadingThreads) {
+   Preconditions.checkArgument(numberOfUploadingThreads > 0,
+   "The number of threads used to upload files to DFS in 
RocksDBStateBackend should > 0.");
 
 Review comment:
   `should be greater than zero` (also in `setNumberOfRestoringThreads`)




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-26 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244010700
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +74,144 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+   public static void uploadStateFiles(
+   CheckpointStreamFactory checkpointStreamFactory,
+   SnapshotDirectory localBackupDirectory,
+   Set baseSstFiles,
+   int uploadingThreadNum,
+   CloseableRegistry snapshotCloseableRegistry,
+   @Nonnull ConcurrentHashMap 
sstFiles,
+   @Nonnull ConcurrentHashMap 
miscFiles) throws Exception {
+
+   Preconditions.checkState(localBackupDirectory.exists());
+
+   FileStatus[] fileStatuses = localBackupDirectory.listStatus();
+   if (fileStatuses != null) {
+   ExecutorService executorService = 
createExecutorService(uploadingThreadNum);
+
+   try {
+   List runnables = 
createUploadRunnables(
 
 Review comment:
   I think using `Callable<StreamStateHandle>` (path -> StreamStateHandle, as 
it was in `uploadLocalFileToCheckpointFs`) is simpler and stateless compared to 
using `Runnable` and implicitly adding results to the concurrent map.
   
   What if we implement `createUploadCallables` and use 
`CompletableFuture.callAsync` to create 
`Map<StateHandleID, CompletableFuture<StreamStateHandle>>`? Then the futures 
(`map.values()`) could be waited for and the map could be converted into the 
result `Map<StateHandleID, StreamStateHandle>`.
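
A rough sketch of the `Callable`-based idea. The JDK's `CompletableFuture` has `supplyAsync` and `runAsync` but no `callAsync`, so this example submits the callables through `ExecutorService.submit`, which accepts a `Callable` directly. `String`s stand in for `StateHandleID`, `Path` and `StreamStateHandle`, and the class name and the "upload" itself are placeholders.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class UploadCallablesSketch {

    /** Each callable is stateless: path in, handle out, no shared result map. */
    static Map<String, Callable<String>> createUploadCallables(Map<String, String> files) {
        Map<String, Callable<String>> callables = new LinkedHashMap<>();
        files.forEach((id, path) -> callables.put(id, () -> "handle:" + path));
        return callables;
    }

    /** Submits all callables, waits for every future and converts the map into the result. */
    static Map<String, String> uploadAll(
            Map<String, String> files, ExecutorService executorService) throws Exception {
        Map<String, Future<String>> futures = new HashMap<>();
        for (Map.Entry<String, Callable<String>> entry : createUploadCallables(files).entrySet()) {
            futures.put(entry.getKey(), executorService.submit(entry.getValue()));
        }
        // Only the calling thread touches this map, so a plain HashMap is enough.
        Map<String, String> handles = new HashMap<>(futures.size());
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
            handles.put(entry.getKey(), entry.getValue().get());
        }
        return handles;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            Map<String, String> files = new HashMap<>();
            files.put("MANIFEST", "/tmp/local/MANIFEST");
            files.put("OPTIONS", "/tmp/local/OPTIONS");
            System.out.println(uploadAll(files, executorService));
        } finally {
            executorService.shutdown();
        }
    }
}
```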




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-26 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244004144
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -714,6 +729,20 @@ public void setNumberOfRestoringThreads(int 
numberOfRestoringThreads) {
this.numberOfRestoringThreads = numberOfRestoringThreads;
}
 
+   /**
+* Gets the thread number will used for uploading files to DFS when 
snapshot.
 
 Review comment:
   `The number of threads used to upload files to DFS while snapshotting`
   Can you please change the comment the same way for 
`getNumberOfRestoringThreads` and add similar comments for the corresponding 
setter methods?




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-26 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244002977
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -220,6 +220,9 @@
/** Thread number used to download from DFS when restore. */
private final int restoringThreadNum;
 
+   /** Thread number used to upload to DFS when snapshot. */
+   private final int uploadingThreadNum;
 
 Review comment:
   I would align it with `restoringThreadNum`  and name it 
`snapshottingThreadNum`. I think it would be even better to name them both 
`numberOfRestoringThreads` and `numberOfSnapshottingThreads`.




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-26 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244009608
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 ##
 @@ -403,85 +409,6 @@ private void cleanupIncompleteSnapshot(@Nonnull 
List statesToDiscar
}
}
 
-   private void uploadSstFiles(
-   @Nonnull Map sstFiles,
-   @Nonnull Map 
miscFiles) throws Exception {
-
-   // write state data
-   Preconditions.checkState(localBackupDirectory.exists());
-
-   FileStatus[] fileStatuses = 
localBackupDirectory.listStatus();
-   if (fileStatuses != null) {
-   for (FileStatus fileStatus : fileStatuses) {
-   final Path filePath = 
fileStatus.getPath();
-   final String fileName = 
filePath.getName();
-   final StateHandleID stateHandleID = new 
StateHandleID(fileName);
-
-   if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
-   final boolean existsAlready =
-   baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
-
-   if (existsAlready) {
-   // we introduce a 
placeholder state handle, that is replaced with the
-   // original from the 
shared state registry (created from a previous checkpoint)
-   sstFiles.put(
-   stateHandleID,
-   new 
PlaceholderStreamStateHandle());
-   } else {
-   
sstFiles.put(stateHandleID, uploadLocalFileToCheckpointFs(filePath));
 
 Review comment:
   I think this logic of separating sst/placeholder/misc files belongs here.
   `RocksDbStateDataTransfer` should be responsible only for the upload.
   
   What if, instead of calling `uploadLocalFileToCheckpointFs`, we fill 
`sstFilesToUpload` and `miscFilesToUpload` of type `Map<StateHandleID, Path>` 
in this loop and then call `RocksDbStateDataTransfer.uploadStateFiles` for each 
map? This `uploadStateFiles` could return a 
`Map<StateHandleID, StreamStateHandle>`, which could be put into the final 
`sstFiles`/`miscFiles`.
   
   I think `ConcurrentHashMap` is not needed then.
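
The separation step described above might look roughly like this. It is a sketch under assumptions: file names and paths are plain `String`s, the `Separated` holder class is hypothetical, and the `.sst` value for `SST_FILE_SUFFIX` is assumed rather than taken from the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class FileSeparationSketch {

    /** Assumed value; the diff only shows the constant's name. */
    static final String SST_FILE_SUFFIX = ".sst";

    /** Hypothetical holder for the three groups of files. */
    static final class Separated {
        final Set<String> placeholders = new TreeSet<>();
        final Map<String, String> sstFilesToUpload = new TreeMap<>();
        final Map<String, String> miscFilesToUpload = new TreeMap<>();
    }

    /** Decides per file: placeholder (already in a previous checkpoint), sst upload, or misc upload. */
    static Separated separate(Map<String, String> localFiles, Set<String> baseSstFiles) {
        Separated result = new Separated();
        localFiles.forEach((fileName, path) -> {
            if (!fileName.endsWith(SST_FILE_SUFFIX)) {
                result.miscFilesToUpload.put(fileName, path);
            } else if (baseSstFiles != null && baseSstFiles.contains(fileName)) {
                result.placeholders.add(fileName);
            } else {
                result.sstFilesToUpload.put(fileName, path);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> localFiles = new HashMap<>();
        localFiles.put("000001.sst", "/tmp/local/000001.sst");
        localFiles.put("000002.sst", "/tmp/local/000002.sst");
        localFiles.put("CURRENT", "/tmp/local/CURRENT");

        Set<String> baseSstFiles = new HashSet<>();
        baseSstFiles.add("000001.sst");

        Separated separated = separate(localFiles, baseSstFiles);
        System.out.println("placeholders: " + separated.placeholders);
        System.out.println("sst to upload: " + separated.sstFilesToUpload.keySet());
        System.out.println("misc to upload: " + separated.miscFilesToUpload.keySet());
    }
}
```

The upload component then only receives the two "to upload" maps and never needs to inspect file names.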

