[ https://issues.apache.org/jira/browse/MAPREDUCE-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579902#comment-17579902 ]

ASF GitHub Bot commented on MAPREDUCE-7370:
-------------------------------------------

ashutoshcipher commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r946109580


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();

Review Comment:
   Addressed. 



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

Review Comment:
   Done



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();

Review Comment:
   Done





> Parallelize MultipleOutputs#close call
> --------------------------------------
>
>                 Key: MAPREDUCE-7370
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7370
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>    Affects Versions: 3.3.0
>            Reporter: Prabhu Joseph
>            Assignee: groot
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> This call is slow when there are a lot of files to close and each close has 
> high latency. Parallelize the MultipleOutputs#close call to improve the 
> speed.
> {code}
>   public void close() throws IOException {
>     for (RecordWriter writer : recordWriters.values()) {
>       writer.close(null);
>     }
>   }
> {code}
> Idea is from [~ste...@apache.org]
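
The idea in the description can be illustrated with a minimal, standalone sketch. It is not the code from PR #4248: the class name ParallelCloseSketch and the method closeAll are hypothetical, and plain java.io.Closeable objects stand in for the RecordWriter instances so that the example compiles without Hadoop on the classpath. It also swaps in a different error-handling technique: instead of the diff's AtomicReference, where each failure overwrites the previous one, it rethrows the first IOException and attaches later ones as suppressed exceptions.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of Hadoop: closes many resources in parallel.
final class ParallelCloseSketch {
  private ParallelCloseSketch() {}

  // Closes every resource on a fixed-size pool and waits for all of them.
  // Throws the first IOException seen; later failures are added as suppressed.
  static void closeAll(Collection<? extends Closeable> resources, int nThreads)
      throws IOException {
    if (resources.isEmpty()) {
      return;
    }
    // Never start more threads than there are resources to close.
    int poolSize = Math.max(1, Math.min(nThreads, resources.size()));
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    IOException first = null;
    try {
      List<Callable<Void>> tasks = new ArrayList<>();
      for (Closeable resource : resources) {
        tasks.add(() -> {
          resource.close();
          return null;
        });
      }
      // invokeAll blocks until every task has completed or failed.
      for (Future<Void> result : pool.invokeAll(tasks)) {
        try {
          result.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          IOException io = cause instanceof IOException
              ? (IOException) cause
              : new IOException(cause);
          if (first == null) {
            first = io;
          } else {
            first.addSuppressed(io);
          }
        }
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report the interruption to the caller.
      Thread.currentThread().interrupt();
      IOException io = new IOException("Interrupted while closing resources", e);
      if (first == null) {
        first = io;
      } else {
        first.addSuppressed(io);
      }
    } finally {
      pool.shutdown();
    }
    if (first != null) {
      throw first;
    }
  }
}
```

A caller would invoke something like ParallelCloseSketch.closeAll(writers, 10), where the pool size of 10 mirrors the nThreads value in the diff above. Whether a fixed pool of that size suits a given filesystem is an assumption to verify, not a recommendation.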



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org
