NING DING created MAPREDUCE-6920:
------------------------------------
Summary: Cannot find the effect of
mapreduce.job.speculative.slowtaskthreshold parameter
Key: MAPREDUCE-6920
URL: https://issues.apache.org/jira/browse/MAPREDUCE-6920
Project: Hadoop Map/Reduce
Issue Type: Bug
Affects Versions: 2.7.1
Reporter: NING DING
Priority: Minor
The parameter mapreduce.job.speculative.slowtaskthreshold is documented as
follows:
{code:xml}
<property>
  <name>mapreduce.job.speculative.slowtaskthreshold</name>
  <value>1.0</value>
  <description>The number of standard deviations by which a task's
  ave progress-rates must be lower than the average of all running tasks'
  for the task to be considered too slow.
  </description>
</property>
{code}
However, from reading the source code, I find that it has no effect on when a
speculative task is started. The relevant call chain is:
DefaultSpeculator.speculationValue -> StartEndTimesBase.thresholdRuntime ->
DataStatistics.outlier
{code:title=DefaultSpeculator.java|borderStyle=solid}
private TaskItem speculationValue(TaskId taskID, long now) {
  TaskItem taskItem = new TaskItem();
  Job job = context.getJob(taskID.getJobId());
  Task task = job.getTask(taskID);
  Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
  long acceptableRuntime = Long.MIN_VALUE;
  long speculationValue = Long.MIN_VALUE;
  if (!mayHaveSpeculated.contains(taskID)) {
    acceptableRuntime = estimator.thresholdRuntime(taskID);
    if (acceptableRuntime == Long.MAX_VALUE) {
      taskItem.setSpeculationValue(ON_SCHEDULE);
      return taskItem;
    }
  }
  ...
}
{code}
{code:title=StartEndTimesBase.java|borderStyle=solid}
public long thresholdRuntime(TaskId taskID) {
  JobId jobID = taskID.getJobId();
  Job job = context.getJob(jobID);
  TaskType type = taskID.getTaskType();
  DataStatistics statistics
      = dataStatisticsForTask(taskID);
  int completedTasksOfType
      = type == TaskType.MAP
          ? job.getCompletedMaps() : job.getCompletedReduces();
  int totalTasksOfType
      = type == TaskType.MAP
          ? job.getTotalMaps() : job.getTotalReduces();
  if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
      || (((float)completedTasksOfType) / totalTasksOfType)
          < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
    return Long.MAX_VALUE;
  }
  long result = statistics == null
      ? Long.MAX_VALUE
      : (long)statistics.outlier(slowTaskRelativeTresholds.get(job));
  return result;
}
{code}
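To make the early return above concrete, here is a minimal Java sketch of just the gating condition. The constant values 1 and 0.05f are my assumption of what StartEndTimesBase defines in 2.7.1, and ThresholdGateSketch/reachesOutlier are hypothetical names. With 10 map tasks, as in the test job described further down, a single completed map is already enough for thresholdRuntime to reach the outlier call.

{code:title=ThresholdGateSketch.java|borderStyle=solid}
// Hypothetical sketch of the early-exit gate in StartEndTimesBase.thresholdRuntime.
// ASSUMPTION: the two constant values below are believed to match Hadoop 2.7.1;
// check them against your own source tree.
final class ThresholdGateSketch {
  static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE = 1;            // assumed
  static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE = 0.05f;  // assumed

  /** Returns true when thresholdRuntime would go on to call outlier(sigma). */
  static boolean reachesOutlier(int completedTasksOfType, int totalTasksOfType) {
    boolean returnsMaxValue =
        completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
            || ((float) completedTasksOfType) / totalTasksOfType
                < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE;
    return !returnsMaxValue;
  }

  public static void main(String[] args) {
    // 10 map tasks, as in the test job described in this issue.
    for (int completed = 0; completed <= 2; completed++) {
      System.out.println(completed + "/10 completed -> reaches outlier: "
          + reachesOutlier(completed, 10));
    }
  }
}
{code}

Only while this gate is closed (or the statistics are missing) does thresholdRuntime return Long.MAX_VALUE; after that, the returned value comes from outlier.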
{code:title=DataStatistics.java|borderStyle=solid}
public synchronized double outlier(float sigma) {
  if (count != 0.0) {
    return mean() + std() * sigma;
  }
  return 0.0;
}
{code}
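A small worked example of the outlier formula, mean() + std() * sigma, may help. MiniDataStatistics is a hypothetical stand-in for DataStatistics, and it assumes std() is the population standard deviation sqrt(E[X^2] - E[X]^2). For two completed runtimes 1 and 3, mean = 2 and std = 1, so sigma = 1 gives 3 and sigma = 10 gives 12. With no samples the method returns 0.0, not Long.MAX_VALUE.

{code:title=MiniDataStatistics.java|borderStyle=solid}
// Hypothetical, single-threaded stand-in for DataStatistics: it keeps a
// running count, sum and sum of squares of the added runtimes.
// ASSUMPTION: std() is the population standard deviation.
final class MiniDataStatistics {
  private double count = 0;
  private double sum = 0;
  private double sumSquares = 0;

  void add(double x) {
    count++;
    sum += x;
    sumSquares += x * x;
  }

  double mean() {
    return count == 0 ? 0.0 : sum / count;
  }

  double std() {
    if (count <= 1) {
      return 0.0;
    }
    double m = mean();
    // Clamp small negative values caused by floating-point rounding.
    return Math.sqrt(Math.max(sumSquares / count - m * m, 0.0));
  }

  // Same formula as the DataStatistics.outlier excerpt above.
  double outlier(float sigma) {
    if (count != 0.0) {
      return mean() + std() * sigma;
    }
    return 0.0;
  }

  public static void main(String[] args) {
    MiniDataStatistics s = new MiniDataStatistics();
    s.add(1);
    s.add(3);
    System.out.println(s.outlier(1.0f));   // 3.0
    System.out.println(s.outlier(10.0f));  // 12.0
  }
}
{code}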
StartEndTimesBase.contextualize reads the
mapreduce.job.speculative.slowtaskthreshold value (default 1.0) into
slowTaskRelativeTresholds, and thresholdRuntime later passes it to the outlier
method as sigma.
{code:title=StartEndTimesBase.java|borderStyle=solid}
public void contextualize(Configuration conf, AppContext context) {
  this.context = context;
  Map<JobId, Job> allJobs = context.getAllJobs();
  for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
    final Job job = entry.getValue();
    mapperStatistics.put(job, new DataStatistics());
    reducerStatistics.put(job, new DataStatistics());
    slowTaskRelativeTresholds.put
        (job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
  }
}
{code}
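The flow from configuration value to per-job sigma to (long) outlier can be sketched as follows. A plain Map stands in for Hadoop's Configuration; getFloat and thresholdRuntime here are hypothetical helpers that only mimic the excerpts above, and the mean and standard deviation are made-up numbers. The sketch shows that the parameter does change the numeric threshold (65000 vs 110000 in this example).

{code:title=ThresholdFlowSketch.java|borderStyle=solid}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how the slow-task threshold travels from the job
// configuration to the long value returned by thresholdRuntime.
final class ThresholdFlowSketch {
  static final String SPECULATIVE_SLOWTASK_THRESHOLD =
      "mapreduce.job.speculative.slowtaskthreshold";

  // Mimics Configuration.getFloat(name, defaultValue) for this sketch only.
  static float getFloat(Map<String, String> conf, String name, float defaultValue) {
    String raw = conf.get(name);
    return raw == null ? defaultValue : Float.parseFloat(raw.trim());
  }

  // Mirrors (long) statistics.outlier(sigma) with outlier = mean + std * sigma.
  static long thresholdRuntime(double mean, double std, Map<String, String> conf) {
    float sigma = getFloat(conf, SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f);
    return (long) (mean + std * sigma);
  }

  public static void main(String[] args) {
    Map<String, String> defaults = new HashMap<>();
    Map<String, String> tenSigma = new HashMap<>();
    tenSigma.put(SPECULATIVE_SLOWTASK_THRESHOLD, "10");
    // Hypothetical statistics of completed map runtimes, in milliseconds.
    double mean = 60_000;
    double std = 5_000;
    System.out.println(thresholdRuntime(mean, std, defaults));  // 65000
    System.out.println(thresholdRuntime(mean, std, tenSigma));  // 110000
  }
}
{code}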
I think the return value of outlier, once cast to long, can practically never
be Long.MAX_VALUE, whatever the value of
mapreduce.job.speculative.slowtaskthreshold. Because speculationValue only
checks acceptableRuntime against Long.MAX_VALUE, the parameter cannot affect
the return value of DefaultSpeculator.speculationValue.
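The following sketch illustrates this argument under the same simplifications: a finite threshold never equals Long.MAX_VALUE, and Java's double-to-long cast only saturates to Long.MAX_VALUE once mean + std * sigma reaches 2^63. All names are hypothetical, and the runtimes are made-up values in milliseconds.

{code:title=MaxValueCheckSketch.java|borderStyle=solid}
// Hypothetical sketch of the only use of acceptableRuntime in the
// speculationValue excerpt: an equality check against Long.MAX_VALUE.
final class MaxValueCheckSketch {
  // A double >= 2^63 is narrowed to Long.MAX_VALUE by the (long) cast.
  static long acceptableRuntime(double mean, double std, float sigma) {
    return (long) (mean + std * sigma);
  }

  // True when speculationValue would take the early ON_SCHEDULE return.
  static boolean returnsOnSchedule(long acceptableRuntime) {
    return acceptableRuntime == Long.MAX_VALUE;
  }

  // Smallest sigma for which mean + std * sigma reaches 2^63.
  static double sigmaNeededForMaxValue(double mean, double std) {
    return (Math.pow(2, 63) - mean) / std;
  }

  public static void main(String[] args) {
    double mean = 60_000;  // hypothetical mean runtime, ms
    double std = 5_000;    // hypothetical standard deviation, ms
    for (float sigma : new float[] {1.0f, 10.0f, 1000.0f}) {
      long t = acceptableRuntime(mean, std, sigma);
      System.out.println("sigma=" + sigma + " acceptableRuntime=" + t
          + " ON_SCHEDULE=" + returnsOnSchedule(t));
    }
    System.out.println("sigma needed: " + sigmaNeededForMaxValue(mean, std));
  }
}
{code}

With std = 5000 ms, sigma would have to be around 1.8e15 before the ON_SCHEDULE branch is taken, so for any realistic setting the branch does not depend on the configured value.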
I then ran a test of this parameter. The test source code is below.
{code:title=TestSpeculativeTask.java|borderStyle=solid}
package test.speculation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class TestSpeculativeTask extends Configured implements Tool {
  public TestSpeculativeTask() {
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, args[0]);
    job.setMapperClass(SpeculativeTestMapper.class);
    job.setNumReduceTasks(0);
    job.setJarByClass(SpeculativeTestMapper.class);
    Path output = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, output);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // Report job failure through the exit code instead of always returning 0.
    return job.waitForCompletion(true) ? 0 : 1;
  }
  public static void main(String[] args) throws Exception {
    int res =
        ToolRunner.run(new Configuration(), new TestSpeculativeTask(), args);
    System.exit(res);
  }
}
{code}
{code:title=SpeculativeTestMapper.java|borderStyle=solid}
package test.speculation;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SpeculativeTestMapper<K> extends
    Mapper<K, Text, Text, NullWritable> {
  private Log log = LogFactory.getLog(this.getClass());
  @Override
  public void map(K key, Text value, Context context) throws IOException,
      InterruptedException {
    String text = value.toString();
    log.info("processing " + text);
    try {
      // Sleep one minute per record so that larger inputs are much slower.
      Thread.sleep(60 * 1000);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    context.write(new Text(text), NullWritable.get());
  }
}
{code}
The input path contains 10 files in HDFS. One file has 10 lines and each of the
other files has one line.
Because the mapper sleeps 60 seconds per record, the map task processing the
10-line file takes roughly 10 minutes instead of roughly 1 minute, so it should
trigger a speculative task attempt.
The result: the start time of the speculative attempt shows no obvious
difference between mapreduce.job.speculative.slowtaskthreshold=1 and
mapreduce.job.speculative.slowtaskthreshold=10.
Has anyone else seen the same issue, or am I misunderstanding the
mapreduce.job.speculative.slowtaskthreshold parameter?