[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BELUGA BEHR updated MAPREDUCE-7057:
-----------------------------------
    Attachment: MAPREDUCE-7057.2.2.patch

> MergeThread Review
> ------------------
>
>                 Key: MAPREDUCE-7057
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7057
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: mrv2
>    Affects Versions: 3.0.0
>            Reporter: BELUGA BEHR
>            Priority: Minor
>         Attachments: MAPREDUCE-7057.1.patch, MAPREDUCE-7057.2.2.patch, 
> MAPREDUCE-7057.2.patch
>
>
> Source:
>  
> [MergeThread.java|https://github.com/apache/hadoop/blob/178751ed8c9d47038acf8616c226f1f52e884feb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java]
> Update this class to use the Java 1.8 java.util.concurrent package.  There 
> are also some corner cases that the current implementation does not address:
> {code:java|title=MergeThread.java}
> // There is a scenario here where N threads have submitted inputs and are all
> // waiting for the 'pendingToBeMerged' object.  At this point, imagine the
> // 'close' method is called.  The close method will run, see nothing in the
> // queue, interrupt the processing thread, and cause it to exit.  Afterwards,
> // the 'startMerge' threads will all be triggered and add their inputs to a
> // queue for which there is no consumer.  At this point, the T items have been
> // removed from the inputs with no way to recover them.  In practice, this may
> // never happen, but it can be tightened up.
>   public void startMerge(Set<T> inputs) {
>     if (!closed) {
>       numPending.incrementAndGet();
>       List<T> toMergeInputs = new ArrayList<T>();
>       Iterator<T> iter=inputs.iterator();
>       for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
>         toMergeInputs.add(iter.next());
>         iter.remove();
>       }
>       LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + 
>                " segments, while ignoring " + inputs.size() + " segments");
>       synchronized(pendingToBeMerged) {
>         pendingToBeMerged.addLast(toMergeInputs);
>         pendingToBeMerged.notifyAll();
>       }
>     }
>   }
>   public synchronized void close() throws InterruptedException {
>     closed = true;
>     waitForMerge();
>     interrupt();
>   }
> {code}
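
One way the race described above could be closed is sketched below. This is not the attached patch: the class name MergeQueueSketch and the merger callback are hypothetical, and the hand-rolled 'pendingToBeMerged' queue is swapped for a single-threaded ExecutorService. The point it illustrates is that checking 'closed' and handing off the batch happen under the same lock that close() takes, so inputs are never removed once no consumer remains.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for MergeThread; names here are assumptions. */
class MergeQueueSketch<T> {
  private final int mergeFactor;
  private final Consumer<List<T>> merger;  // assumed merge callback
  // One worker thread replaces the dedicated merge thread and its queue.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private boolean closed;                  // guarded by 'this'

  MergeQueueSketch(int mergeFactor, Consumer<List<T>> merger) {
    this.mergeFactor = mergeFactor;
    this.merger = merger;
  }

  /**
   * Takes up to mergeFactor items from inputs and queues them for merging.
   * Returns false, leaving inputs untouched, once close() has been called.
   */
  public synchronized boolean startMerge(Set<T> inputs) {
    if (closed) {
      return false;
    }
    List<T> toMergeInputs = new ArrayList<>();
    Iterator<T> iter = inputs.iterator();
    for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
      toMergeInputs.add(iter.next());
      iter.remove();
    }
    // Cannot be rejected: shutdown() only runs under the same lock.
    executor.execute(() -> merger.accept(toMergeInputs));
    return true;
  }

  /** Stops accepting work, then waits for already queued merges to finish. */
  public void close() throws InterruptedException {
    synchronized (this) {
      closed = true;
      executor.shutdown();  // queued tasks still run; new ones are refused
    }
    // Wait outside the lock so late startMerge callers fail fast.
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  }
}
```

With this shape, a caller that loses the race with close() gets false back and still holds all of its inputs, instead of having them stranded in a queue nobody drains.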



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
