[
https://issues.apache.org/jira/browse/MAPREDUCE-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585030#comment-17585030
]
ASF GitHub Bot commented on MAPREDUCE-7370:
-------------------------------------------
cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r955436533
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java:
##########
@@ -70,6 +76,20 @@ public void testWithCounters() throws Exception {
_testMOWithJavaSerialization(true);
}
+ @Test(expected=IOException.class)
+ public void testParallelClose() throws IOException, InterruptedException {
Review Comment:
I suggest naming this `testParallelCloseIOException` to make it clear that
we are testing an error case.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -527,9 +558,41 @@ public void collect(Object key, Object value) throws
IOException {
* @throws java.io.IOException thrown if any of the MultipleOutput files
* could not be closed properly.
*/
- public void close() throws IOException {
+ public void close() throws IOException, InterruptedException {
+ int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+ MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+ AtomicBoolean encounteredException = new AtomicBoolean(false);
+ ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+ .setUncaughtExceptionHandler(
Review Comment:
`IOException` is now being propagated back to the caller of `close()`, but
any unexpected (unchecked) exceptions would be swallowed in this uncaught
exception handler. This is different from the existing code, where the caller
would receive the unchecked exception.
I think the best we can do here is to set `encounteredException` from within
the uncaught exception handler, resulting in throwing the `IOException` at the
bottom of the method. The message would need to be generalized to "encountered
exception during close", not mentioning `IOException`, because it might also
have been some other unchecked exception.
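
A minimal sketch of that suggestion, under stated assumptions: the `Closer` interface and the `closeAll` method are hypothetical stand-ins for the record writers and for `close()`, and plain JDK threads replace the Guava `ThreadFactoryBuilder` and the thread pool used in the patch. One thread per writer is started and joined so that the handler is guaranteed to have run before the flag is read. It is not the code in the PR.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseFailureFlagSketch {

  /** Hypothetical stand-in for a RecordWriter whose close may fail. */
  interface Closer {
    void close() throws IOException;
  }

  /** Closes every closer on its own thread; reports any failure as an IOException. */
  static void closeAll(List<Closer> closers) throws IOException, InterruptedException {
    AtomicBoolean encounteredException = new AtomicBoolean(false);
    List<Thread> threads = new ArrayList<>();

    for (Closer closer : closers) {
      Thread t = new Thread(() -> {
        try {
          closer.close();
        } catch (IOException e) {
          // Checked failure: record it directly.
          encounteredException.set(true);
        }
        // Unchecked exceptions escape run() and reach the handler below.
      }, "MultipleOutputs-close");

      // Record unchecked failures instead of swallowing them.
      t.setUncaughtExceptionHandler((thread, ex) -> encounteredException.set(true));
      threads.add(t);
      t.start();
    }

    // join() returns only after the thread, including its handler, has finished.
    for (Thread t : threads) {
      t.join();
    }

    if (encounteredException.get()) {
      // Generalized message: the cause may not have been an IOException.
      throw new IOException("Encountered exception during close");
    }
  }
}
```

With this shape, a writer that throws an `IllegalStateException` still causes the caller to see an `IOException`, at the cost of losing the original exception type.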
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws
IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
Review Comment:
Sorry, I think I gave some bad advice here. I see now that you're using
`invokeAll`, and that method only returns after all invocations complete.
Therefore, we know the work is all done, and we can proceed to `shutdown`.
Calling `awaitTermination` opens up a new problem: how to decide on the
timeout, here arbitrarily chosen as 50 seconds. Since we don't really need
`awaitTermination`, we might as well remove it and avoid that problem.
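
A rough sketch of the resulting shape, assuming a hypothetical `Closer` interface in place of `RecordWriter` and a hypothetical `closeAll` method in place of `close()`. It omits the unconditional `throw new IOException()` that appears in the quoted hunk, and it rethrows the recorded exception at the end, which is an assumption about the rest of the method.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class InvokeAllCloseSketch {

  /** Hypothetical stand-in for a RecordWriter whose close may fail. */
  interface Closer {
    void close() throws IOException;
  }

  static void closeAll(List<Closer> closers, int nThreads) throws IOException {
    AtomicReference<IOException> ioException = new AtomicReference<>();
    List<Callable<Object>> callableList = new ArrayList<>();

    for (Closer closer : closers) {
      callableList.add(() -> {
        try {
          closer.close();
        } catch (IOException e) {
          // Remember the failure; the last one recorded wins.
          ioException.set(e);
        }
        return null;
      });
    }

    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
    try {
      // invokeAll blocks until every callable has completed.
      executorService.invokeAll(callableList);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      // All work is already done, so shutdown() alone is enough here;
      // no awaitTermination and therefore no timeout to choose.
      executorService.shutdown();
    }

    IOException failure = ioException.get();
    if (failure != null) {
      throw failure;
    }
  }
}
```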
> Parallelize MultipleOutputs#close call
> --------------------------------------
>
> Key: MAPREDUCE-7370
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7370
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Affects Versions: 3.3.0
> Reporter: Prabhu Joseph
> Assignee: groot
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> This call takes more time when there are a lot of files to close and each
> close has high latency. Parallelize the MultipleOutputs#close call to improve
> the speed.
> {code}
> public void close() throws IOException {
> for (RecordWriter writer : recordWriters.values()) {
> writer.close(null);
> }
> }
> {code}
> Idea is from [[email protected]]