[
https://issues.apache.org/jira/browse/MAPREDUCE-7370?focusedWorklogId=790611&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-790611
]
ASF GitHub Bot logged work on MAPREDUCE-7370:
---------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Jul/22 22:16
Start Date: 13/Jul/22 22:16
Worklog Time Spent: 10m
Work Description: cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r920548705
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
Review Comment:
`shutdown` does not wait for the submitted tasks to finish, so when
`close()` returns, there is no guarantee that closing has actually completed.
We'll need a call to `awaitTermination` to make sure all tasks have finished
running.
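A minimal sketch of that sequence (the 60-second timeout is an illustrative
assumption, not from the PR; `TimeUnit` is `java.util.concurrent.TimeUnit`):

```java
executorService.shutdown();           // stop accepting new tasks
try {
  // Block until the submitted close tasks have actually finished.
  if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
    executorService.shutdownNow();    // best effort: interrupt stragglers
  }
} catch (InterruptedException e) {
  executorService.shutdownNow();
  Thread.currentThread().interrupt(); // preserve the interrupt status
}
```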
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
Review Comment:
Is this line left over from some manual testing?
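If it is, the callable presumably reduces to just the guarded close call,
roughly (a sketch mirroring the surrounding patch):

```java
// Close the writer and record any IOException for the caller to surface
// later; no stray `throw new IOException()`.
callableList.add(() -> {
  try {
    writer.close(null);
  } catch (IOException e) {
    ioException.set(e);
  }
  return null;
});
```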
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
Review Comment:
I suggest making this configurable, with 10 as the default.
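For instance (a sketch; the property name and its default are assumptions,
and this relies on the `JobConf` that `MultipleOutputs` is constructed with):

```java
// Hypothetical property; the key name is chosen only for illustration.
int nThreads = conf.getInt("mapreduce.multipleoutputs.close.threads", 10);
```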
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
Review Comment:
We know that we will generate exactly one callable for each `RecordWriter`.
We can create the `ArrayList` pre-allocated to exactly the correct size,
potentially avoiding reallocation inefficiencies:
`new ArrayList<>(recordWriters.size())`
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
Review Comment:
You can log a warning here that closing was interrupted.
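For example (a sketch assuming the class has, or gains, a `LOG` field; the
exact logging framework is an assumption):

```java
} catch (InterruptedException e) {
  // Make the truncated close visible instead of failing silently.
  LOG.warn("Interrupted while waiting for record writers to close.", e);
  Thread.currentThread().interrupt(); // restore the interrupt flag
}
```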
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
Review Comment:
I recommend using the overload of this method that accepts a `ThreadFactory`,
probably built with `ThreadFactoryBuilder`. The factory should generate
threads that 1) use a naming format that makes it clear these threads are
related to the closing process (e.g. "MultipleOutputs-close"), and 2) set an
`UncaughtExceptionHandler` that logs the exception, which would make
unexpected errors such as unchecked exceptions visible.
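A sketch of what that could look like (Hadoop typically uses the shaded
thirdparty copy of Guava's `ThreadFactoryBuilder`; the import path and the
`LOG` field are assumptions):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

// Threads are named after the closing process, and anything that escapes a
// task (e.g. an unchecked exception) gets logged instead of vanishing.
ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MultipleOutputs-close-%d")
    .setUncaughtExceptionHandler((thread, throwable) ->
        LOG.error("Uncaught exception in " + thread.getName(), throwable))
    .build();
ExecutorService executorService =
    Executors.newFixedThreadPool(nThreads, threadFactory);
```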
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
+ }
+
+ if (ioException.get() != null) {
+ throw new IOException(ioException.get());
Review Comment:
With this approach, if multiple record writers throw an exception during
close, we'll only get visibility into one of them. I'd like to suggest a
slightly different approach. Within the callable, catch the exception, log it
immediately and flag an `AtomicBoolean`. Then, on this line, if that
`AtomicBoolean` is set, throw an `IOException` from the overall method, with a
message like "One or more threads encountered IOException during close. See
prior errors."
Issue Time Tracking
-------------------
Worklog Id: (was: 790611)
Time Spent: 2h 10m (was: 2h)
> Parallelize MultipleOutputs#close call
> --------------------------------------
>
> Key: MAPREDUCE-7370
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7370
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Affects Versions: 3.3.0
> Reporter: Prabhu Joseph
> Assignee: Ashutosh Gupta
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> This call takes more time when there are a lot of files to close and the
> latency of each close is high. Parallelize the MultipleOutputs#close call to
> improve the speed.
> {code}
> public void close() throws IOException {
> for (RecordWriter writer : recordWriters.values()) {
> writer.close(null);
> }
> }
> {code}
> Idea is from [[email protected]]
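Putting the review feedback above together, the parallel close could end up
looking roughly like this (a sketch only; the config key, thread naming, and
messages are assumptions, not the committed patch):

{code}
public void close() throws IOException {
  // Hypothetical knob; the review suggests a configurable pool size
  // defaulting to 10.
  int nThreads = conf.getInt("mapreduce.multipleoutputs.close.threads", 10);
  AtomicBoolean encounteredException = new AtomicBoolean(false);
  ExecutorService pool = Executors.newFixedThreadPool(nThreads,
      new ThreadFactoryBuilder()
          .setNameFormat("MultipleOutputs-close-%d").build());
  List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
  for (RecordWriter writer : recordWriters.values()) {
    callableList.add(() -> {
      try {
        writer.close(null);
      } catch (IOException e) {
        LOG.error("Error closing record writer.", e); // log every failure
        encounteredException.set(true);
      }
      return null;
    });
  }
  try {
    pool.invokeAll(callableList); // blocks until all close tasks complete
  } catch (InterruptedException e) {
    LOG.warn("Interrupted while closing record writers.", e);
    Thread.currentThread().interrupt();
  } finally {
    pool.shutdown();
  }
  if (encounteredException.get()) {
    throw new IOException("One or more threads encountered IOException "
        + "during close. See prior errors.");
  }
}
{code}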