[
https://issues.apache.org/jira/browse/MAPREDUCE-6920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101869#comment-16101869
]
Jason Lowe commented on MAPREDUCE-6920:
---------------------------------------
I agree that mapreduce.job.speculative.slowtaskthreshold is essentially
useless. It has very little effect in practice unless it is set to a very
large value and the standard deviation of completed task durations is
significant (the computed threshold is only compared against Long.MAX_VALUE,
which the (long) cast of mean + sigma * std can reach only for astronomically
large products). And even then, all it would do is disable speculation for the
task, which is of dubious utility.
If I had to guess, it looks like someone originally intended this property to
control the speculation trigger threshold by scaling the standard deviation of
task completion times: if a running task's estimated duration fell below that
threshold, we would never speculate it until the estimate grew to exceed the
threshold. But that is not what it does in practice. Essentially
thresholdRuntime returns a boolean, via Long.MAX_VALUE or otherwise, saying
whether a task should be speculated, so calculating a realistic threshold is a
waste of time.
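To make the guess concrete, here is a minimal sketch of what such a gate might
have looked like had thresholdRuntime's numeric value actually been consumed.
This is hypothetical, not code that exists in Hadoop:
{code:java}
// Hypothetical sketch, NOT the shipped DefaultSpeculator logic: treat the
// sigma-scaled threshold as a real gate rather than a Long.MAX_VALUE flag.
static boolean looksSlow(TaskRuntimeEstimator estimator,
                         TaskId taskID, TaskAttemptId runningAttempt) {
  long acceptableRuntime = estimator.thresholdRuntime(taskID); // mean + sigma * std
  long estimatedRunTime = estimator.estimatedRuntime(runningAttempt);
  // Only consider speculating once the attempt's estimate exceeds the envelope.
  return estimatedRunTime >= acceptableRuntime;
}
{code}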
If my guess is correct, I'm not sure that check would actually matter in
practice even if it were performed. We already do not speculate a task if we
expect the existing attempt to complete before a newly scheduled attempt
would, and usually the existing attempt already has a significant head start.
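For reference, the elided part of speculationValue already performs roughly
this comparison (a simplified paraphrase, not verbatim code; now, taskID,
runningTaskAttemptID, and taskAttemptStartTime come from the surrounding
method):
{code:java}
// Simplified paraphrase of the head-start comparison in speculationValue.
long estimatedEndTime = estimator.estimatedRuntime(runningTaskAttemptID)
    + taskAttemptStartTime;                              // projected finish of the running attempt
long estimatedReplacementEndTime =
    now + estimator.estimatedNewAttemptRuntime(taskID);  // projected finish of a fresh attempt
if (estimatedReplacementEndTime >= estimatedEndTime) {
  // The running attempt is expected to win the race; speculation buys nothing.
  return TOO_LATE_TO_SPECULATE;
}
{code}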
> Cannot find the effect of mapreduce.job.speculative.slowtaskthreshold
> parameter
> -------------------------------------------------------------------------------
>
> Key: MAPREDUCE-6920
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-6920
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Affects Versions: 2.7.1
> Reporter: NING DING
> Priority: Minor
>
> The description of the mapreduce.job.speculative.slowtaskthreshold parameter
> is as follows.
> {code:xml}
> <property>
>   <name>mapreduce.job.speculative.slowtaskthreshold</name>
>   <value>1.0</value>
>   <description>The number of standard deviations by which a task's
>   ave progress-rates must be lower than the average of all running tasks'
>   for the task to be considered too slow.
>   </description>
> </property>
> {code}
> But from the source code I find it has no effect on whether a speculative
> task attempt is started.
> The call stack is as follows.
> DefaultSpeculator.speculationValue -> StartEndTimesBase.thresholdRuntime ->
> DataStatistics.outlier
> {code:title=DefaultSpeculator.java|borderStyle=solid}
> private TaskItem speculationValue(TaskId taskID, long now) {
>   TaskItem taskItem = new TaskItem();
>   Job job = context.getJob(taskID.getJobId());
>   Task task = job.getTask(taskID);
>   Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
>   long acceptableRuntime = Long.MIN_VALUE;
>   long speculationValue = Long.MIN_VALUE;
>
>   if (!mayHaveSpeculated.contains(taskID)) {
>     acceptableRuntime = estimator.thresholdRuntime(taskID);
>     if (acceptableRuntime == Long.MAX_VALUE) {
>       taskItem.setSpeculationValue(ON_SCHEDULE);
>       return taskItem;
>     }
>   }
>   ...
> }
> {code}
> {code:title=StartEndTimesBase.java|borderStyle=solid}
> public long thresholdRuntime(TaskId taskID) {
>   JobId jobID = taskID.getJobId();
>   Job job = context.getJob(jobID);
>   TaskType type = taskID.getTaskType();
>   DataStatistics statistics = dataStatisticsForTask(taskID);
>
>   int completedTasksOfType = type == TaskType.MAP
>       ? job.getCompletedMaps() : job.getCompletedReduces();
>   int totalTasksOfType = type == TaskType.MAP
>       ? job.getTotalMaps() : job.getTotalReduces();
>
>   if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
>       || (((float) completedTasksOfType) / totalTasksOfType)
>           < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE) {
>     return Long.MAX_VALUE;
>   }
>
>   long result = statistics == null
>       ? Long.MAX_VALUE
>       : (long) statistics.outlier(slowTaskRelativeTresholds.get(job));
>   return result;
> }
> {code}
> {code:title=DataStatistics.java|borderStyle=solid}
> public synchronized double outlier(float sigma) {
>   if (count != 0.0) {
>     return mean() + std() * sigma;
>   }
>   return 0.0;
> }
> {code}
> StartEndTimesBase.contextualize reads the
> mapreduce.job.speculative.slowtaskthreshold parameter value and passes it as
> the sigma argument to the outlier method.
> {code:title=StartEndTimesBase.java|borderStyle=solid}
> public void contextualize(Configuration conf, AppContext context) {
>   this.context = context;
>
>   Map<JobId, Job> allJobs = context.getAllJobs();
>   for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
>     final Job job = entry.getValue();
>     mapperStatistics.put(job, new DataStatistics());
>     reducerStatistics.put(job, new DataStatistics());
>     slowTaskRelativeTresholds.put(job,
>         conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
>   }
> }
> {code}
> I think the outlier return value can hardly ever be Long.MAX_VALUE, no matter
> what the mapreduce.job.speculative.slowtaskthreshold parameter value is:
> outlier just returns mean() + std() * sigma over completed-task durations in
> milliseconds, which is many orders of magnitude smaller than Long.MAX_VALUE
> for any realistic sigma. So the parameter cannot affect the return value of
> the DefaultSpeculator.speculationValue method.
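> To put numbers on that, here is a standalone illustration with made-up but
> plausible statistics (a hypothetical 60s mean completed-task duration and 5s
> standard deviation; this is not Hadoop code):
>
> {code:title=OutlierExample.java|borderStyle=solid}
> // Illustration only, with hypothetical statistics, mirroring the
> // mean() + std() * sigma computation in DataStatistics.outlier.
> public class OutlierExample {
>   public static void main(String[] args) {
>     double mean = 60000.0; // 60s mean completed-task duration, in ms
>     double std = 5000.0;   // 5s standard deviation, in ms
>     for (float sigma : new float[] { 1.0f, 10.0f, 100.0f }) {
>       System.out.println("sigma=" + sigma + " -> "
>           + (long) (mean + std * sigma) + " ms");
>     }
>     System.out.println("Long.MAX_VALUE = " + Long.MAX_VALUE); // ~9.2e18
>   }
> }
> {code}
> Even sigma=100 only yields 560000 ms, nowhere near Long.MAX_VALUE.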
> I then ran a test of this parameter. The test source code is below.
> {code:title=TestSpeculativeTask.java|borderStyle=solid}
> package test.speculation;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> public class TestSpeculativeTask extends Configured implements Tool {
>
>   @Override
>   public int run(String[] args) throws Exception {
>     Configuration conf = getConf();
>     Job job = Job.getInstance(conf);
>     FileInputFormat.setInputPaths(job, args[0]);
>     job.setMapperClass(SpeculativeTestMapper.class);
>     job.setNumReduceTasks(0); // map-only job
>     job.setJarByClass(SpeculativeTestMapper.class);
>     Path output = new Path(args[1]);
>     FileOutputFormat.setOutputPath(job, output);
>     job.setOutputFormatClass(TextOutputFormat.class);
>     job.setOutputKeyClass(Text.class);
>     job.setOutputValueClass(NullWritable.class);
>     return job.waitForCompletion(true) ? 0 : 1;
>   }
>
>   public static void main(String[] args) throws Exception {
>     int res = ToolRunner.run(new Configuration(), new TestSpeculativeTask(), args);
>     System.exit(res);
>   }
> }
> {code}
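> Map speculation is already on by default, but for completeness run() could
> also pin the settings in code rather than via -D options on the ToolRunner
> command line (a hypothetical tweak; it needs an import of
> org.apache.hadoop.mapreduce.MRJobConfig):
>
> {code:title=run() variant|borderStyle=solid}
> // Hypothetical addition to run(), before Job.getInstance(conf):
> conf.setFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 10.0f); // 1.0f in the other run
> conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true); // defaults to true anyway
> {code}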
> {code:title=SpeculativeTestMapper.java|borderStyle=solid}
> package test.speculation;
>
> import java.io.IOException;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Mapper;
>
> public class SpeculativeTestMapper<K> extends Mapper<K, Text, Text, NullWritable> {
>
>   private final Log log = LogFactory.getLog(this.getClass());
>
>   @Override
>   public void map(K key, Text value, Context context)
>       throws IOException, InterruptedException {
>     String text = value.toString();
>     log.info("processing " + text);
>     // Sleep 60s per input line: the mapper reading the 10-line file runs
>     // roughly ten times longer than the mappers reading one-line files.
>     Thread.sleep(60 * 1000);
>     context.write(new Text(text), NullWritable.get());
>   }
> }
> {code}
> The input path contains 10 files in HDFS. One file has 10 lines; each of the
> other files has a single line.
> The mapper task processing the 10-line file is therefore much slower than the
> others and should trigger a speculative task attempt.
> The test result is that the speculative attempt's start time shows no obvious
> difference between mapreduce.job.speculative.slowtaskthreshold=1 and
> mapreduce.job.speculative.slowtaskthreshold=10.
> Has anyone else seen the same issue, or have I misunderstood the
> mapreduce.job.speculative.slowtaskthreshold parameter?