[
https://issues.apache.org/jira/browse/MAPREDUCE-2454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13027411#comment-13027411
]
Mariappan Asokan commented on MAPREDUCE-2454:
---------------------------------------------
I thought more about the implementation. Here are the steps I came up with.
I can create one Jira per step. If anyone has an objection, I would like to
hear about it before I jump in. In the following, any reference to
'framework code' means the current Map/Reduce code in Hadoop, taken at the
branch MAPREDUCE-279.
# Modify the framework code so that RawKeyValueIterator is visible only within
the sort/merge related code. All others will see the new KeyValueIterator as
defined below:
{code:title=KeyValueIterator.java|borderStyle=solid}
public interface KeyValueIterator<K, V> extends Closeable {
/**
* Get the current key.
* @param key where the current key should be stored. If this is null, a new
* instance will be created and returned.
* @return current key
* @exception IOException in case of error.
* @exception InterruptedException in case of interruption.
*/
K getCurrentKey(K key) throws IOException, InterruptedException;
/**
* Get the current key.
* @return current key
* @exception IOException in case of error.
* @exception InterruptedException in case of interruption.
*/
K getCurrentKey() throws IOException, InterruptedException;
/**
* Get the current value.
* @param value where the current value should be stored. If this is null, a
* new instance will be created and returned.
* @return current value.
* @exception IOException in case of error.
* @exception InterruptedException in case of interruption.
*/
V getCurrentValue(V value) throws IOException, InterruptedException;
/**
* Get the current value.
* @return current value.
* @exception IOException in case of error.
* @exception InterruptedException in case of interruption.
*/
V getCurrentValue() throws IOException, InterruptedException;
/**
* Advance to the next key/value pair so that it can be read with
* getCurrentKey() and getCurrentValue().
* @return <code>true</code> if there exists a key/value, <code>false</code>
* otherwise.
* @exception IOException in case of error.
* @exception InterruptedException in case of interruption.
*/
boolean nextKeyValue() throws IOException, InterruptedException;
/**
* Get the Progress object; this has a float (0.0 - 1.0) indicating the bytes
* processed by the iterator so far.
* @return progress object.
*/
Progress getProgress();
}
{code}
This will enable any external sorter implementation to reuse existing code in
Task.java to run the combiner and the code in ReduceTask.java to run the
reducer.
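To make the intended usage concrete, here is a minimal sketch of how combiner or reducer driver code might drain such an iterator. It is an illustration only: SimpleKeyValueIterator is a cut-down stand-in for the interface above (no Progress, no object-reuse overloads), and ListKeyValueIterator and KeyValueIteratorDemo are hypothetical names that do not exist in Hadoop.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Cut-down stand-in for the proposed interface (assumption: Progress and the
// object-reuse overloads are omitted to keep the sketch self-contained).
interface SimpleKeyValueIterator<K, V> extends Closeable {
  boolean nextKeyValue() throws IOException, InterruptedException;
  K getCurrentKey() throws IOException, InterruptedException;
  V getCurrentValue() throws IOException, InterruptedException;
}

// Hypothetical iterator over two parallel lists that are already sorted by key.
final class ListKeyValueIterator<K, V> implements SimpleKeyValueIterator<K, V> {
  private final List<K> keys;
  private final List<V> values;
  private int position = -1; // no current pair until nextKeyValue() succeeds

  ListKeyValueIterator(List<K> keys, List<V> values) {
    if (keys.size() != values.size()) {
      throw new IllegalArgumentException("keys and values differ in length");
    }
    this.keys = keys;
    this.values = values;
  }

  @Override public boolean nextKeyValue() {
    if (position + 1 >= keys.size()) {
      return false;
    }
    position++;
    return true;
  }

  @Override public K getCurrentKey() { return keys.get(position); }

  @Override public V getCurrentValue() { return values.get(position); }

  @Override public void close() { /* nothing to release in this sketch */ }
}

final class KeyValueIteratorDemo {
  // Reads every pair and closes the iterator, as a driver loop might.
  static <K, V> List<String> drain(SimpleKeyValueIterator<K, V> iterator)
      throws IOException, InterruptedException {
    List<String> pairs = new ArrayList<>();
    try {
      while (iterator.nextKeyValue()) {
        pairs.add(iterator.getCurrentKey() + "=" + iterator.getCurrentValue());
      }
    } finally {
      iterator.close();
    }
    return pairs;
  }

  public static void main(String[] args) throws Exception {
    SimpleKeyValueIterator<String, Integer> iterator =
        new ListKeyValueIterator<>(Arrays.asList("a", "b"), Arrays.asList(1, 2));
    System.out.println(drain(iterator));
  }
}
```

The point of the sketch is that the consumer never sees raw bytes: it only calls nextKeyValue() and the two getters, so the same loop would work with the framework's merge or with an external sorter behind the interface.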
# Modify the framework code so that an external sorter can be plugged in on
the Map side.
# Modify the shuffle code on the Reduce side so that a shuffle can be started
by any code outside MR (perhaps in a separate thread). A callback interface
will be passed to the shuffle. Tentatively, the callback will look like this;
it can be refined.
{code:title=ShuffleCallback.java|borderStyle=solid}
public interface ShuffleCallback<K, V> extends Closeable {
/**
* To reserve space for the specified mapper output.
* @param mapId mapper id.
* @param requestedSize number of bytes in space to be reserved.
* @param fetcher id of fetcher that will fetch the map output.
* @return the map output for which space was reserved.
* @exception IOException in case of error.
*/
public MapOutput<K,V> reserve(TaskAttemptID mapId, long requestedSize,
int fetcher)
throws IOException;
/**
* To shuffle the data from local mappers.
* @param localMapFiles array of map output files to be sorted.
* @return total number of bytes read from the mapper outputs.
* @exception IOException if there is any IO error while reading.
* @exception InterruptedException if there is an interruption.
* @exception ReduceInputSorterException any other exception that occurs while
* sorting.
*/
public long shuffle(Path[] localMapFiles)
throws IOException, InterruptedException, ReduceInputSorterException;
/**
* To shuffle the raw data coming from a non-local mapper. Multiple threads
* can call this method with input from different mappers.
* @param inputFromMapper the raw input stream from the mapper. If map output
* is compressed, the sorter is responsible for decompressing.
* @param mapTaskId map task id corresponding to the stream.
* @param compressedLength The size of the compressed data.
* @return number of bytes read from the mapper stream.
* @exception IOException if there is any IO error while reading.
* @exception InterruptedException if there is an interruption.
* @exception ReduceInputSorterException any other exception that occurs in
* the sorter.
*/
public long shuffle(InputStream inputFromMapper, String mapTaskId,
long compressedLength)
throws IOException, InterruptedException, ReduceInputSorterException;
/**
* To commit shuffled data from a non-local mapper. Usually, this method is
* called right after shuffle() from the same thread once it is
* decided to commit.
* @param mapTaskId map task id corresponding to the shuffled data.
* @exception IOException if there is any IO error while committing.
* @exception InterruptedException in case of interruption.
* @exception ReduceInputSorterException any other exception that occurs in
* the sorter.
*/
public void commit(String mapTaskId)
throws IOException, InterruptedException, ReduceInputSorterException;
/**
* To discard shuffled data from a non-local mapper. Usually, this method is
* called right after shuffle() from the same thread once it is
* decided to discard.
* @param mapTaskId map task id corresponding to the shuffled data.
* @exception IOException if there is any IO error while discarding.
* @exception InterruptedException in case of interruption.
* @exception ReduceInputSorterException any other exception that occurs in
* the sorter.
*/
public void discard(String mapTaskId)
throws IOException, InterruptedException, ReduceInputSorterException;
/**
* To indicate end of input from all non-local mappers. This should be called
* after all non-local mapper outputs are committed.
* @exception IOException if there is any IO error.
* @exception InterruptedException if there is an interruption.
* @exception ReduceInputSorterException any other exception that occurs in
* the sorter.
*/
public void close()
throws IOException, InterruptedException, ReduceInputSorterException;
}
{code}
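The shuffle/commit/discard sequence for a non-local mapper can be sketched as follows. This is only an illustration under stated assumptions: SimpleShuffleCallback is a reduced stand-in that models just those three methods, InMemoryShuffleCallback and FetcherSketch are hypothetical names, and the "commit only if the byte count matches" rule is an assumed policy, not something the proposal specifies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Reduced stand-in for the proposed callback (assumption: only the non-local
// path is modeled, and the extra checked exceptions are dropped).
interface SimpleShuffleCallback {
  long shuffle(InputStream inputFromMapper, String mapTaskId, long compressedLength)
      throws IOException;
  void commit(String mapTaskId) throws IOException;
  void discard(String mapTaskId) throws IOException;
}

// Hypothetical sorter side: stages each mapper's bytes until commit or discard.
final class InMemoryShuffleCallback implements SimpleShuffleCallback {
  private final Map<String, byte[]> staged = new ConcurrentHashMap<>();
  private final Map<String, byte[]> committed = new ConcurrentHashMap<>();

  @Override
  public long shuffle(InputStream inputFromMapper, String mapTaskId, long compressedLength)
      throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    byte[] chunk = new byte[4096];
    int n;
    while ((n = inputFromMapper.read(chunk)) != -1) {
      buffer.write(chunk, 0, n);
    }
    staged.put(mapTaskId, buffer.toByteArray());
    return buffer.size();
  }

  @Override
  public void commit(String mapTaskId) throws IOException {
    byte[] data = staged.remove(mapTaskId);
    if (data == null) {
      throw new IOException("nothing staged for " + mapTaskId);
    }
    committed.put(mapTaskId, data);
  }

  @Override
  public void discard(String mapTaskId) {
    staged.remove(mapTaskId);
  }

  int stagedCount() { return staged.size(); }

  int committedCount() { return committed.size(); }
}

final class FetcherSketch {
  // Shuffles one mapper's payload, then commits or discards from the same thread.
  // Assumed policy: commit only when the number of bytes read matches.
  static boolean fetch(SimpleShuffleCallback callback, String mapTaskId,
      byte[] payload, long expectedLength) throws IOException {
    long read = callback.shuffle(new ByteArrayInputStream(payload), mapTaskId, expectedLength);
    if (read == expectedLength) {
      callback.commit(mapTaskId);
      return true;
    }
    callback.discard(mapTaskId);
    return false;
  }

  public static void main(String[] args) throws IOException {
    InMemoryShuffleCallback callback = new InMemoryShuffleCallback();
    System.out.println(fetch(callback, "map_0", new byte[] {1, 2, 3}, 3));
    System.out.println(fetch(callback, "map_1", new byte[] {1, 2}, 5));
  }
}
```

Keeping shuffle() separate from commit() lets the fetcher validate what it received before the sorter treats the data as part of its input.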
# Modify ReduceTask.java so that it will invoke the framework's merge or the
external sorter.
# The external sorter interface on the Map side will look like:
{code:title=MapOutputSorter.java|borderStyle=solid}
public interface MapOutputSorter<K, V>
extends MapOutputCollector<K, V> {
/**
* To initialize the sorter.
* @param job job configuration.
* @param mapTask map task invoking this sorter.
* @param inputSplitSize size of input split processed by this sorter.
* @param mapOutputFile map output file
* @param reporter reporter to report sorter progress.
* @exception IOException if there is any error during initialization.
* @exception ClassNotFoundException if a class to be loaded is not found.
* @exception UnsupportedOperationException thrown by the sorter if it
* cannot support certain options in the job. For example, a sorter may
* support only a certain subset of key types. The default sorter in the
* framework will be used as a fallback when this exception is thrown.
*/
public void initialize(JobConf job, MapTask mapTask, long inputSplitSize,
MapOutputFile mapOutputFile, TaskReporter reporter)
throws IOException, ClassNotFoundException, UnsupportedOperationException;
}
{code}
The MapOutputCollector interface defined in MapTask.java will be made public
and will look as follows:
{code:title=MapOutputCollector.java|borderStyle=solid}
public interface MapOutputCollector<K, V> extends Closeable {
public void collect(K key, V value, int partition)
throws IOException, InterruptedException;
public void flush()
throws IOException, InterruptedException, ClassNotFoundException;
public void close() throws IOException, InterruptedException;
}
}
{code}
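As a rough illustration of the collect/flush/close life cycle, the sketch below buffers records per partition and sorts each partition by key on flush. It is not the framework's collector: SimpleMapOutputCollector is a stand-in without checked exceptions, and InMemorySortingCollector and CollectorDemo are hypothetical names. A real sorter would spill to disk instead of holding everything in memory.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in with the same collect/flush/close shape (assumption: checked
// exceptions are omitted to keep the sketch short).
interface SimpleMapOutputCollector<K, V> {
  void collect(K key, V value, int partition);
  void flush();
  void close();
}

// Hypothetical collector that keeps all records in memory.
final class InMemorySortingCollector<K extends Comparable<K>, V>
    implements SimpleMapOutputCollector<K, V> {

  private static final class Record<K, V> {
    final K key;
    final V value;

    Record(K key, V value) {
      this.key = key;
      this.value = value;
    }
  }

  private final List<List<Record<K, V>>> partitions = new ArrayList<>();

  InMemorySortingCollector(int numPartitions) {
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<Record<K, V>>());
    }
  }

  @Override
  public void collect(K key, V value, int partition) {
    partitions.get(partition).add(new Record<>(key, value));
  }

  @Override
  public void flush() {
    // List.sort is stable, so records with equal keys keep their arrival order.
    for (List<Record<K, V>> records : partitions) {
      records.sort((a, b) -> a.key.compareTo(b.key));
    }
  }

  @Override
  public void close() {
    partitions.clear();
  }

  // Helper for inspection only: the keys of one partition in their current order.
  List<K> keysOf(int partition) {
    List<K> keys = new ArrayList<>();
    for (Record<K, V> record : partitions.get(partition)) {
      keys.add(record.key);
    }
    return keys;
  }
}

final class CollectorDemo {
  public static void main(String[] args) {
    InMemorySortingCollector<String, Integer> collector = new InMemorySortingCollector<>(2);
    collector.collect("b", 1, 0);
    collector.collect("a", 2, 0);
    collector.collect("c", 3, 1);
    collector.flush();
    System.out.println(collector.keysOf(0) + " " + collector.keysOf(1));
    collector.close();
  }
}
```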
On the Reduce side, the external sorter interface will look like:
{code:title=ReduceInputSorter.java|borderStyle=solid}
public interface ReduceInputSorter<K, V> extends KeyValueIterator<K, V> {
/**
* Initialize the sorter.
* @param job job configuration.
* @param reduceTask reduce task invoking this sorter.
* @param reporter reporter to report sorter progress.
* @exception IOException if there is any error during initialization.
* @exception ClassNotFoundException if a class to be loaded is not found.
* @exception UnsupportedOperationException thrown by the sorter if it
* cannot support certain options in the job. For example, a sorter may
* support only a certain subset of key types. The default sorter in the
* framework will be used as a fallback when this exception is thrown.
*/
public void initialize(JobConf job, ReduceTask reduceTask,
TaskReporter reporter)
throws IOException, ClassNotFoundException, UnsupportedOperationException;
}
{code}
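The fallback described in the javadoc for initialize() could work roughly like the sketch below. Everything in it is hypothetical: the Sorter interface is reduced to a key class name instead of JobConf and task objects, and SorterSelection, TextOnlySorter and DefaultSorter are made-up names used only to show the control flow.

```java
final class SorterSelection {

  // Reduced stand-in for a pluggable sorter (assumption: the job options are
  // represented by the key class name alone).
  interface Sorter {
    void initialize(String keyClassName);
    String name();
  }

  // Stands in for the framework's default sorter, which accepts any key type.
  static final class DefaultSorter implements Sorter {
    @Override public void initialize(String keyClassName) { /* accepts everything */ }
    @Override public String name() { return "default"; }
  }

  // Hypothetical external sorter that supports only one key type.
  static final class TextOnlySorter implements Sorter {
    @Override public void initialize(String keyClassName) {
      if (!"Text".equals(keyClassName)) {
        throw new UnsupportedOperationException("unsupported key type: " + keyClassName);
      }
    }
    @Override public String name() { return "text-only"; }
  }

  // Tries the plugin first and falls back to the default sorter if the plugin
  // rejects the job options.
  static Sorter select(Sorter plugin, String keyClassName) {
    try {
      plugin.initialize(keyClassName);
      return plugin;
    } catch (UnsupportedOperationException e) {
      Sorter fallback = new DefaultSorter();
      fallback.initialize(keyClassName);
      return fallback;
    }
  }

  public static void main(String[] args) {
    System.out.println(select(new TextOnlySorter(), "Text").name());
    System.out.println(select(new TextOnlySorter(), "IntWritable").name());
  }
}
```

Only UnsupportedOperationException triggers the fallback in this sketch; any other failure during initialization would still propagate to the caller.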
# Abstract base classes for the two sorter interfaces above may be provided in
the framework to facilitate external sorter implementations.
# Provide a proof-of-concept implementation of an external sorter on both the
Map and Reduce sides, using the GNU sort command as the external sorter.
*None of the changes mentioned above should result in any performance
degradation of the framework code when no external sorter is plugged in.*
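For the proof of concept, driving the sort command from Java could look something like the sketch below. It is only a sketch under assumptions: it expects a sort executable on the PATH, treats each record as one text line, and the class name GnuSortSketch is hypothetical. A real plugin would have to serialize keys and values into a form that sort can order correctly.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class GnuSortSketch {

  // Pipes the lines through an external "sort" process and returns its output.
  static List<String> sortLines(List<String> lines)
      throws IOException, InterruptedException {
    ProcessBuilder builder = new ProcessBuilder("sort");
    builder.environment().put("LC_ALL", "C"); // byte-wise ordering, independent of locale
    builder.redirectError(ProcessBuilder.Redirect.INHERIT);
    Process process = builder.start();

    // sort emits nothing until it has seen end of input, so writing all the
    // input first and reading afterwards does not block on a full pipe.
    try (BufferedWriter writer = new BufferedWriter(
        new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
      for (String line : lines) {
        writer.write(line);
        writer.write('\n');
      }
    }

    List<String> sorted = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        sorted.add(line);
      }
    }

    int exitCode = process.waitFor();
    if (exitCode != 0) {
      throw new IOException("sort exited with status " + exitCode);
    }
    return sorted;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(sortLines(Arrays.asList("b", "a", "c")));
  }
}
```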
> Allow external sorter plugin for MR
> -----------------------------------
>
> Key: MAPREDUCE-2454
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-2454
> Project: Hadoop Map/Reduce
> Issue Type: New Feature
> Reporter: Mariappan Asokan
> Priority: Minor
> Attachments: KeyValueIterator.java, MapOutputSorter.java,
> MapOutputSorterAbstract.java, ReduceInputSorter.java
>
>
> Define interfaces and some abstract classes in the Hadoop framework to
> facilitate external sorter plugins both on the Map and Reduce sides.