[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13664720#comment-13664720
 ] 

Carlo Curino commented on MAPREDUCE-5269:
-----------------------------------------

This patch completes the series 
MAPREDUCE-5176, MAPREDUCE-5189, MAPREDUCE-5192, MAPREDUCE-5194, MAPREDUCE-5196, MAPREDUCE-5197
 and provides the actual checkpoint mechanics for shuffle and reducers. In Task 
we receive the preemption requests coming in from the umbilical protocol and 
propagate them into the reducer and shuffle logic.

--- Saving the computation state to checkpoint ---
For Shuffle we leverage the plugin architecture to provide a 
PreemptableShuffle. The key intuition behind this code is to: 
# bring the fetcher threads to a safe halt (based on MAPREDUCE-5194) 
# take note of which maps have been pulled so far
# finish the local sort of those map outputs
# write (using MAPREDUCE-5197) a checkpoint file that contains a header with 
the list of map ids, followed by the sorted K,V pairs. This is a modified IFile.
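
The modified IFile layout itself is not shown in this comment; as a rough illustration of the "header of map ids, then sorted K,V pairs" idea, here is a minimal sketch using plain java.io streams (the class and method names are hypothetical, and the real patch uses the IFile writer from MAPREDUCE-5197, not this format):

```java
import java.io.*;
import java.util.*;

// Hypothetical sketch of the checkpoint layout: a header listing the map
// ids pulled so far, followed by the locally sorted K,V pairs.
public class CheckpointSketch {
    static byte[] writeCheckpoint(List<String> mapIds,
                                  SortedMap<String, String> sortedPairs)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // header: number of maps pulled so far, then each map id
        out.writeInt(mapIds.size());
        for (String id : mapIds) out.writeUTF(id);
        // body: the sorted key/value pairs
        out.writeInt(sortedPairs.size());
        for (Map.Entry<String, String> e : sortedPairs.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Read back just the header: the map ids saved at preemption time.
    static List<String> readMapIds(byte[] checkpoint) throws IOException {
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(checkpoint));
        int n = in.readInt();
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) ids.add(in.readUTF());
        return ids;
    }
}
```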

For Reducers we check that the Reducer and OutputCommitter are tagged as 
@Preemptable by the user (see discussion on MAPREDUCE-5176) and:
# wait for the execution of the user UDF to complete for a key-group
# promote the output produced so far, so that it is not removed during cleanup 
(this leverages a modified version of FileOutputCommitter: the 
PartialOutputCommitter)
# write (using MAPREDUCE-5197) in a checkpoint the remaining portion of the 
data to be processed (same format as for shuffle, where the header contains all 
the map ids, and the K,V pairs store the data not yet reduced)
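
The opt-in check boils down to a runtime-visible annotation on the user classes. A minimal sketch, assuming a bare marker annotation (the real @Preemptable from MAPREDUCE-5176 may carry different retention/target details):

```java
import java.lang.annotation.*;

// Hypothetical marker: a reducer/committer is only checkpointed if the
// user explicitly opted in by tagging the class.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Preemptable {}

@Preemptable
class MyReducer { /* user UDF */ }

class PreemptionCheck {
    // Only classes carrying the marker are safe to preempt mid-run.
    static boolean isPreemptable(Class<?> c) {
        return c.isAnnotationPresent(Preemptable.class);
    }
}
```

An untagged reducer simply never gets checkpointed and is killed/re-run as in stock Hadoop.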

--- Restarting from a checkpoint ---
For both Shuffle and Reducer checkpoints we simply:
# during the init of the PreemptableShuffle, we check whether there is a 
checkpoint for this task
# if one exists, we initialize the list of finishedMaps in the shuffle 
scheduler to what was saved in the checkpoint
# we add the IFile contained in the checkpoint to the set of files we are 
shuffling
# we proceed with a normal shuffle

--- Failures ---
We detect problems with the checkpoints during restart: if the checkpoint 
is corrupted/unavailable, or the partially committed output is missing, 
we completely reset the execution of this task by wiping the partially 
committed output and restarting the task from scratch (basically 
we fall back to a classic task re-execution). Besides unit tests and regular 
runs, we validated this recovery mechanism by injecting faults (missed 
propagations of checkpoint ids, missing checkpoint files, missing output files) 
and observed the system behaving properly. 
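
The recovery policy amounts to degrading any restore failure into a clean re-execution. A hypothetical sketch (method names and the string outcomes are assumptions for illustration):

```java
import java.io.IOException;

// Hypothetical sketch of the recovery policy: any failure while validating
// the checkpoint or the partially committed output degrades the task to a
// classic from-scratch re-execution after wiping the partial output.
class RecoverySketch {
    static String startTask(byte[] checkpoint, boolean partialOutputPresent) {
        try {
            if (checkpoint == null || !partialOutputPresent) {
                throw new IOException("checkpoint or partial output missing");
            }
            validate(checkpoint);
            return "RESUME_FROM_CHECKPOINT";
        } catch (IOException e) {
            // wipe the partially committed output, then re-run from scratch
            return "RESTART_FROM_SCRATCH";
        }
    }

    static void validate(byte[] cp) throws IOException {
        if (cp.length == 0) throw new IOException("corrupted checkpoint");
    }
}
```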

(please direct discussion of when to tag a reducer as @Preemptable and 
how to support stateful reducers to MAPREDUCE-5176)


                
> Preemption of Reducer (and Shuffle) via checkpointing
> -----------------------------------------------------
>
>                 Key: MAPREDUCE-5269
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5269
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: mrv2
>            Reporter: Carlo Curino
>         Attachments: MAPREDUCE-5269.patch
>
>
> This patch tracks the changes in the task runtime (shuffle, reducer context, 
> etc.) that are required to implement checkpoint-based preemption of reducer 
> tasks.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
