[
https://issues.apache.org/jira/browse/MAPREDUCE-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
BELUGA BEHR updated MAPREDUCE-7057:
-----------------------------------
Status: Open (was: Patch Available)
> MergeThread Review
> ------------------
>
> Key: MAPREDUCE-7057
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7057
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Components: mrv2
> Affects Versions: 3.0.0
> Reporter: BELUGA BEHR
> Priority: Minor
> Attachments: MAPREDUCE-7057.1.patch, MAPREDUCE-7057.2.2.patch,
> MAPREDUCE-7057.2.patch, MAPREDUCE-7057.4.patch
>
>
> Source:
>
> [MergeThread.java|https://github.com/apache/hadoop/blob/178751ed8c9d47038acf8616c226f1f52e884feb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java]
> Update this class to use the Java 1.8 concurrent package
> (java.util.concurrent). There are also some corner cases that the current
> implementation does not address:
> {code:java|title=MergeThread.java}
> // There is a scenario here where N threads have submitted inputs and are all
> // waiting for the 'pendingToBeMerged' monitor. At this point, imagine the
> // 'close' method is called. The close method will run, see nothing in the
> // queue, interrupt the processing thread, and cause it to exit. Afterwards,
> // the 'startMerge' threads will all proceed and add their inputs to a queue
> // that no longer has a consumer. By then, the T items have already been
> // removed from the inputs, with no way to recover them. In practice this may
> // never happen, but it can be tightened up.
> public void startMerge(Set<T> inputs) {
>   if (!closed) {
>     numPending.incrementAndGet();
>     List<T> toMergeInputs = new ArrayList<T>();
>     Iterator<T> iter = inputs.iterator();
>     for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
>       toMergeInputs.add(iter.next());
>       iter.remove();
>     }
>     LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() +
>         " segments, while ignoring " + inputs.size() + " segments");
>     synchronized (pendingToBeMerged) {
>       pendingToBeMerged.addLast(toMergeInputs);
>       pendingToBeMerged.notifyAll();
>     }
>   }
> }
>
> public synchronized void close() throws InterruptedException {
>   closed = true;
>   waitForMerge();
>   interrupt();
> }
> {code}
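
One possible way to close the window described in the comment above, shown only as a minimal sketch and not as the content of any attached patch: check the closed flag and hand off the batch under the same lock that close takes, and let a single-threaded ExecutorService from java.util.concurrent play the role of the merge thread. The names MergeQueueSketch, closeAndWait, and the merger callback are hypothetical and exist only for illustration; the real MergeThread has additional state (numPending, logging, exception reporting) that is left out here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for MergeThread; not the Hadoop class.
final class MergeQueueSketch<T> {
  private final int mergeFactor;
  private final Consumer<List<T>> merger;   // assumed merge callback
  private final ExecutorService executor =
      Executors.newSingleThreadExecutor();  // replaces the hand-rolled thread
  private boolean closed;                   // guarded by 'this'

  MergeQueueSketch(int mergeFactor, Consumer<List<T>> merger) {
    this.mergeFactor = mergeFactor;
    this.merger = merger;
  }

  /**
   * Takes up to mergeFactor items from inputs and queues them for merging.
   * Returns false, leaving inputs untouched, if close has already begun.
   */
  synchronized boolean startMerge(Set<T> inputs) {
    if (closed) {
      return false; // nothing was removed, so the caller keeps its inputs
    }
    List<T> batch = new ArrayList<>();
    Iterator<T> iter = inputs.iterator();
    for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
      batch.add(iter.next());
      iter.remove();
    }
    // Still holding the lock, so shutdown() cannot have run yet and
    // this task is guaranteed to be accepted.
    executor.execute(() -> merger.accept(batch));
    return true;
  }

  /**
   * Stops accepting new batches, then waits for queued merges to finish.
   * Returns true if all queued work completed within the timeout.
   */
  boolean closeAndWait(long timeoutMillis) {
    synchronized (this) {
      closed = true;
      executor.shutdown(); // already-queued batches still run
    }
    try {
      return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status
      return false;
    }
  }
}
```

Because startMerge either removes items and queues them, or removes nothing and returns false, a caller racing with close never loses inputs. The cost is that startMerge callers serialize on one lock, which seems acceptable here since the hand-off itself is cheap.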