[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718365#comment-16718365
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446434187
 
 
   @azagrebin Thank you for your explanation. 
   Following your explanation, I will:
   - first move `DirectExecutorService` into `org.apache.flink.runtime.concurrent` and change `Executor Executors#directExecutor()` to `DirectExecutorService Executors#directExecutorService()`; this will be done in a separate commit.
   - then use `Executors#directExecutorService()` in the current patch to share the logic of `threadNum = 1` and `threadNum > 1`.
   
   Did I understand correctly? If so, I will file an issue, implement the move first, and then come back to complete this patch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, restore downloads the state files from DFS 
> using a single thread; this could be sped up by downloading the state from 
> DFS with multiple threads.
>  
> In my company, state can reach several terabytes, so the restore procedure 
> becomes rather slow. After a bit of digging, I found that state is downloaded 
> from DFS by a single thread; using multiple threads could speed this up.
> I measured the time needed to download ~2 TB of state from DFS: 640+ s with a 
> single thread and 130+ s with 5 download threads.
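The core idea of the issue is that each state file can be fetched independently, so the copies can overlap in time. A minimal Java sketch under stated assumptions: local temporary files stand in for DFS, `java.nio.file.Files.copy` stands in for Flink's `FileSystem` input/output streams, and `ParallelRestoreSketch`/`downloadAll` are hypothetical names, not Flink APIs.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not Flink code: local files stand in for DFS state files.
final class ParallelRestoreSketch {

    private ParallelRestoreSketch() {}

    /** Copies every "remote" file into restoreDir, running up to threadNum copies at once. */
    static void downloadAll(List<Path> remoteFiles, Path restoreDir, int threadNum) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, threadNum));
        try {
            List<Future<Path>> futures = new ArrayList<>(remoteFiles.size());
            for (Path remote : remoteFiles) {
                Path target = restoreDir.resolve(remote.getFileName());
                // Each file is an independent task, so several copies can run concurrently.
                futures.add(pool.submit(() -> Files.copy(remote, target, StandardCopyOption.REPLACE_EXISTING)));
            }
            for (Future<Path> future : futures) {
                future.get(); // waits for this copy; a failed copy surfaces as ExecutionException
            }
        } finally {
            pool.shutdownNow(); // releases the threads and interrupts leftovers after a failure
        }
    }
}
```

Note that in this sketch `threadNum = 1` still hands the work to one pool thread rather than running it in the caller.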



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716776#comment-16716776
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on issue #6777: [FLINK-10461] [State Backends, 
Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446148631
 
 
   @klion26 right, sorry for the confusion.
   I think we should move `DirectExecutorService` into `org.apache.flink.runtime.concurrent`.
   `Executors.directExecutor()` already exists there, so this could be deduplicated:
   `directExecutor()` could become `directExecutorService()` and return the `DirectExecutorService` singleton as an `ExecutorService` instead of an `Executor`. Then `DirectExecutor` is no longer needed in `Executors`.
   
   Could you do it as a separate commit?
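The proposed `directExecutorService()` can be sketched as a singleton built on `java.util.concurrent.AbstractExecutorService`, which derives `submit` and `invokeAll` from a single `execute` method. This is an illustration under assumptions, not Flink's actual `DirectExecutorService`: the class names are hypothetical, and making the lifecycle methods no-ops (so that one caller cannot shut down the shared instance for everyone) is this sketch's own design choice.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Flink's actual Executors/DirectExecutorService classes.
final class DirectExecutors {

    private static final ExecutorService INSTANCE = new DirectExecutorService();

    private DirectExecutors() {}

    /** Returns the shared direct executor typed as ExecutorService rather than Executor. */
    static ExecutorService directExecutorService() {
        return INSTANCE;
    }

    /** Runs each task synchronously in the submitting thread. */
    private static final class DirectExecutorService extends AbstractExecutorService {

        @Override
        public void execute(Runnable command) {
            command.run(); // no hand-off to another thread
        }

        // Design choice of this sketch: the instance is shared, so one caller must not
        // be able to shut it down for everyone. Lifecycle calls are therefore no-ops.
        @Override
        public void shutdown() {}

        @Override
        public List<Runnable> shutdownNow() {
            return Collections.emptyList(); // nothing is ever queued
        }

        @Override
        public boolean isShutdown() {
            return false;
        }

        @Override
        public boolean isTerminated() {
            return false;
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) {
            return false; // never terminates, so report "not terminated" without waiting
        }
    }
}
```

A consequence of this choice is that `awaitTermination` always returns `false`; callers that share code with a real pool should not rely on it for the direct case.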




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716503#comment-16716503
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 edited a comment on issue #6777: [FLINK-10461] [State Backends, 
Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446110193
 
 
   @azagrebin Thank you for your reply. To unify the `threadNum > 1` and `threadNum = 1` paths in `RocksDbStateDataTransfer`, we should use `DirectExecutorService` for `threadNum = 1`. However, I found that `DirectExecutorService` is located in `flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService`, as I commented above.
   
   So my question is: did I miss something, or should I move `flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService` to `flink-runtime/src/main/java/org/apache/flink/runtime/util/DirectExecutorService`?




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716500#comment-16716500
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446110193
 
 
   @azagrebin Hi, to unify the `threadNum > 1` and `threadNum = 1` paths, we should use `DirectExecutorService` for `threadNum = 1`. However, I found that `DirectExecutorService` is located in `flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService`, as I commented above.
   
   So my question is: did I miss something, or should I move `flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService` to `flink-runtime/src/main/java/org/apache/flink/runtime/util/DirectExecutorService`?




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714724#comment-16714724
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on issue #6777: [FLINK-10461] [State Backends, 
Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-445814814
 
 
   @klion26, do you have an ETA for addressing the comments?




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709534#comment-16709534
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238908481
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+		IncrementalKeyedStateHandle restoreStateHandle,
+		Path dest,
+		int restoringThreadNum,
+		CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+		Map<StateHandleID, StreamStateHandle> stateHandleMap,
+		Path restoreInstancePath,
+		int restoringThreadNum,
+		CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (enableMultiThreadDownload(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
 
 Review comment:
   I agree. However, I found `DirectExecutorService` only in test code (`flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService`). Did I miss something, or should I move it to `flink-runtime/src/main/java/org/apache/flink/runtime/util/DirectExecutorService`?



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708881#comment-16708881
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238712692
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+		IncrementalKeyedStateHandle restoreStateHandle,
+		Path dest,
+		int restoringThreadNum,
+		CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+		Map<StateHandleID, StreamStateHandle> stateHandleMap,
+		Path restoreInstancePath,
+		int restoringThreadNum,
+		CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (enableMultiThreadDownload(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
+	}
+
+	private static List<Runnable> createDownloadRunnables(
+		Map<StateHandleID, StreamStateHandle> stateHandleMap,
+		Path restoreInstancePath,
+		CloseableRegistry closeableRegistry
+	) {
+		List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+			StateHandleID stateHandleID = entry.getKey();
+			StreamStateHandle remoteFileHandle = entry.getValue();
+
+			Path path = new Path(restoreInstancePath, stateHandleID.toString());
+			runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry));
+		}
+		return runnables;
+	}
+
+	private static void runTransferWithMultipleThreads(
+		List<Runnable> runnables,
+		int threadNum,
+		CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOException {
+
+ 

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708879#comment-16708879
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238708730
 
 


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708876#comment-16708876
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238710301
 
 

 
 Review comment:
   I agree; we would have less code by just using the `DirectExecutorService` instead of looping over the `Runnable`s. Then the code in `runTransferWithMultipleThreads` could move back into `downloadDataForAllStateHandles`, and `runTransferWithSingleThread` would no longer be needed.
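Once an executor that runs tasks in the caller is available, both thread counts can share one code path. A minimal sketch, assuming the download tasks are already plain `Runnable`s; `UnifiedDownloadSketch` and `downloadAll` are hypothetical names, and `Runnable::run` stands in for a direct executor service.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch, not Flink code: one path for both threadNum == 1 and threadNum > 1.
final class UnifiedDownloadSketch {

    private UnifiedDownloadSketch() {}

    /**
     * Runs every task on the given executor and waits for all of them.
     * A direct executor (e.g. Runnable::run) runs each task inline inside runAsync;
     * a thread pool runs them concurrently. No branch on the thread count is needed.
     */
    static void downloadAll(List<Runnable> runnables, Executor executor) {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[runnables.size()];
        for (int i = 0; i < futures.length; i++) {
            futures[i] = CompletableFuture.runAsync(runnables.get(i), executor);
        }
        // join() throws a CompletionException wrapping the failure if any task failed.
        CompletableFuture.allOf(futures).join();
    }
}
```

Compared with separate `runTransferWithSingleThread` and `runTransferWithMultipleThreads` methods, the only thread-count decision left is which executor the caller passes in; in this sketch the caller also owns the pool's lifecycle.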



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708869#comment-16708869
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238708730
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			Path dest,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (enableMultiThreadDownload(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
+	}
+
+	private static List<Runnable> createDownloadRunnables(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			CloseableRegistry closeableRegistry
+	) {
+		List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+			StateHandleID stateHandleID = entry.getKey();
+			StreamStateHandle remoteFileHandle = entry.getValue();
+
+			Path path = new Path(restoreInstancePath, stateHandleID.toString());
+			runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry));
+		}
+		return runnables;
+	}
+
+	private static void runTransferWithMultipleThreads(
+			List<Runnable> runnables,
+			int threadNum,
+			CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOException {
+
+ 
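
The quoted hunk is cut off at this point. The idea behind `createDownloadRunnables` is one download task per state handle, with each task writing to a file named after its `StateHandleID` under the restore directory. That idea can be sketched with plain `java.nio`. This is a hypothetical, self-contained illustration rather than Flink code: in-memory byte arrays stand in for the remote `StreamStateHandle`s, plain strings stand in for `StateHandleID`s, `java.nio.file.Path` replaces Flink's `org.apache.flink.core.fs.Path`, and the class name is invented for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one download task per state handle, target file named after the handle ID. */
public class DownloadRunnablesSketch {

    /** Builds one task per entry; each task copies the entry's bytes to dest/<handleId>. */
    static List<Runnable> createDownloadRunnables(Map<String, byte[]> stateHandles, Path dest) {
        List<Runnable> runnables = new ArrayList<>(stateHandles.size());
        for (Map.Entry<String, byte[]> entry : stateHandles.entrySet()) {
            // The handle ID becomes the local file name, mirroring the quoted code.
            Path target = dest.resolve(entry.getKey());
            byte[] remoteBytes = entry.getValue();
            runnables.add(() -> {
                try (InputStream in = new ByteArrayInputStream(remoteBytes)) {
                    Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
                } catch (IOException e) {
                    // Runnable cannot throw checked exceptions, so wrap them.
                    throw new UncheckedIOException(e);
                }
            });
        }
        return runnables;
    }

    public static void main(String[] args) throws IOException {
        Path dest = Files.createTempDirectory("restore-sketch");
        Map<String, byte[]> handles = new LinkedHashMap<>();
        handles.put("sst-000001", "shared".getBytes());
        handles.put("MANIFEST", "private".getBytes());

        // Single-threaded run: execute each task in the calling thread.
        for (Runnable r : createDownloadRunnables(handles, dest)) {
            r.run();
        }
        System.out.println(new String(Files.readAllBytes(dest.resolve("sst-000001"))));
    }
}
```

Because the tasks are independent `Runnable`s, the same list can be run inline or handed to an executor. That is exactly the choice `downloadDataForAllStateHandles` makes based on `restoringThreadNum`.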

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708853#comment-16708853
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238362343
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			Path dest,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (enableMultiThreadDownload(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
+	}
+
+	private static List<Runnable> createDownloadRunnables(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			CloseableRegistry closeableRegistry
+	) {
+		List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+			StateHandleID stateHandleID = entry.getKey();
+			StreamStateHandle remoteFileHandle = entry.getValue();
+
+			Path path = new Path(restoreInstancePath, stateHandleID.toString());
+			runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry));
+		}
+		return runnables;
+	}
+
+	private static void runTransferWithMultipleThreads(
+			List<Runnable> runnables,
+			int threadNum,
+			CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOException {
+
+ 

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708855#comment-16708855
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238702567
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			Path dest,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (enableMultiThreadDownload(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
+	}
+
+	private static List<Runnable> createDownloadRunnables(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			CloseableRegistry closeableRegistry
+	) {
+		List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+			StateHandleID stateHandleID = entry.getKey();
+			StreamStateHandle remoteFileHandle = entry.getValue();
+
+			Path path = new Path(restoreInstancePath, stateHandleID.toString());
+			runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry));
+		}
+		return runnables;
+	}
+
+	private static void runTransferWithMultipleThreads(
+			List<Runnable> runnables,
+			int threadNum,
+			CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOException {
+
+  

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708854#comment-16708854
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238702953
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,172 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			Path dest,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (isMultiThreadDownloadEnabled(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
+	}
+
+	private static List<Runnable> createDownloadRunnables(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			CloseableRegistry closeableRegistry) {
+		List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+			StateHandleID stateHandleID = entry.getKey();
+			StreamStateHandle remoteFileHandle = entry.getValue();
+
+			Path path = new Path(restoreInstancePath, stateHandleID.toString());
+
+			runnables.add(ThrowingRunnable.unchecked(new ThrowingRunnable<IOException>() {
+				@Override
+				public void run() throws IOException {
+					downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry);
+				}
+			}));
+		}
+   
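
This revision wraps a task that throws a checked exception with `ThrowingRunnable.unchecked` so that the task fits in a `List<Runnable>`. The pattern can be shown in isolation with a hypothetical stand-in: `CheckedRunnable` and `unchecked` below are invented for the example and are not Flink's `ThrowingRunnable`. The sketch also shows that a lambda can replace the anonymous inner class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a "throwing runnable" plus an adapter to java.lang.Runnable. */
public class UncheckedRunnableSketch {

    @FunctionalInterface
    interface CheckedRunnable<E extends Exception> {
        void run() throws E;
    }

    /** Adapts a CheckedRunnable to a Runnable, rethrowing checked exceptions as unchecked ones. */
    static Runnable unchecked(CheckedRunnable<?> task) {
        return () -> {
            try {
                task.run();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    /** Simulated download that optionally fails with a checked exception. */
    static void download(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("simulated download failure");
        }
    }

    public static void main(String[] args) {
        // A lambda replaces the anonymous inner class from the quoted revision.
        Runnable ok = unchecked(() -> download(false));
        ok.run();

        Runnable broken = unchecked(() -> download(true));
        try {
            broken.run();
        } catch (UncheckedIOException e) {
            System.out.println("caught: " + e.getCause().getMessage());
        }
    }
}
```

Whoever runs the tasks then has to unwrap the `UncheckedIOException` again if the restore path is supposed to surface the original `IOException`.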

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708856#comment-16708856
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238701173
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -686,6 +697,17 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
 		return options;
 	}
 
+	/**
+	 * Gets the number of threads used for downloading files from DFS during restore.
+	 */
+	public int getNumberOfRestoringThreads() {
+		return restoringThreadNum == -1 ? RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : restoringThreadNum;
 
 Review comment:
   Can we also rename the field so that it matches the getter?
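
The getter resolves an unset value (`-1`) to the default of `RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM`. The following is a minimal sketch of the requested rename, with the field named after its getter. The class, the setter's validation, and both constants are hypothetical. In particular, the default of 1 is an assumption made for the example and is not read from the real config option.

```java
/** Hypothetical sketch: field named after its getter, with -1 meaning "not configured". */
public class RestoreThreadsConfigSketch {

    /** Marker for "not explicitly set"; resolved to the default in the getter. */
    static final int UNDEFINED_NUMBER_OF_RESTORING_THREADS = -1;

    /** Assumed default, for this example only. */
    static final int DEFAULT_NUMBER_OF_RESTORING_THREADS = 1;

    private int numberOfRestoringThreads = UNDEFINED_NUMBER_OF_RESTORING_THREADS;

    public void setNumberOfRestoringThreads(int numberOfRestoringThreads) {
        if (numberOfRestoringThreads <= 0) {
            throw new IllegalArgumentException("The number of restoring threads must be positive.");
        }
        this.numberOfRestoringThreads = numberOfRestoringThreads;
    }

    /** Gets the number of threads used for downloading files from DFS during restore. */
    public int getNumberOfRestoringThreads() {
        return numberOfRestoringThreads == UNDEFINED_NUMBER_OF_RESTORING_THREADS
            ? DEFAULT_NUMBER_OF_RESTORING_THREADS
            : numberOfRestoringThreads;
    }

    public static void main(String[] args) {
        RestoreThreadsConfigSketch config = new RestoreThreadsConfigSketch();
        System.out.println(config.getNumberOfRestoringThreads()); // 1 (assumed default)
        config.setNumberOfRestoringThreads(5);
        System.out.println(config.getNumberOfRestoringThreads()); // 5
    }
}
```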




> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, restoring downloads files from DFS, and the
> download procedure is single-threaded. It could be sped up by using
> multiple threads to download state from DFS.
>
> In my company, state can reach several terabytes, so the restore
> procedure becomes rather slow. After a bit of digging, I found that state is
> downloaded from DFS with a single thread; using multiple threads would speed
> this up.
> I measured the time needed to download ~2 terabytes of state from DFS:
> 640+ s with a single thread, and 130+ s with 5 download threads.
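
As a rough reading of these numbers, treating "640+ s" and "130+ s" as approximately 640 s and 130 s, the speedup and the per-thread efficiency with 5 threads work out as follows:

$$
S_5 = \frac{T_1}{T_5} \approx \frac{640\,\text{s}}{130\,\text{s}} \approx 4.9,
\qquad
E_5 = \frac{S_5}{5} \approx 0.98
$$

In other words, the reported measurement scales close to linearly with the number of download threads.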



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708834#comment-16708834
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238700871
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			Path dest,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (enableMultiThreadDownload(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
 
 Review comment:
   `DirectExecutorService` would execute everything in the calling thread.
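
The reviewer's point can be illustrated as follows. An executor service that runs each submitted task in the calling thread lets the `threadNum == 1` and `threadNum > 1` cases share one code path, while still running the single-threaded case in the current thread. The sketch below is hypothetical. `DirectExecutorServiceSketch`, `createExecutorService`, and `runAll` are invented names and are neither Flink's `DirectExecutorService` nor its `Executors` helpers. Only standard `java.util.concurrent` classes are used.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: one code path for single- and multi-threaded transfer. */
public class DirectExecutorServiceSketch extends AbstractExecutorService {

    private volatile boolean shutdown;

    @Override
    public void execute(Runnable command) {
        if (shutdown) {
            throw new RejectedExecutionException("executor is shut down");
        }
        // Runs the task synchronously in the thread that submits it.
        command.run();
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        return Collections.emptyList(); // nothing is ever queued
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        // Tasks finish before execute() returns, so shutdown implies terminated.
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }

    /** A thread pool for threadNum > 1, otherwise an executor that uses the calling thread. */
    static ExecutorService createExecutorService(int threadNum) {
        return threadNum > 1 ? Executors.newFixedThreadPool(threadNum) : new DirectExecutorServiceSketch();
    }

    /** Runs all tasks and waits for them, whatever the thread count. */
    static void runAll(List<Runnable> tasks, int threadNum) throws InterruptedException, ExecutionException {
        ExecutorService executor = createExecutorService(threadNum);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>(tasks.size());
            for (Runnable task : tasks) {
                futures.add(CompletableFuture.runAsync(task, executor));
            }
            // Throws ExecutionException if any task failed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread caller = Thread.currentThread();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            tasks.add(() -> System.out.println("ran on caller thread: " + (Thread.currentThread() == caller)));
        }
        runAll(tasks, 1); // prints "ran on caller thread: true" three times
        runAll(tasks, 3); // prints "ran on caller thread: false" three times
    }
}
```

With this design, the `if (...) { runTransferWithMultipleThreads } else { runTransferWithSingleThread }` branch collapses into choosing an executor. A single thread count still executes in the restoring thread, which keeps the behavior asked for in the earlier comment.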



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708579#comment-16708579
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238623643
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			Path dest,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (enableMultiThreadDownload(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
 
 Review comment:
   In my opinion, if `restoringThreadNum == 1`, we should run the `runnables` 
in the current thread (which is the current behavior) rather than in the 
executorService. What do you think?



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707559#comment-16707559
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238362343
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			Path dest,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void downloadDataForAllStateHandles(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			int restoringThreadNum,
+			CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+		if (enableMultiThreadDownload(restoringThreadNum)) {
+			runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+		} else {
+			runTransferWithSingleThread(runnables);
+		}
+	}
+
+	private static List<Runnable> createDownloadRunnables(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath,
+			CloseableRegistry closeableRegistry
+	) {
+		List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+			StateHandleID stateHandleID = entry.getKey();
+			StreamStateHandle remoteFileHandle = entry.getValue();
+
+			Path path = new Path(restoreInstancePath, stateHandleID.toString());
+			runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry));
+		}
+		return runnables;
+	}
+
+	private static void runTransferWithMultipleThreads(
+			List<Runnable> runnables,
+			int threadNum,
+			CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOException {
+
+ 

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707448#comment-16707448
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238329439
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void downloadDataForAllStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+   if (enableMultiThreadDownload(restoringThreadNum)) {
+   runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+   } else {
+   runTransferWithSingleThread(runnables);
+   }
 
 Review comment:
   Wouldn't it be better to first instantiate the `ExecutorService` and then use it to download the files?
   ```
   ExecutorService executorService = createExecutorService(restoringThreadNum);
   
   for (Runnable runnable : createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry)) {
       executorService.execute(runnable);
   }
   ```
   
   If `restoringThreadNum == 1`, we create a direct executor service. That way we would not need to duplicate the logic in `runTransferWithSingleThread`.
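A minimal sketch of the suggestion above, using only `java.util.concurrent` rather than Flink's own executor utilities. `DirectExecutorService`, `createExecutorService` and `runAll` are hypothetical names, and the download tasks are plain `Runnable` stand-ins. With one thread, every task runs inline on the caller's thread. With more threads, a fixed pool is used. Both cases go through the same submit-and-wait code path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class DownloadExecutorSketch {

    /** Hypothetical executor that runs every task synchronously on the calling thread. */
    static final class DirectExecutorService extends AbstractExecutorService {
        private volatile boolean shutdown;

        @Override
        public void execute(Runnable command) {
            if (shutdown) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            command.run();
        }

        @Override
        public void shutdown() {
            shutdown = true;
        }

        @Override
        public List<Runnable> shutdownNow() {
            shutdown = true;
            return Collections.emptyList(); // tasks never queue, so nothing is pending
        }

        @Override
        public boolean isShutdown() {
            return shutdown;
        }

        @Override
        public boolean isTerminated() {
            return shutdown; // nothing ever runs asynchronously
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) {
            return shutdown;
        }
    }

    /** One thread: run inline on the caller. More threads: use a fixed-size pool. */
    static ExecutorService createExecutorService(int threadNum) {
        return threadNum <= 1 ? new DirectExecutorService() : Executors.newFixedThreadPool(threadNum);
    }

    /** Submits all download tasks to one executor and waits; a task failure surfaces as ExecutionException. */
    static void runAll(List<Runnable> downloads, int threadNum)
            throws InterruptedException, ExecutionException {
        ExecutorService executorService = createExecutorService(threadNum);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>(downloads.size());
            for (Runnable download : downloads) {
                futures.add(CompletableFuture.runAsync(download, executorService));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } finally {
            // Release pool threads; all tasks are done unless the wait was interrupted.
            executorService.shutdownNow();
        }
    }
}
```

`allOf` waits for every task, even after one has failed. A failed restore therefore still finishes the remaining downloads before the error is reported. Cancelling early would need extra bookkeeping.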


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> 

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707450#comment-16707450
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238328094
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 Review comment:
   Is it possible to write this test without mocking?
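One way to drop Mockito is to back each state handle with a small real in-memory implementation instead of a mock. Flink's `ByteStreamStateHandle`, already imported by this test, plays that role. The sketch below stays self-contained by assuming a hypothetical `Handle` interface and `download` helper in place of Flink's types. It writes known random bytes, downloads them into a temporary directory, and compares the contents. A failing handle replaces `when(...).thenThrow(...)`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Random;

class NoMockDownloadSketch {

    /** Hypothetical minimal handle abstraction; a real test would use ByteStreamStateHandle. */
    interface Handle {
        InputStream openInputStream() throws IOException;
    }

    /** A real (non-mock) handle serving bytes from memory. */
    static Handle inMemory(byte[] data) {
        return () -> new ByteArrayInputStream(data);
    }

    /** A real handle that always fails, replacing a mocked exception. */
    static Handle failing() {
        return () -> {
            throw new IOException("simulated DFS failure");
        };
    }

    /** Stand-in for the code under test: copies the handle's bytes to a local file. */
    static void download(Handle handle, Path target) throws IOException {
        try (InputStream in = handle.openInputStream()) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    /** Returns true if the downloaded file has exactly the handle's content. */
    static boolean roundTrip(long seed, int size) throws IOException {
        byte[] expected = new byte[size];
        new Random(seed).nextBytes(expected);
        Path dir = Files.createTempDirectory("restore-test");
        Path target = dir.resolve("state-handle-0");
        try {
            download(inMemory(expected), target);
            return Arrays.equals(expected, Files.readAllBytes(target));
        } finally {
            Files.deleteIfExists(target);
            Files.deleteIfExists(dir);
        }
    }
}
```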




> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, restore downloads files from DFS on a single 
> thread. This could be sped up by downloading the state from DFS with 
> multiple threads.
>  
> In my company, the state can reach several terabytes, so restore becomes a 
> little slow. After some digging, I found that state is downloaded from DFS 
> on a single thread, which multi-threading could speed up.
> I measured the time to download ~2 terabytes of state from DFS: 640+s with a 
> single thread and 130+s with 5 threads.
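The reported timings can be expressed as an approximate speedup. Both figures are lower bounds ("640+s" and "130+s"), so the ratio is only indicative:

```latex
S_5 \approx \frac{640\ \text{s}}{130\ \text{s}} \approx 4.9,
\qquad
E_5 = \frac{S_5}{5} \approx 0.98
```

This is close to linear scaling at 5 threads for this single measurement. It says nothing about how the speedup behaves with more threads.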



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707451#comment-16707451
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238330014
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void downloadDataForAllStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+   if (enableMultiThreadDownload(restoringThreadNum)) {
+   runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+   } else {
+   runTransferWithSingleThread(runnables);
+   }
+   }
+
+   private static List<Runnable> createDownloadRunnables(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   CloseableRegistry closeableRegistry
+   ) {
+   List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+   for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+   StateHandleID stateHandleID = entry.getKey();
+   StreamStateHandle remoteFileHandle = entry.getValue();
+
+   Path path = new Path(restoreInstancePath, stateHandleID.toString());
+   runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry));
+   }
+   return runnables;
+   }
+
+   private static void runTransferWithMultipleThreads(
+   List<Runnable> runnables,
+   int threadNum,
+   CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOException {
+
+  

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707446#comment-16707446
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238324894
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void downloadDataForAllStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
 
 Review comment:
   Inconsistent code style in this file. The closing bracket should be on the previous line.



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707444#comment-16707444
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238320078
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RocksDbStateDataTransfer.
+ */
+public class RocksDBStateDataTransferTest {
 
 Review comment:
   Should extend `TestLogger`




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707452#comment-16707452
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238318368
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -686,6 +697,17 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
return options;
}
 
+   /**
+* Gets the number of threads used for downloading files from DFS during restore.
+*/
+   public int getrestoringThreadNum() {
 
 Review comment:
   The getter and setter should be named `getRestoringThreadNum` or, better, `getNumberOfRestoringThreads`.
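For illustration only, here is a getter/setter pair that follows the suggested naming. The class name, the default of 1, and the positive-value check are assumptions and are not taken from the patch.

```java
class RestoreOptionsSketch {

    /** Hypothetical default: restore downloads on a single thread. */
    private int numberOfRestoringThreads = 1;

    /** Gets the number of threads used for downloading files from DFS during restore. */
    public int getNumberOfRestoringThreads() {
        return numberOfRestoringThreads;
    }

    /** Sets the number of threads used for downloading files from DFS during restore. */
    public void setNumberOfRestoringThreads(int numberOfRestoringThreads) {
        // Hypothetical validation: zero or negative thread counts make no sense.
        if (numberOfRestoringThreads <= 0) {
            throw new IllegalArgumentException(
                "The number of restoring threads must be positive, but was " + numberOfRestoringThreads);
        }
        this.numberOfRestoringThreads = numberOfRestoringThreads;
    }
}
```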


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, the restore will download file from DFS, the 
> download procedure are single-thread, this could speed up by using 
> multi-thread for downloading states from DFS.
>  
> In my company, the states will come to some terabytes, so the restore 
> procedure will become a litter slow, after a bit digging, I find download 
> states from DFS using single thread, this could using multi-thread for speed 
> up.
> I test the time used for download states from DFS with ~2 terabytes states. 
> With single thread it used 640+s, and 130+s when using 5 threads for download.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707449#comment-16707449
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238331599
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707453#comment-16707453
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238320013
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707443#comment-16707443
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238324373
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
 
 Review comment:
   Does this need to be public?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, restore downloads files from DFS using a 
> single thread. This could be sped up by downloading state from DFS with 
> multiple threads.
>  
> In my company, state can reach several terabytes, so restore becomes a 
> little slow. After some digging, I found that state is downloaded from DFS 
> with a single thread, which could be parallelized to speed it up.
> I measured the time needed to download ~2 TB of state from DFS: it took 
> 640+ s with a single thread and 130+ s with 5 threads.
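A rough check on the reported figures follows. Here T_1 and T_5 are the download times with 1 and 5 threads, S_5 is the speedup, and E_5 is the parallel efficiency. Both timings are given as lower bounds ("640+ s" and "130+ s"), so the ratio is only approximate.

```latex
S_5 = \frac{T_1}{T_5} \approx \frac{640\,\mathrm{s}}{130\,\mathrm{s}} \approx 4.9,
\qquad
E_5 = \frac{S_5}{5} \approx 0.98
```

In this test, 5 threads gave close to linear speedup over a single thread.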



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707445#comment-16707445
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238325757
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void downloadDataForAllStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+   if (enableMultiThreadDownload(restoringThreadNum)) {
+   runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+   } else {
+   runTransferWithSingleThread(runnables);
+   }
+   }
+
+   private static List<Runnable> createDownloadRunnables(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   CloseableRegistry closeableRegistry
+   ) {
 
 Review comment:
   Inconsistent code style



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707447#comment-16707447
 ] 

ASF GitHub Bot commented on FLINK-10461:


tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238326175
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,177 @@
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void downloadDataForAllStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+   if (enableMultiThreadDownload(restoringThreadNum)) {
+   runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+   } else {
+   runTransferWithSingleThread(runnables);
+   }
+   }
+
+   private static List<Runnable> createDownloadRunnables(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   CloseableRegistry closeableRegistry
+   ) {
+   List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+   for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+   StateHandleID stateHandleID = entry.getKey();
+   StreamStateHandle remoteFileHandle = entry.getValue();
+
+   Path path = new Path(restoreInstancePath, stateHandleID.toString());
+   runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry));
+   }
+   return runnables;
+   }
+
+   private static void runTransferWithMultipleThreads(
+   List<Runnable> runnables,
+   int threadNum,
+   CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOException {
+
+  

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704758#comment-16704758
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443209887
 
 
   @azagrebin Got it, thank you for the explanation. I've just removed the 
`running` flag.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704680#comment-16704680
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on issue #6777: [FLINK-10461] [State Backends, 
Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443193074
 
 
   In general, I do not see any problem with multiple threads downloading even 
the same data. If your question is whether they can interfere with each other 
locally: the target directories are randomised, so they should download to 
different locations, even if they run at the same time. Ultimately, it is the 
JM's job to ensure that only one of them is used.
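Here is a minimal sketch of per-attempt randomised target directories using plain `java.nio.file`. `RestoreDirSketch`, `createRestoreInstanceDir` and the `restore-` prefix are hypothetical. Flink's actual directory naming is not shown in this thread. Two attempts that restore the same state get distinct directories, so concurrent downloads cannot overwrite each other's files.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical sketch; names and layout are illustrative only.
final class RestoreDirSketch {

    /** Creates a fresh, uniquely named directory under baseDir for one restore attempt. */
    static Path createRestoreInstanceDir(Path baseDir) throws IOException {
        Files.createDirectories(baseDir);
        // A random UUID keeps concurrent or repeated attempts in separate directories.
        Path dir = baseDir.resolve("restore-" + UUID.randomUUID());
        // createDirectory (unlike createDirectories) fails if the path already exists,
        // so an attempt never silently reuses another attempt's files.
        return Files.createDirectory(dir);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("restore-sketch");
        Path first = createRestoreInstanceDir(base);
        Path second = createRestoreInstanceDir(base);
        System.out.println(first.equals(second)); // prints false
    }
}
```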




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704659#comment-16704659
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443188022
 
 
   @azagrebin Thanks for your reply. I have a question about this: could a JM 
failover lead to two download threads? If not, I'll remove the `running` flag. 
Thanks.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704490#comment-16704490
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on issue #6777: [FLINK-10461] [State Backends, 
Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443148880
 
 
   Thanks for addressing the comment.
   
   One more thing: I think we do not actually need the `running` flag, because 
we register the streams in the closeable registry. If the job is cancelled, the 
runnables will either fail to register their streams or be interrupted because 
the streams are closed unexpectedly, which breaks the while loop with an 
IOException anyway; that is what we want. The while loops will not hold up 
executor shutdown for long. Sorry for the confusion.
   
   Can we still change it and remove the atomic `running` flag?
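Here is a minimal sketch of the cancellation argument above using plain `java.io`. `SimpleCloseableRegistry` is a hypothetical stand-in for Flink's `CloseableRegistry`, not its real implementation, and `CopyLoopSketch.copy` is likewise illustrative. The copy loop has no `running` flag. Closing the registry closes every registered stream. A transfer that starts afterwards fails to register. A transfer already in its loop gets an `IOException` on its next read or write, provided the stream rejects use after close (as `FileInputStream` does).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

final class CopyLoopSketch {

    /** Copies in to out and returns the number of bytes copied. Note: no running flag. */
    static long copy(InputStream in, OutputStream out, SimpleCloseableRegistry registry) throws IOException {
        try (InputStream input = in; OutputStream output = out) {
            registry.register(input);
            registry.register(output);
            try {
                byte[] buffer = new byte[8 * 1024];
                long total = 0;
                int n;
                // If the registry is closed concurrently, streams that reject use
                // after close make read()/write() throw, which ends this loop.
                while ((n = input.read(buffer)) != -1) {
                    output.write(buffer, 0, n);
                    total += n;
                }
                return total;
            } finally {
                registry.unregister(input);
                registry.unregister(output);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        SimpleCloseableRegistry registry = new SimpleCloseableRegistry();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        System.out.println(copy(new ByteArrayInputStream(new byte[] {1, 2, 3}), sink, registry)); // prints 3
        registry.close();
        try {
            copy(new ByteArrayInputStream(new byte[] {4}), new ByteArrayOutputStream(), registry);
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Registry is already closed"
        }
    }
}

/** Hypothetical stand-in for a closeable registry: closing it closes every registered resource. */
final class SimpleCloseableRegistry implements Closeable {
    private final List<Closeable> resources = new ArrayList<>();
    private boolean closed;

    synchronized void register(Closeable resource) throws IOException {
        if (closed) {
            // A transfer that starts after cancellation fails right away.
            resource.close();
            throw new IOException("Registry is already closed");
        }
        resources.add(resource);
    }

    synchronized void unregister(Closeable resource) {
        resources.remove(resource);
    }

    @Override
    public synchronized void close() throws IOException {
        closed = true;
        for (Closeable resource : resources) {
            resource.close();
        }
        resources.clear();
    }
}
```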




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704215#comment-16704215
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443079619
 
 
   Thanks a lot for your reviews, @azagrebin; I learnt a lot from them.
   I fixed the Javadoc. Thank you again.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703374#comment-16703374
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r237538899
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
 ##
 @@ -45,4 +45,12 @@
.withDescription(String.format("This determines the factory for timer service state implementation. Options " +
"are either %s (heap-based, default) or %s for an implementation based on RocksDB .",
HEAP.name(), ROCKSDB.name()));
+
+   /**
+* The thread numbers used to download files from DFS in RocksDBStateBackend.
 
 Review comment:
   Also here without s:
   `The thread number used`




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702571#comment-16702571
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-442662282
 
 
   @azagrebin Thanks for your review. I've just addressed the comments; please 
take another look when you have time.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701960#comment-16701960
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r237105565
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,185 @@
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoredThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoredThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   AtomicBoolean running = new AtomicBoolean(true);
+   List<Runnable> runnables = getRunnablesForStateHandles(stateHandleMap, restoreInstancePath, closeableRegistry, running);
+
+   if (enableMultiThreadDownload(restoredThreadNum)) {
+   runRunnalbesMultithread(runnables, restoredThreadNum, closeableRegistry, running);
 
 Review comment:
   I would rename like this:
   transferAllDataFromStateHandles -> downloadDataForAllStateHandles
   getRunnablesForStateHandles -> createDownloadRunnables
   runRunnalbesMultithread -> runTransferWithMultipleThreads
   runRunnablesSingleThread -> runTransferWithSingleThread
   copyStateDataHandleData -> downloadDataForStateHandle
   
   also I would rename everywhere:
   restoredThreadNum -> restoringThreadNum



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701961#comment-16701961
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r237104577
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,185 @@
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoredThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map stateHandleMap,
+   Path restoreInstancePath,
+   int restoredThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   AtomicBoolean running = new AtomicBoolean(true);
+   List runnables = getRunnablesForStateHandles(stateHandleMap, restoreInstancePath, closeableRegistry, running);
+
+   if (enableMultiThreadDownload(restoredThreadNum)) {
+   runRunnalbesMultithread(runnables, restoredThreadNum, closeableRegistry, running);
+   } else {
+   runRunnablesSingleThread(runnables);
+   }
+   }
+
+   private static List getRunnablesForStateHandles(
+   Map stateHandleMap,
+   Path restoreInstancePath,
+   CloseableRegistry closeableRegistry,
+   AtomicBoolean running
+   ) {
+   List runnables = new ArrayList<>(stateHandleMap.size());
+   for (Map.Entry entry : stateHandleMap.entrySet()) {
+   StateHandleID stateHandleID = entry.getKey();
+   StreamStateHandle remoteFileHandle = entry.getValue();
+
+   Path path = new Path(restoreInstancePath, stateHandleID.toString());
+   runnables.add(() -> copyStateDataHandleData(path, remoteFileHandle, closeableRegistry, running));
+   }
+   return runnables;
+   }
+
+   private static void runRunnalbesMultithread(
+   List 

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701950#comment-16701950
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r236202672
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   private static volatile AtomicBoolean running = new AtomicBoolean(false);
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoredThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
 
 Review comment:
   I think this method should be broken down into 3 methods:
   - a loop which creates the `List` of Runnables;
   - a multi-threaded version, where `executorService`, `running` and `shutDownAllTasksAndExecutorService` are used and the Runnables are submitted in a loop with try/catch;
   - a single-threaded version, which just runs the Runnables.
   If `restoredThreadNum` > 1, this method should call the multi-threaded version; otherwise, the single-threaded version.
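The three-method split described above can be sketched in plain Java. This is a minimal, self-contained illustration under stated assumptions, not the PR's implementation: in-memory `Map<String, byte[]>` instances stand in for the state-handle map and the local restore directory, the class name `DownloadDispatchSketch` is hypothetical, the method names borrow the renaming proposed elsewhere in this thread, and the `running` flag with `shutDownAllTasksAndExecutorService` is left out. This version simply waits for all tasks and rethrows a task failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DownloadDispatchSketch {

    /** Dispatcher: more than one thread selects the pooled path, otherwise run inline. */
    static void downloadDataForAllStateHandles(
            Map<String, byte[]> remoteFiles, Map<String, byte[]> localFiles, int threadNum)
            throws Exception {
        List<Runnable> runnables = createDownloadRunnables(remoteFiles, localFiles);
        if (threadNum > 1) {
            runTransferWithMultipleThreads(runnables, threadNum);
        } else {
            runTransferWithSingleThread(runnables);
        }
    }

    /** Method 1: the loop that only builds the list of download tasks. */
    static List<Runnable> createDownloadRunnables(
            Map<String, byte[]> remoteFiles, Map<String, byte[]> localFiles) {
        List<Runnable> runnables = new ArrayList<>(remoteFiles.size());
        for (Map.Entry<String, byte[]> entry : remoteFiles.entrySet()) {
            String fileName = entry.getKey();
            byte[] content = entry.getValue();
            // Each task "downloads" one file; localFiles must be thread-safe for the pooled path.
            runnables.add(() -> localFiles.put(fileName, content.clone()));
        }
        return runnables;
    }

    /** Method 2: submit every task to a fixed-size pool and wait for all of them. */
    static void runTransferWithMultipleThreads(List<Runnable> runnables, int threadNum)
            throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>(runnables.size());
            for (Runnable runnable : runnables) {
                futures.add(CompletableFuture.runAsync(runnable, executor));
            }
            // allOf completes once every task has finished and fails if any task failed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
        } catch (ExecutionException e) {
            // Rethrow the task's own exception instead of the ExecutionException wrapper.
            Throwable cause = e.getCause();
            throw cause instanceof Exception ? (Exception) cause : e;
        } finally {
            // Release the pool threads in all cases.
            executor.shutdownNow();
        }
    }

    /** Method 3: run the tasks one after another on the calling thread. */
    static void runTransferWithSingleThread(List<Runnable> runnables) {
        for (Runnable runnable : runnables) {
            runnable.run();
        }
    }
}
```

Keeping task creation separate from execution means both paths share exactly the same download logic; only the scheduling differs.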



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701508#comment-16701508
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-442356732
 
 
   @azagrebin thanks for your review. I've addressed all your comments and created [FLINK-11008](https://issues.apache.org/jira/browse/FLINK-11008) as a follow-up issue. Please review this when you have time.



> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, restore downloads the state files from DFS
> using a single thread. This could be sped up by downloading the states from
> DFS with multiple threads.
>  
> In my company, the state size reaches several terabytes, so the restore
> procedure becomes rather slow. After some digging, I found that the states
> are downloaded from DFS by a single thread, which could be parallelized for
> a speed-up.
> I measured the time spent downloading ~2 terabytes of state from DFS: with a
> single thread it took 640+ s, and with 5 download threads it took 130+ s.
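The figures quoted in the description above give a rough idea of the gain. Both are reported as lower bounds ("640+s", "130+s"), so the following is only an approximate calculation of the speed-up S and the parallel efficiency E for p = 5 download threads:

```latex
S = \frac{T_1}{T_5} \approx \frac{640\,\mathrm{s}}{130\,\mathrm{s}} \approx 4.9,
\qquad
E = \frac{S}{p} \approx \frac{4.9}{5} \approx 0.98
```

In other words, for this ~2 TB restore the download scaled close to linearly up to 5 threads; the measurements say nothing about higher thread counts.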



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698754#comment-16698754
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r236199821
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -276,6 +312,12 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config)
this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
 
+   if (original.restoredThreadNum == CheckpointingOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue()) {
 
 Review comment:
   Here we could initialise `restoredThreadNum` from `config` if `original.restoredThreadNum` is -1 (undefined); otherwise set `restoredThreadNum` = `original.restoredThreadNum`, similar to `original.enableIncrementalCheckpointing.resolveUndefined`.
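A minimal sketch of the suggested resolution, written in plain Java so it runs stand-alone. Assumptions: a `Map<String, String>` stands in for Flink's `Configuration`, -1 is the "undefined" marker, and the default of 1 and the `rocksdb`-prefixed key follow the reviewer's suggestions in this thread. The class and method names are hypothetical; in the PR the logic would sit in the copy constructor `RocksDBStateBackend(RocksDBStateBackend original, Configuration config)`.

```java
import java.util.Map;

final class RestoreThreadNumResolver {
    static final int UNDEFINED = -1;
    // Key as proposed in the review; the PR under review used "state.checkpoint.restore.thread.num".
    static final String KEY = "state.backend.rocksdb.checkpoint.restore.thread.num";
    // Assumed default (1 = single-threaded download, the current behaviour).
    static final int DEFAULT_THREAD_NUM = 1;

    /** Keeps an explicitly set value; otherwise falls back to the config, then to the default. */
    static int resolve(int originalValue, Map<String, String> config) {
        if (originalValue != UNDEFINED) {
            return originalValue;
        }
        String configured = config.get(KEY);
        return configured == null ? DEFAULT_THREAD_NUM : Integer.parseInt(configured.trim());
    }
}
```

This mirrors the precedence of `resolveUndefined`: a value set in code for the job wins over the cluster configuration.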




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698755#comment-16698755
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r236202672
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   private static volatile AtomicBoolean running = new AtomicBoolean(false);
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoredThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
 
 Review comment:
   I think this method should be broken down into 3 methods:
   - a loop which creates the `List` of Runnables;
   - a multi-threaded version, where `executorService`, `running` and `shutDownAllTasksAndExecutorService` are used and the Runnables are submitted in a loop with try/catch;
   - a single-threaded version, which just runs the Runnables.
   If `restoredThreadNum` > 1, this method should call the multi-threaded version; otherwise, the single-threaded version.



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698756#comment-16698756
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r236186387
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ##
 @@ -115,4 +115,12 @@
.defaultValue(1024)
.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata file.");
+
+   /**
+* The thread numbers used to download files from DFS in RocksDBStateBackend.
+*/
+   public static final ConfigOption CHECKPOINT_RESTORE_THREAD_NUM = ConfigOptions
+   .key("state.checkpoint.restore.thread.num")
+   .defaultValue(-1)
 
 Review comment:
   I think the previous default value of 1 was reasonable: it matches what we have now. -1 looks more like an undefined value.
   
   I would also consider changing the option prefix to include `rocksdb`:
   `state.backend.rocksdb.checkpoint.restore.thread.num`
   and moving the option to `org.apache.flink.contrib.streaming.state.RocksDBOptions`.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698751#comment-16698751
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r236198988
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -236,8 +239,25 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
 */
public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
+   this(checkpointStreamBackend, enableIncrementalCheckpointing, CheckpointingOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue());
+   }
+
+   /**
+* Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
+* checkpoint data streams. Typically, one would supply a filesystem or database state backend
+* here where the snapshots from RocksDB would be stored.
+*
+* The snapshots of the RocksDB state will be stored using the given backend's
+* {@link StateBackend#createCheckpointStorage(JobID)}.
+*
+* @param checkpointStreamBackend The backend write the checkpoint streams to.
+* @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
+* @param restoredThreadNum thread num used to download files from DFS when restore.
+*/
+   public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing, int restoredThreadNum) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
+   this.restoredThreadNum = restoredThreadNum;
 
 Review comment:
   Actually, instead of changing the constructors and multiplying their signatures, I would suggest setting it here to -1 (undefined). Then we could remove `final` from `restoredThreadNum` and add a setter method for it, as for `predefinedOptions`. Users could then use the setter to set this option per job.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698752#comment-16698752
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r236200642
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -686,6 +729,13 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
return options;
}
 
+   /**
+* Gets the thread number will used for downloading files from DFS when restore.
+*/
+   public int getRestoredThreadNum() {
+   return restoredThreadNum;
 
 Review comment:
   Here we could return `restoredThreadNum` if it is not -1 (defined), and `CheckpointingOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue()` if `restoredThreadNum` is still -1 (undefined).
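Combined with the setter suggested in this thread, the getter fallback could look like the following plain-Java sketch. The class name, the positive-value check in the setter, and the constant `DEFAULT_RESTORE_THREAD_NUM` (a stand-in for `CheckpointingOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue()`, assumed here to be 1) are illustrative assumptions, not the PR's code.

```java
final class RestoreThreadNumSetting {
    static final int UNDEFINED = -1;
    // Stand-in for CheckpointingOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() (assumed 1).
    static final int DEFAULT_RESTORE_THREAD_NUM = 1;

    // Not final, so a per-job setter can override it after construction.
    private int restoredThreadNum = UNDEFINED;

    /** Per-job override; rejects values that cannot size a thread pool. */
    void setRestoredThreadNum(int restoredThreadNum) {
        if (restoredThreadNum <= 0) {
            throw new IllegalArgumentException(
                    "restoredThreadNum must be positive: " + restoredThreadNum);
        }
        this.restoredThreadNum = restoredThreadNum;
    }

    /** Returns the explicitly set value, or the option's default while still undefined. */
    int getRestoredThreadNum() {
        return restoredThreadNum == UNDEFINED ? DEFAULT_RESTORE_THREAD_NUM : restoredThreadNum;
    }
}
```

Keeping -1 internal and resolving it only in the getter means callers never see the sentinel value.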




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698753#comment-16698753
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r236203718
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   private static volatile AtomicBoolean running = new AtomicBoolean(false);
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoredThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map stateHandleMap,
+   Path restoreInstancePath,
+   int restoredThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   final ExecutorService executorService = enableMultiThreadDownload(restoredThreadNum) ?
+   Executors.newFixedThreadPool(restoredThreadNum) :
+   null;
+
+   running.set(true);
 
 Review comment:
   `AtomicBoolean running = new AtomicBoolean(true);` 
   can be a local variable passed to `copyStateDataHandleData`.
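A minimal sketch of how a copy routine might consume such a per-call flag, assuming it checks the flag between buffer-sized chunks. `LocalRunningFlagSketch` and `copyWithCancellation` are hypothetical names, and plain `InputStream`/`OutputStream` stand in for Flink's `FSDataInputStream`/`FSDataOutputStream`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

final class LocalRunningFlagSketch {

    /**
     * Copies one file in buffer-sized chunks. The flag is checked before every chunk, so a
     * failure elsewhere in the same restore (which sets it to false) stops this copy early.
     */
    static long copyWithCancellation(InputStream in, OutputStream out, AtomicBoolean running)
            throws IOException {
        byte[] buffer = new byte[8 * 1024];
        long copied = 0;
        while (true) {
            if (!running.get()) {
                throw new IOException("Transfer cancelled after " + copied + " bytes");
            }
            int read = in.read(buffer);
            if (read == -1) {
                return copied; // end of stream: the whole file was copied
            }
            out.write(buffer, 0, read);
            copied += read;
        }
    }
}
```

In the transfer method the flag would be created once per call (`AtomicBoolean running = new AtomicBoolean(true);`), captured by each download lambda, and set to `false` in the failure path. Because it no longer lives in a `private static` field, two restores running in the same JVM cannot reset or cancel each other.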



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692689#comment-16692689
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-440143611
 
 
   Hi @azagrebin, could you please review this when you have time?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, restore downloads files from DFS with a single
> thread; this could be sped up by downloading states from DFS with multiple
> threads.
>  
> In my company, states reach several terabytes, so restore becomes a little
> slow. After some digging, I found that states are downloaded from DFS with a
> single thread, and using multiple threads could speed this up.
> I measured the time to download ~2 terabytes of state from DFS: 640+ s with
> a single thread and 130+ s with 5 threads.
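The multi-threaded download described in the issue can be illustrated with plain JDK classes. This is a minimal sketch under stated assumptions, not the code from the pull request: local files stand in for DFS state handles, and `ParallelDownloadSketch` and `downloadAll` are hypothetical names. Each file is copied by its own task on a fixed-size pool, and the caller blocks until every copy has finished.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: local Path sources stand in for remote DFS state handles. */
public class ParallelDownloadSketch {

    /** Copies every (target name -> source file) entry into destDir using threadNum threads. */
    public static void downloadAll(Map<String, Path> sources, Path destDir, int threadNum) {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (Map.Entry<String, Path> entry : sources.entrySet()) {
                Path target = destDir.resolve(entry.getKey());
                Path source = entry.getValue();
                // One task per file; the pool bounds how many copies run at once.
                futures.add(CompletableFuture.runAsync(() -> copy(source, target), executor));
            }
            // Block until all copies finish; join() rethrows a failure as a CompletionException.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            // Interrupts any still-running task if a copy failed.
            executor.shutdownNow();
        }
    }

    private static void copy(Path source, Path target) {
        try (InputStream in = Files.newInputStream(source);
             OutputStream out = Files.newOutputStream(target)) {
            byte[] buffer = new byte[8 * 1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With `threadNum = 1` the sketch copies files one after another, so a single code path covers both cases; how much extra threads help depends on the file system and network.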





[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674635#comment-16674635
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-435747811
 
 
   Hi @azagrebin, I've just addressed all comments. Please help review this 
when you have time.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667357#comment-16667357
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228937420
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ##
 @@ -115,4 +115,12 @@
.defaultValue(1024)
.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata file.");
+
+   /**
+* The thread numbers used to download files from DFS in RocksDBStateBackend.
+*/
+   public static final ConfigOption<Integer> CHECKPOINT_RESTORE_THREAD_NUM = ConfigOptions
+   .key("state.checkpoint.restore.thread.num")
+   .defaultValue(1)
+   .withDescription("The thread numbers used to download files from DFS in RocksDBStateBackend.");
 
 Review comment:
   Also here: `The thread number`, without the `s`.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667360#comment-16667360
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228970266
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, resotreThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, resotreThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   final ExecutorService executorService = Executors.newFixedThreadPool(resotreThreadNum);
+   List<CompletableFuture<Void>> futures = new LinkedList<>();
+   List<Closeable> closeables = new LinkedList<>();
+
+   try {
+   closeables.add(() -> executorService.shutdownNow());
 
 Review comment:
   I think the tasks should first be properly canceled, and only then should 
`executorService` be shut down.
   `executorService.shutdownNow` also interrupts the threads that are running 
the tasks.
   At the moment, the `while (true)` loop in `copyStateDataHandleData` does not 
support explicit cancellation.
   I suggest creating `running = new AtomicBoolean(true)` at the beginning; then 
`while (true)` can become `while (running.get())`. This allows canceling all 
tasks at once and shutting down `executorService` in a single `closeable`:
   `closeables.add(() -> { running.set(false); executorService.shutdownNow(); });`
   The list of `closeables` is not needed then.
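The cancellation pattern suggested in this review can be sketched with an `AtomicBoolean` and a single `Closeable`. This is an illustrative sketch that assumes plain JDK streams in place of Flink's `FSDataInputStream`/`FSDataOutputStream`; `CancellableCopySketch`, `copy`, and `cancelAndShutdown` are hypothetical names. The copy loop checks the flag before every read, so clearing it stops every task at its next iteration, and `shutdownNow` then interrupts the pool's threads.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of cooperative cancellation for parallel copy tasks. */
public class CancellableCopySketch {

    /** Copies until EOF or until running is cleared; returns true only if EOF was reached. */
    public static boolean copy(InputStream in, OutputStream out, AtomicBoolean running)
            throws IOException {
        byte[] buffer = new byte[4096];
        // Replaces while (true): every iteration first checks whether we were canceled.
        while (running.get()) {
            int read = in.read(buffer);
            if (read == -1) {
                return true;
            }
            out.write(buffer, 0, read);
        }
        return false;
    }

    /** One Closeable that first signals cancellation, then shuts the pool down. */
    public static Closeable cancelAndShutdown(AtomicBoolean running, ExecutorService executor) {
        return () -> {
            running.set(false);
            executor.shutdownNow();
        };
    }
}
```

Registering the returned `Closeable` once (for example with a `CloseableRegistry`) is what would make the separate list of closeables unnecessary.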



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667354#comment-16667354
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228937223
 
 

 ##
 File path: docs/_includes/generated/checkpointing_configuration.html
 ##
 @@ -32,6 +32,11 @@
 false
 
 
+
+state.checkpoint.restore.thread.num
+1
+The thread numbers used to download files from DFS in RocksDBStateBackend.
 
 Review comment:
   I think `The thread number` should be singular without `s`.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667359#comment-16667359
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228970609
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, resotreThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, resotreThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   final ExecutorService executorService = Executors.newFixedThreadPool(resotreThreadNum);
+   List<CompletableFuture<Void>> futures = new LinkedList<>();
+   List<Closeable> closeables = new LinkedList<>();
+
+   try {
+   closeables.add(() -> executorService.shutdownNow());
+   closeableRegistry.registerCloseable(((LinkedList<Closeable>) closeables).getLast());
+
+   for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+   StateHandleID stateHandleID = entry.getKey();
+   StreamStateHandle remoteFileHandle = entry.getValue();
+
+   if (resotreThreadNum > 1) {
+   CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+   try {
+   copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle, closeableRegistry);
+   } catch (IOException e) {
 
 Review comment:
   The exception handling can go inside `copyStateDataHandleData`.
   `new Path(restoreInstancePath, stateHandleID.toString())` can be extracted 
into a variable to reduce the line length.
   I would create a list of `Runnable`s:
   ```
   Path path = new Path(restoreInstancePath, stateHandleID.toString());
   runnables.add( -> 
   ```
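A sketch of the `Runnable`-list approach from this comment, assuming a hypothetical `IOTask` in place of `copyStateDataHandleData`: checked `IOException`s are converted once, inside `wrap`, and `runAll` unwraps them again on the calling thread. For `threadNum == 1` it uses the direct executor `Runnable::run`, so one code path serves both the `threadNum = 1` and `threadNum > 1` cases. `RunnablesSketch`, `wrap`, and `runAll` are illustrative names, not Flink API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: a list of Runnables executed through one code path. */
public class RunnablesSketch {

    /** Hypothetical stand-in for a copy step that throws a checked IOException. */
    @FunctionalInterface
    public interface IOTask {
        void run() throws IOException;
    }

    /** Exception handling lives in one place: IOException becomes UncheckedIOException. */
    public static Runnable wrap(IOTask task) {
        return () -> {
            try {
                task.run();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    /** Runs all tasks; with threadNum == 1 they run inline on the calling thread. */
    public static void runAll(List<Runnable> runnables, int threadNum) throws IOException {
        ExecutorService pool = threadNum > 1 ? Executors.newFixedThreadPool(threadNum) : null;
        Executor executor = pool != null ? pool : Runnable::run;
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (Runnable runnable : runnables) {
                futures.add(CompletableFuture.runAsync(runnable, executor));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } catch (CompletionException e) {
            // Surface the original IOException to the caller (e.g. the restoring thread).
            if (e.getCause() instanceof UncheckedIOException) {
                throw ((UncheckedIOException) e.getCause()).getCause();
            }
            throw e;
        } finally {
            if (pool != null) {
                pool.shutdownNow();
            }
        }
    }
}
```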

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667355#comment-16667355
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228976096
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RocksDbStateDataTransfer.
+ */
+public class RocksDBStateDataTransferTest {
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Test that the exception arose in the thread pool will rethrow to the main thread.
+*/
+   @Test
+   public void testThreadPoolExceptionRethrow() throws Exception {
+   IncrementalKeyedStateHandle stateHandle = mock(IncrementalKeyedStateHandle.class);
+
+   SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread restore.");
+   StreamStateHandle mockStateHandle = mock(StreamStateHandle.class);
+   when(mockStateHandle.openInputStream()).thenThrow(expectedException);
+
+   Map<StateHandleID, StreamStateHandle> sharedStateHandle = new HashMap<>(1);
+   sharedStateHandle.put(new StateHandleID("mock"), mockStateHandle);
+   when(stateHandle.getSharedState()).thenReturn(sharedStateHandle);
+
+   try {
+   RocksDbStateDataTransfer.transferAllStateDataToDirectory(stateHandle, new Path(temporaryFolder.newFolder().toURI()), 5, new CloseableRegistry());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e.getCause().getCause());
+   }
+   }
+
+   /**
+* Tests that download files with multi-thread correctly.
+* @throws Exception
+*/
+   @Test
+   public void testMultiThreadRestoreCorrectly() throws Exception {
+   IncrementalKeyedStateHandle stateHandle = mock(IncrementalKeyedStateHandle.class);
+
+   byte[] content1 = new byte[1];
 
 Review comment:
   I would create a list of something like 6 test contexts and loop over them 
for the subsequent steps.
   The content length can also be random.
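The suggestion to loop over several randomly sized test contexts might look like the following sketch. `RandomTestContextsSketch`, `TestContext`, `createContexts`, and `assertAllRestored` are hypothetical helpers, not part of the pull request; `readRestored` stands in for reading back whatever file the restore wrote for a given handle ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

/** Hypothetical helpers for a loop-based multi-thread restore test. */
public class RandomTestContextsSketch {

    /** One test context: a state handle ID plus its randomly sized content. */
    public static final class TestContext {
        public final String handleId;
        public final byte[] content;

        TestContext(String handleId, byte[] content) {
            this.handleId = handleId;
            this.content = content;
        }
    }

    /** Builds count contexts whose content length is random in [1, maxLength]. */
    public static List<TestContext> createContexts(int count, int maxLength, Random random) {
        List<TestContext> contexts = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] content = new byte[1 + random.nextInt(maxLength)];
            random.nextBytes(content);
            contexts.add(new TestContext("handle-" + i, content));
        }
        return contexts;
    }

    /** Loops over all contexts and checks that the restored bytes match the originals. */
    public static void assertAllRestored(
            List<TestContext> contexts, Function<String, byte[]> readRestored) {
        for (TestContext context : contexts) {
            byte[] restored = readRestored.apply(context.handleId);
            if (!Arrays.equals(context.content, restored)) {
                throw new AssertionError("content mismatch for " + context.handleId);
            }
        }
    }
}
```

Passing a seeded `Random` keeps a failing run reproducible while still covering varied file sizes.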



[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667356#comment-16667356
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228964472
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -235,9 +238,10 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
 * @param checkpointStreamBackend The backend write the checkpoint streams to.
 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
 */
-   public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
+   public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing, int restoreThreadNum) {
 
 Review comment:
   I would also keep the previous constructor without `restoreThreadNum`:
   ```
   public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
       this(checkpointStreamBackend, enableIncrementalCheckpointing, -1);
   }
   ```
   This way we do not break existing code, and changes like the following one 
(and other similar ones) become unnecessary:
   ```
   public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
       this(checkpointStreamBackend, TernaryBoolean.UNDEFINED, 1);
   }
   ```
   
   I would also rather keep the value undefined here, e.g. `-1`, similar to 
`TernaryBoolean.UNDEFINED` for `enableIncrementalCheckpointing`. It can then be 
resolved in `private RocksDBStateBackend(RocksDBStateBackend original, 
Configuration config)` and in `getRestoreThreadNum`.
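The undefined-sentinel approach can be sketched outside Flink. `RestoreThreadNumSketch` is a hypothetical class, not `RocksDBStateBackend`: it keeps an old-style constructor for existing callers, stores `-1` as "undefined", lets an explicitly set value win over the configuration in `configure`, and falls back to the default of 1 in `getRestoreThreadNum`. The key `state.checkpoint.restore.thread.num` and default `1` come from this pull request; reading the configuration from a `Map<String, String>` is an assumption.

```java
import java.util.Map;

/** Hypothetical stand-in for the backend's restore-thread-number handling; not Flink API. */
public class RestoreThreadNumSketch {

    public static final int UNDEFINED_RESTORE_THREAD_NUM = -1;
    public static final String RESTORE_THREAD_NUM_KEY = "state.checkpoint.restore.thread.num";
    public static final int DEFAULT_RESTORE_THREAD_NUM = 1;

    private final int restoreThreadNum;

    /** Old-style constructor kept so existing callers still compile; value stays undefined. */
    public RestoreThreadNumSketch() {
        this(UNDEFINED_RESTORE_THREAD_NUM);
    }

    public RestoreThreadNumSketch(int restoreThreadNum) {
        this.restoreThreadNum = restoreThreadNum;
    }

    /** Mirrors a "copy with configuration" step: an explicit value wins over the config. */
    public RestoreThreadNumSketch configure(Map<String, String> config) {
        if (restoreThreadNum != UNDEFINED_RESTORE_THREAD_NUM) {
            return this;
        }
        String configured = config.get(RESTORE_THREAD_NUM_KEY);
        return new RestoreThreadNumSketch(
                configured == null ? DEFAULT_RESTORE_THREAD_NUM : Integer.parseInt(configured));
    }

    /** Resolves the sentinel to the default when nothing was set anywhere. */
    public int getRestoreThreadNum() {
        return restoreThreadNum == UNDEFINED_RESTORE_THREAD_NUM
                ? DEFAULT_RESTORE_THREAD_NUM
                : restoreThreadNum;
    }
}
```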




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667358#comment-16667358
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228967531
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
 
 Review comment:
   Typo: `resotreThreadNum` -> `restoreThreadNum`, also in the other methods.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657653#comment-16657653
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-431539541
 
 
   Hi @azagrebin, could you please help review this?




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646246#comment-16646246
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-428903419
 
 
   @azagrebin I've updated the PR based on your comments. Please help review 
it when you have time.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641854#comment-16641854
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-427837454
 
 
   Thank you for your review, @azagrebin! I will push another commit to 
address your comments.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639908#comment-16639908
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223030850
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
         Map<StateHandleID, StreamStateHandle> stateHandleMap,
         Path restoreInstancePath) throws IOException {
 
+        ExecutorService executorService = Executors.newFixedThreadPool(5);
+        List<FutureTask<Void>> tasks = new ArrayList<>(stateHandleMap.size());
+
         for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
             StateHandleID stateHandleID = entry.getKey();
             StreamStateHandle remoteFileHandle = entry.getValue();
-            copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
+
+            FutureTask<Void> task = new FutureTask<>(() -> {
+                try {
+                    copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
+                } catch (IOException e) {
+                    LOG.error("Copy State Data error, stateHandleID [{}], remoteFileHandle[{}]", stateHandleID.toString(), remoteFileHandle);
+                    throw e;
+                }
+                return null;
+            });
+            tasks.add(task);
+            executorService.submit(task);
 
 Review comment:
   I suggest using `CompletableFuture.runAsync(Runnable, Executor)` instead of `FutureTask`. That way we can wait for all downloads with `FutureUtils.waitForAll(CompletableFutures).get()`.
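   A minimal sketch of the suggested `CompletableFuture.runAsync(Runnable, Executor)` pattern, written in plain JDK Java so it compiles without Flink on the classpath. Assumptions: `FileCopier` is a hypothetical stand-in for `copyStateDataHandleData`, plain strings stand in for `StateHandleID` and `StreamStateHandle`, and `CompletableFuture.allOf(...)` takes the place of Flink's `FutureUtils.waitForAll(...)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

final class ParallelDownloadSketch {

    /** Hypothetical stand-in for copyStateDataHandleData(localPath, remoteHandle). */
    interface FileCopier {
        void copy(String localPath, String remoteHandle) throws IOException;
    }

    /** Starts one CompletableFuture per handle on the given executor and waits for all of them. */
    static void transferAll(Map<String, String> handles, String restoreDir,
                            FileCopier copier, Executor executor) throws IOException {
        List<CompletableFuture<Void>> futures = new ArrayList<>(handles.size());
        for (Map.Entry<String, String> entry : handles.entrySet()) {
            String localPath = restoreDir + "/" + entry.getKey();
            String remoteHandle = entry.getValue();
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    copier.copy(localPath, remoteHandle);
                } catch (IOException e) {
                    // Runnable cannot throw checked exceptions, so wrap here and unwrap below.
                    throw new UncheckedIOException(e);
                }
            }, executor));
        }
        try {
            // Plays the role of FutureUtils.waitForAll(...): completes once every future is done.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while downloading state", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof UncheckedIOException) {
                throw ((UncheckedIOException) cause).getCause(); // rethrow the original IOException
            }
            throw new IOException("Failed to download state", cause);
        }
    }
}
```

   Because each task wraps its `IOException` in an `UncheckedIOException` and the caller unwraps it after waiting, a failed download still surfaces as the original `IOException`. Taking an `Executor` parameter instead of creating the pool inside the method also leaves room for a configurable thread count.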




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639916#comment-16639916
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223029260
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
         Map<StateHandleID, StreamStateHandle> stateHandleMap,
         Path restoreInstancePath) throws IOException {
 
+        ExecutorService executorService = Executors.newFixedThreadPool(5);
 
 Review comment:
   The number of threads should be configurable, for example in the same way as for incremental checkpointing: at least in the RocksDBStateBackend constructor and in CheckpointingOptions. The default behaviour (i.e. 1 thread) can stay as it is now, basically running in the current thread. We can also create a follow-up issue to reconsider the RocksDB backend configuration later.
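   One way to honour a configurable thread count whose default of 1 keeps the current behaviour is sketched below. `DownloadExecutor` and its `threadNum` argument are hypothetical names; how the value would be read from the `RocksDBStateBackend` constructor or `CheckpointingOptions` is not shown.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical helper that picks an executor from a configured thread count.
 * threadNum == 1 keeps today's behaviour: tasks run on the calling thread.
 */
final class DownloadExecutor implements AutoCloseable {
    private final ExecutorService pool; // null when tasks run on the calling thread
    private final Executor executor;

    DownloadExecutor(int threadNum) {
        if (threadNum < 1) {
            throw new IllegalArgumentException("threadNum must be >= 1, was " + threadNum);
        }
        if (threadNum == 1) {
            this.pool = null;
            this.executor = Runnable::run; // direct executor: execute() runs the task immediately
        } else {
            this.pool = Executors.newFixedThreadPool(threadNum);
            this.executor = this.pool;
        }
    }

    Executor executor() {
        return executor;
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.shutdown(); // already submitted downloads are allowed to finish
        }
    }
}
```

   With `threadNum == 1` every task runs synchronously inside `execute()`, so the download loop can use a single code path for both the default and the multi-threaded case.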




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639912#comment-16639912
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223032334
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
         Map<StateHandleID, StreamStateHandle> stateHandleMap,
         Path restoreInstancePath) throws IOException {
 
+        ExecutorService executorService = Executors.newFixedThreadPool(5);
 
 Review comment:
   Cancellation of the `copyStateDataHandleData` futures and `executorService.shutdownNow()` should be registered via `cancelStreamRegistry.registerCloseable()` and unregistered with `unregisterCloseable` before `executorService.shutdown()` in the `finally` block, similar to the streams in `copyStateDataHandleData`. This way we guarantee that all resources are freed if the job shuts down abruptly.
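   A sketch of the register/unregister pattern described above. `SimpleCloseableRegistry` is a hypothetical, simplified stand-in loosely modeled on the `registerCloseable`/`unregisterCloseable` calls mentioned in the comment (it is not Flink's class), and `CancellableTransfer.runAll` is a hypothetical helper.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, minimal registry: closing it closes everything still registered. */
final class SimpleCloseableRegistry implements Closeable {
    private final List<Closeable> closeables = new ArrayList<>();
    private boolean closed;

    synchronized void registerCloseable(Closeable c) throws IOException {
        if (closed) {
            c.close(); // registry already closed: release the resource right away
            throw new IOException("Registry already closed");
        }
        closeables.add(c);
    }

    synchronized boolean unregisterCloseable(Closeable c) {
        return closeables.remove(c);
    }

    synchronized int size() {
        return closeables.size();
    }

    @Override
    public synchronized void close() throws IOException {
        closed = true;
        for (Closeable c : closeables) {
            c.close();
        }
        closeables.clear();
    }
}

final class CancellableTransfer {
    static void runAll(List<Runnable> downloads, int threadNum,
                       SimpleCloseableRegistry registry) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        // Thread-safe list: the registry may iterate it from another thread on cancellation.
        List<CompletableFuture<Void>> futures = new CopyOnWriteArrayList<>();
        Closeable cancelAll = () -> {
            // cancel() completes the futures with CancellationException but does not
            // interrupt running tasks; shutdownNow() interrupts the worker threads.
            futures.forEach(f -> f.cancel(true));
            pool.shutdownNow();
        };
        registry.registerCloseable(cancelAll);
        try {
            for (Runnable download : downloads) {
                futures.add(CompletableFuture.runAsync(download, pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } finally {
            // Unregister before the regular shutdown, as suggested in the review comment.
            registry.unregisterCloseable(cancelAll);
            pool.shutdown();
        }
    }
}
```

   `shutdownNow()` only interrupts the worker threads, so the downloads themselves have to react to interruption (for example, by failing) for the cancellation to take effect promptly.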




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639911#comment-16639911
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223034163
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendTest.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RocksDBKeyedStateBackend.
+ */
+public class RocksDBKeyedStateBackendTest {
 
 Review comment:
   In addition to the failure scenario, I would also add a test that mocks the `StreamStateHandle` streams and checks that all streams are read and written correctly in parallel.
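   A dependency-free sketch of such a test. Instead of mocking `StreamStateHandle` with Mockito, it uses in-memory byte arrays as hypothetical remote files (`ParallelCopyCheck` and `copyAll` are illustrative names). A `CountDownLatch` makes every copy wait until all copies have started, so the check only passes if the copies really overlap in time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical harness: byte arrays stand in for mocked StreamStateHandle streams. */
final class ParallelCopyCheck {

    static Map<String, byte[]> copyAll(Map<String, byte[]> remote, int threadNum) throws Exception {
        Map<String, byte[]> local = new ConcurrentHashMap<>();
        // Each copy waits here until all copies have started; this only succeeds
        // if remote.size() copies are running at the same time.
        CountDownLatch allStarted = new CountDownLatch(remote.size());
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Map.Entry<String, byte[]> entry : remote.entrySet()) {
                futures.add(pool.submit(() -> {
                    allStarted.countDown();
                    if (!allStarted.await(10, TimeUnit.SECONDS)) {
                        throw new IllegalStateException("copies did not run in parallel");
                    }
                    try (InputStream in = new ByteArrayInputStream(entry.getValue());
                         ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                        byte[] buffer = new byte[4]; // tiny buffer forces several read/write rounds
                        int n;
                        while ((n = in.read(buffer)) != -1) {
                            out.write(buffer, 0, n);
                        }
                        local.put(entry.getKey(), out.toByteArray());
                    }
                    return null;
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // rethrows any copy failure as ExecutionException
            }
        } finally {
            pool.shutdown();
        }
        return local;
    }
}
```

   Calling `copyAll` with fewer threads than handles makes the latch time out after 10 seconds and the copy fail, which is the signal that the downloads did not run in parallel.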




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639909#comment-16639909
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223030317
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
         Map<StateHandleID, StreamStateHandle> stateHandleMap,
         Path restoreInstancePath) throws IOException {
 
+        ExecutorService executorService = Executors.newFixedThreadPool(5);
+        List<FutureTask<Void>> tasks = new ArrayList<>(stateHandleMap.size());
+
         for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
             StateHandleID stateHandleID = entry.getKey();
             StreamStateHandle remoteFileHandle = entry.getValue();
-            copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
+
+            FutureTask<Void> task = new FutureTask<>(() -> {
 
 Review comment:
   I think it would be more maintainable to extract these 3 methods into a separate class file, e.g. `RocksDbStateDataTransfer`:
   - `transferAllStateDataToDirectory`
   - `transferAllDataFromStateHandles`
   - `copyStateDataHandleData`
   
   `RocksDBKeyedStateBackend` already contains a lot of code.
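   A minimal sketch of what the extracted class might look like. Assumptions: a hypothetical `RemoteHandle` interface replaces Flink's `StreamStateHandle`, `java.nio.file.Path` replaces Flink's `Path`, and the signatures are illustrative. It keeps the current sequential loop, so the refactoring can land independently of the threading change; `transferAllStateDataToDirectory` is omitted because its signature is not shown here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

/** Sketch of the suggested extraction; signatures and RemoteHandle are hypothetical. */
final class RocksDbStateDataTransfer {

    /** Stand-in for StreamStateHandle: anything that can open the remote state file. */
    interface RemoteHandle {
        InputStream openInputStream() throws IOException;
    }

    private RocksDbStateDataTransfer() {
        // static helpers only
    }

    /** Downloads every handle into restoreInstancePath, one file per handle ID. */
    static void transferAllDataFromStateHandles(
            Map<String, RemoteHandle> stateHandleMap, Path restoreInstancePath) throws IOException {
        for (Map.Entry<String, RemoteHandle> entry : stateHandleMap.entrySet()) {
            copyStateDataHandleData(restoreInstancePath.resolve(entry.getKey()), entry.getValue());
        }
    }

    /** Streams one remote file to the local target and closes the input stream afterwards. */
    static void copyStateDataHandleData(Path target, RemoteHandle handle) throws IOException {
        try (InputStream in = handle.openInputStream()) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```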




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639910#comment-16639910
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223029260
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
         Map<StateHandleID, StreamStateHandle> stateHandleMap,
         Path restoreInstancePath) throws IOException {
 
+        ExecutorService executorService = Executors.newFixedThreadPool(5);
 
 Review comment:
   The number of threads should be configurable, for example in the same way as for incremental checkpointing: at least in the RocksDBStateBackend constructor and in CheckpointingOptions. The default behaviour (i.e. 1 thread) can stay as it is now, basically running in the current thread.




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632738#comment-16632738
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-425609212
 
 
   cc @StefanRRichter 
   I use 5 threads for downloading files because I think 5 is enough for many scenarios, and in my company's experience 5 threads work well.
   




[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632735#comment-16632735
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 opened a new pull request #6777: [FLINK-10461] [State Backends, 
Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777
 
 
   ## What is the purpose of the change
   
   This PR speeds up downloading files when restoring from DFS by using multiple threads.
   
   ## Brief change log
   Use multiple threads to download files when restoring from DFS.
   
   ## Verifying this change
   
   Add a unit test that checks that an exception thrown in the thread pool is rethrown.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**not applicable**)
   

