[
https://issues.apache.org/jira/browse/MAPREDUCE-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579903#comment-17579903
]
ASF GitHub Bot commented on MAPREDUCE-7370:
-------------------------------------------
ashutoshcipher commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r946109977
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
+ }
+
+ if (ioException.get() != null) {
+ throw new IOException(ioException.get());
Review Comment:
Addressed
> Parallelize MultipleOutputs#close call
> --------------------------------------
>
> Key: MAPREDUCE-7370
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7370
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Affects Versions: 3.3.0
> Reporter: Prabhu Joseph
> Assignee: groot
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> This call takes a long time when there are a lot of files to close and each
> close has high latency. Parallelize the MultipleOutputs#close call to improve
> the speed.
> {code}
> public void close() throws IOException {
> for (RecordWriter writer : recordWriters.values()) {
> writer.close(null);
> }
> }
> {code}
> Idea is from [[email protected]]
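
For illustration only, the parallel close described in the issue could be sketched roughly as below. This is not the code from PR #4248: the class name ParallelCloseSketch, the helper closeAll, and its nThreads parameter are hypothetical, and java.io.Closeable stands in for RecordWriter so the example runs without Hadoop on the classpath. It assumes the writers are independent, so the order in which they are closed does not matter.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelCloseSketch {

  /**
   * Hypothetical helper: closes every resource on a bounded thread pool.
   * All resources are attempted; the first IOException is rethrown and any
   * later failures are attached to it as suppressed exceptions.
   */
  public static void closeAll(Collection<? extends Closeable> resources, int nThreads)
      throws IOException {
    if (resources.isEmpty()) {
      return;
    }
    // Never create more threads than there are resources to close.
    int poolSize = Math.max(1, Math.min(nThreads, resources.size()));
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);

    List<Callable<Void>> tasks = new ArrayList<>();
    for (Closeable resource : resources) {
      tasks.add(() -> {
        resource.close(); // may throw IOException; surfaced through Future#get
        return null;
      });
    }

    IOException first = null;
    try {
      // invokeAll blocks until every task has completed or failed.
      for (Future<Void> result : pool.invokeAll(tasks)) {
        try {
          result.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          IOException io = (cause instanceof IOException)
              ? (IOException) cause
              : new IOException(cause);
          if (first == null) {
            first = io;
          } else {
            first.addSuppressed(io);
          }
        }
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report that closing did not finish.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while closing resources", e);
    } finally {
      pool.shutdown();
    }

    if (first != null) {
      throw first;
    }
  }
}
```

Collecting failures from the returned futures, rather than from a shared variable, keeps every failure visible instead of only the last one written, and a failed close does not stop the remaining resources from being closed.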