[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250477821
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 ##
 @@ -36,43 +36,47 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static 
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+   public RocksDBStateDownloader(int restoringThreadNum) {
+   super(restoringThreadNum);
+   }
 
-   static void transferAllStateDataToDirectory(
+   /**
+* Transfer all state data to the target directory using specified 
number of threads.
+*
+* @param restoreStateHandle Handles used to retrieve the state data.
+* @param dest The target directory which the state data will be stored.
+* @param closeableRegistry Which all the inputStream/outputStream will 
be registered and unregistered.
+*
+* @throws Exception Thrown if can not transfer all the state data.
+*/
+   public void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest,
-   int restoringThreadNum,
CloseableRegistry closeableRegistry) throws Exception {
 
final Map sstFiles =
restoreStateHandle.getSharedState();
final Map miscFiles =
restoreStateHandle.getPrivateState();
 
-   downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
-   downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(sstFiles, dest, 
closeableRegistry);
 
 Review comment:
   @azagrebin We cannot share the `closeableRegistry` because we support 
parallel snapshots. If the `closeableRegistry` is shared between parallel 
snapshots, the snapshot that completes later fails with `IOException("Cannot 
register Closeable, registry is already closed. Closing argument.")` when it 
registers its input/output streams with the registry, because the 
`closeableRegistry` has already been closed in 
`AsyncSnapshotCallable#closeSnapshotIO`. See the [Travis 
log](https://travis-ci.org/apache/flink/jobs/483728783).
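
   The failure mode described above can be illustrated with a small stand-in. This is only a sketch under stated assumptions: `SimpleRegistry` and `tryRegister` are hypothetical names, and `SimpleRegistry` is a simplified imitation, not Flink's `CloseableRegistry`. It mimics just one behaviour, rejecting registrations after close, and reuses the error message quoted in the comment.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class SharedRegistryDemo {

    /** Hypothetical, simplified registry; NOT Flink's CloseableRegistry. */
    static final class SimpleRegistry implements Closeable {
        private final List<Closeable> registered = new ArrayList<>();
        private boolean closed;

        /** Registers a resource, or closes it and fails if the registry is already closed. */
        synchronized void register(Closeable resource) throws IOException {
            if (closed) {
                resource.close();
                throw new IOException(
                    "Cannot register Closeable, registry is already closed. Closing argument.");
            }
            registered.add(resource);
        }

        /** Closes the registry together with everything registered so far. */
        @Override
        public synchronized void close() {
            closed = true;
            for (Closeable resource : registered) {
                try {
                    resource.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            registered.clear();
        }
    }

    /** Returns "registered" on success, otherwise the message of the IOException. */
    static String tryRegister(SimpleRegistry registry) {
        try {
            registry.register(new ByteArrayInputStream(new byte[0]));
            return "registered";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        SimpleRegistry shared = new SimpleRegistry();
        System.out.println("snapshot A: " + tryRegister(shared));
        shared.close(); // snapshot A completes and closes the shared registry
        System.out.println("snapshot B (shared registry): " + tryRegister(shared));
        System.out.println("snapshot B (own registry): " + tryRegister(new SimpleRegistry()));
    }
}
```

   In this sketch, giving each snapshot its own registry avoids the failure, because closing one snapshot's registry cannot affect streams registered by another.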


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250456369
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Help class for uploading RocksDB state.
+ */
+public class RocksDBStateUploader extends RocksDBStateDownloader {
 
 Review comment:
   My bad, I chose the wrong one when using auto-complete.




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-16 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r24807
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -218,7 +218,10 @@
private final boolean enableIncrementalCheckpointing;
 
/** Thread number used to download from DFS when restore. */
-   private final int restoringThreadNum;
+   private final int numberOfRestoringThreads;
+
+   /** Thread number used to upload to DFS when snapshot. */
+   private final int numberOfSnapshottingThreads;
 
 Review comment:
   @StefanRRichter Initially, I only had flexibility in mind. It makes sense 
to me to merge the two parameters into one, because both apply to 
incremental snapshot/restore.
   
   At my company, the snapshot procedure is not a bottleneck, so we only 
enabled multithreaded restoring.




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-16 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r24807
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -218,7 +218,10 @@
private final boolean enableIncrementalCheckpointing;
 
/** Thread number used to download from DFS when restore. */
-   private final int restoringThreadNum;
+   private final int numberOfRestoringThreads;
+
+   /** Thread number used to upload to DFS when snapshot. */
+   private final int numberOfSnapshottingThreads;
 
 Review comment:
   @StefanRRichter In my opinion, there may be scenarios in which we restore 
from a checkpoint with multiple threads but do not need multiple threads to 
take a snapshot with incremental checkpointing.
   
   At my company, we only enabled multithreaded restoring. We use only 
RocksDB incremental checkpoints to snapshot, and we restore from the latest 
completed checkpoint (we do not use savepoints to restore the job and do not 
use HeapStateBackend in production).




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-16 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r24807
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -218,7 +218,10 @@
private final boolean enableIncrementalCheckpointing;
 
/** Thread number used to download from DFS when restore. */
-   private final int restoringThreadNum;
+   private final int numberOfRestoringThreads;
+
+   /** Thread number used to upload to DFS when snapshot. */
+   private final int numberOfSnapshottingThreads;
 
 Review comment:
   @StefanRRichter In my opinion, there may be scenarios in which we restore 
from a savepoint with multiple threads but do not need multiple threads to 
take a snapshot with incremental checkpointing.
   
   At my company, we only enabled multithreaded restoring. We use only 
RocksDB incremental checkpoints to snapshot, and we restore from the latest 
completed checkpoint (we do not use savepoints to restore the job and do not 
use HeapStateBackend in production).




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246367385
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {
+   @Nullable
+   @Override
+   public StreamStateHandle closeAndGetHandle() {
+   return new ByteStreamStateHandle("testHandle", 
"testHandle".getBytes());
+   }
+
+   @Override
+   public void close() {
+   }
+
+   @Override
+   public long getPos() {
+   return 0;
+   }
+
+   @Override
+   public void flush() {
+   }
+
+   @Override
+   public void sync() {
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+   throw expectedException;
+   }
+   };
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry(),
+   new HashMap<>());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that upload files with multi-thread correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
+   File checkpointPrivateFolder = 
temporaryFolder.newFolder("private");
+   Path checkpointPrivateDirectory = new 
Path(checkpointPrivateFolder.getPath());
+
+   File checkpointSharedFolder = 
temporaryFolder.newFolder("shared");
+   Path checkpointSharedDirectory = new 
Path(checkpointSharedFolder.getPath());
+
+   FileSystem fileSystem = 
checkpointPrivateDirectory.getFileSystem();
+   int fileStateSizeThreshold = 1024;
+   FsCheckpointStreamFactory checkpointStreamFactory =
+   new FsCheckpointStreamFactory(fileSystem, 
checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+   String localFolder = "local";
+   temporaryFolder.newFolder(localFolder);
+
+   int sstFileCount = 6;
+   Map sstFilePaths = new 
HashMap<>(sstFileCount);
+   generateRandomSstFiles(localFolder, sstFileCount, 
fileStateSizeThreshold, sstFilePaths);
+
+   int miscFileCount = 3;
+   Map miscFilePaths = new 
HashMap<>(miscFileCount);
+   ThreadLocalRandom random = ThreadLocalRandom.current();
+   File currentFile = 
temporaryFolder.newFile(String.format("%s/CURRENT", localFolder));
+   generateRandomFileContent(currentFile.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("CURRENT"), 
Path.fromLocalFile(currentFile));
+
+   File manifest = 
temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder));
+   generateRandomFileContent(manifest.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("MANIFEST"), 
Path.fromLocalFile(manifest));
+
+   File options = 
temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder));
+ 

[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246358163
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +69,88 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+   public static void uploadFilesToCheckpointFs(
+   @Nonnull Map files,
+   int numberOfSnapshottingThreads,
+   CheckpointStreamFactory checkpointStreamFactory,
+   CloseableRegistry closeableRegistry,
+   Map hanldes) throws Exception 
{
 
 Review comment:
   @azagrebin The implementation here returns `handles` implicitly because I 
thought the caller of `uploadFilesToCheckpointFs` may know the expected size 
of `handles` and could initialize the map with `new HashMap(size)`. 
   I agree that the map will not be too large because we are in incremental 
mode, so I'll change to the explicit mode. In any case, I'll add Javadoc for 
these two public functions.
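
   The two styles discussed above differ only in who creates the result map. The following is a minimal sketch, not the PR's code: `uploadInto`, `upload`, and the `String` key/value types are hypothetical simplifications standing in for the real handle types.

```java
import java.util.HashMap;
import java.util.Map;

final class ReturnStyleDemo {

    /** Implicit style: the caller creates (and may pre-size) the map; the method fills it. */
    static void uploadInto(Map<String, String> files, Map<String, String> handles) {
        for (Map.Entry<String, String> entry : files.entrySet()) {
            // Placeholder for the real upload: derive a fake handle from the local path.
            handles.put(entry.getKey(), "handle-for-" + entry.getValue());
        }
    }

    /** Explicit style: the method creates, fills, and returns the map. */
    static Map<String, String> upload(Map<String, String> files) {
        Map<String, String> handles = new HashMap<>(files.size());
        uploadInto(files, handles);
        return handles;
    }

    public static void main(String[] args) {
        Map<String, String> files = new HashMap<>();
        files.put("000001.sst", "/local/000001.sst");
        System.out.println(upload(files));
    }
}
```

   The explicit variant keeps the result visible in the method signature, at the cost of the caller no longer choosing the map's initial capacity.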
   




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246350022
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +148,157 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = new CheckpointStreamFactory.CheckpointStateOutputStream() {
+   @Nullable
+   @Override
+   public StreamStateHandle closeAndGetHandle() {
+   return new ByteStreamStateHandle("testHandle", 
"testHandle".getBytes());
+   }
+
+   @Override
+   public void close() {
+   }
+
+   @Override
+   public long getPos() {
+   return 0;
+   }
+
+   @Override
+   public void flush() {
+   }
+
+   @Override
+   public void sync() {
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+   throw expectedException;
+   }
+   };
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry(),
+   new HashMap<>());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that upload files with multi-thread correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
+   File checkpointPrivateFolder = 
temporaryFolder.newFolder("private");
+   Path checkpointPrivateDirectory = new 
Path(checkpointPrivateFolder.getPath());
+
+   File checkpointSharedFolder = 
temporaryFolder.newFolder("shared");
+   Path checkpointSharedDirectory = new 
Path(checkpointSharedFolder.getPath());
+
+   FileSystem fileSystem = 
checkpointPrivateDirectory.getFileSystem();
+   int fileStateSizeThreshold = 1024;
+   FsCheckpointStreamFactory checkpointStreamFactory =
+   new FsCheckpointStreamFactory(fileSystem, 
checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+   String localFolder = "local";
+   temporaryFolder.newFolder(localFolder);
+
+   int sstFileCount = 6;
+   Map sstFilePaths = new 
HashMap<>(sstFileCount);
+   generateRandomSstFiles(localFolder, sstFileCount, 
fileStateSizeThreshold, sstFilePaths);
+
+   int miscFileCount = 3;
+   Map miscFilePaths = new 
HashMap<>(miscFileCount);
+   ThreadLocalRandom random = ThreadLocalRandom.current();
+   File currentFile = 
temporaryFolder.newFile(String.format("%s/CURRENT", localFolder));
+   generateRandomFileContent(currentFile.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("CURRENT"), 
Path.fromLocalFile(currentFile));
+
+   File manifest = 
temporaryFolder.newFile(String.format("%s/MANIFEST", localFolder));
+   generateRandomFileContent(manifest.getPath(), 
random.nextInt(fileStateSizeThreshold) + 1);
+   miscFilePaths.put(new StateHandleID("MANIFEST"), 
Path.fromLocalFile(manifest));
+
+   File options = 
temporaryFolder.newFile(String.format("%s/OPTIONS", localFolder));
+ 

[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-28 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244333053
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +74,144 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+   public static void uploadStateFiles(
+   CheckpointStreamFactory checkpointStreamFactory,
+   SnapshotDirectory localBackupDirectory,
+   Set baseSstFiles,
+   int uploadingThreadNum,
+   CloseableRegistry snapshotCloseableRegistry,
+   @Nonnull ConcurrentHashMap 
sstFiles,
+   @Nonnull ConcurrentHashMap 
miscFiles) throws Exception {
+
+   Preconditions.checkState(localBackupDirectory.exists());
+
+   FileStatus[] fileStatuses = localBackupDirectory.listStatus();
+   if (fileStatuses != null) {
+   ExecutorService executorService = 
createExecutorService(uploadingThreadNum);
+
+   try {
+   List runnables = 
createUploadRunnables(
 
 Review comment:
   @azagrebin Thank you for your reply. I'll update the code according to 
your suggestions.




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-28 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r244318378
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +74,144 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+   public static void uploadStateFiles(
+   CheckpointStreamFactory checkpointStreamFactory,
+   SnapshotDirectory localBackupDirectory,
+   Set baseSstFiles,
+   int uploadingThreadNum,
+   CloseableRegistry snapshotCloseableRegistry,
+   @Nonnull ConcurrentHashMap 
sstFiles,
+   @Nonnull ConcurrentHashMap 
miscFiles) throws Exception {
+
+   Preconditions.checkState(localBackupDirectory.exists());
+
+   FileStatus[] fileStatuses = localBackupDirectory.listStatus();
+   if (fileStatuses != null) {
+   ExecutorService executorService = 
createExecutorService(uploadingThreadNum);
+
+   try {
+   List runnables = 
createUploadRunnables(
 
 Review comment:
   Hi @azagrebin, when I tried to implement this using `Callable`, I could 
not find a `java.util.concurrent.CompletableFuture.callAsync` function (JDK 8). 
After searching the Flink project, I only found 
`org.apache.flink.runtime.rpc.MainThreadExecutable#callAsync`. Did I miss 
something?
   
   In other words, if I use `Callable` instead of `Runnable`, should I 
implement a `ThrowingCallable`, analogous to `ThrowingRunnable`?
   
   If I use `Runnable` to implement this, how about creating a 
`CompletableFuture` instance and passing it to the called function 
`uploadLocalFileToCheckpointFs(Path, CompletableFuture)`? Inside 
`uploadLocalFileToCheckpointFs` we would complete the passed-in 
`CompletableFuture`, as shown below:
   
   ```
   Map> futures = new 
HashMap();
   // futures will update in createUploadRunnables
   List runnables = createUploadRunnables(sstFilePaths, 
miscFilePaths, futures);
   for (Runnable runnable : runnables) {
   CompletableFuture.runAsync(runnable, executorService);
   }
   FutureUtils.waitForAll(futures.values()).get();
   
   
   
   private static StreamStateHandle uploadLocalFileToCheckpointFs(Path path, 
CompletableFuture future) {
  try {
  ...
  future.complete(...)
  } finally {
  
  }
   }
   ```
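
   For comparison, JDK 8's `CompletableFuture` does offer `supplyAsync(Supplier, Executor)`, which lets each task return its result directly instead of completing a future passed in from outside. The sketch below shows one possible shape under stated assumptions: `ThrowingSupplier`, `unchecked`, `uploadOne`, and `uploadAll` are hypothetical names introduced only for illustration, and `String` stands in for the real path and handle types.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class SupplyAsyncDemo {

    /** Hypothetical helper type: a supplier that may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    /** Adapts a throwing supplier to a plain Supplier usable with supplyAsync. */
    static <T> Supplier<T> unchecked(ThrowingSupplier<T> supplier) {
        return () -> {
            try {
                return supplier.get();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
    }

    /** Placeholder for the real upload; returns a fake handle string. */
    static String uploadOne(String path) throws IOException {
        if (path.isEmpty()) {
            throw new IOException("empty path");
        }
        return "handle:" + path;
    }

    /** Uploads all paths on a fixed thread pool and collects one handle per path. */
    static Map<String, String> uploadAll(List<String> paths, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            Map<String, CompletableFuture<String>> futures = new HashMap<>();
            for (String path : paths) {
                futures.put(path,
                    CompletableFuture.supplyAsync(unchecked(() -> uploadOne(path)), executor));
            }
            Map<String, String> handles = new HashMap<>();
            for (Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
                // join() rethrows a task failure as CompletionException on the calling thread.
                handles.put(entry.getKey(), entry.getValue().join());
            }
            return handles;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(uploadAll(List.of("000001.sst", "MANIFEST"), 2));
    }
}
```

   With this shape, an exception thrown inside a pool thread surfaces on the calling thread when the future is joined, so no extra `CompletableFuture` parameter is needed on the upload function.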




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2018-12-23 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r243766116
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -20,31 +20,44 @@
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
 import static 
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 
 /**
  * Data transfer utils for {@link RocksDBKeyedStateBackend}.
  */
-class RocksDbStateDataTransfer {
+public class RocksDbStateDataTransfer {
+   private static final int READ_BUFFER_SIZE = 16 * 1024;
 
static void transferAllStateDataToDirectory(
 
 Review comment:
   Hi @gyfora, thank you for the review. By consistency, do you mean that the 
method and the class should have the same access modifier? Also, could you 
please tell me where the method will be integrated?

