NING DING created MAPREDUCE-6920:
------------------------------------

             Summary: Cannot find the effect of 
mapreduce.job.speculative.slowtaskthreshold parameter
                 Key: MAPREDUCE-6920
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6920
             Project: Hadoop Map/Reduce
          Issue Type: Bug
    Affects Versions: 2.7.1
            Reporter: NING DING
            Priority: Minor


The description of the mapreduce.job.speculative.slowtaskthreshold parameter is as
below.
{code:xml}
<property>
  <name>mapreduce.job.speculative.slowtaskthreshold</name>
  <value>1.0</value>
  <description>The number of standard deviations by which a task's
  ave progress-rates must be lower than the average of all running tasks'
  for the task to be considered too slow.
  </description>
</property>
{code}

But from the source code I find that it has no effect on whether a speculative
task attempt is started. The call chain is as below.
DefaultSpeculator.speculationValue -> StartEndTimesBase.thresholdRuntime ->
DataStatistics.outlier
{code:title=DefaultSpeculator.java|borderStyle=solid}
  private TaskItem speculationValue(TaskId taskID, long now) {
    TaskItem taskItem = new TaskItem();
    Job job = context.getJob(taskID.getJobId());
    Task task = job.getTask(taskID);
    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
    long acceptableRuntime = Long.MIN_VALUE;
    long speculationValue = Long.MIN_VALUE;

    if (!mayHaveSpeculated.contains(taskID)) {
      acceptableRuntime = estimator.thresholdRuntime(taskID);
      if (acceptableRuntime == Long.MAX_VALUE) {
        taskItem.setSpeculationValue(ON_SCHEDULE);
        return taskItem;
      }
    }
   ...
  }
{code}
{code:title=StartEndTimesBase.java|borderStyle=solid}
  public long thresholdRuntime(TaskId taskID) {
    JobId jobID = taskID.getJobId();
    Job job = context.getJob(jobID);

    TaskType type = taskID.getTaskType();

    DataStatistics statistics
        = dataStatisticsForTask(taskID);

    int completedTasksOfType
        = type == TaskType.MAP
            ? job.getCompletedMaps() : job.getCompletedReduces();

    int totalTasksOfType
        = type == TaskType.MAP
            ? job.getTotalMaps() : job.getTotalReduces();

    if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
        || (((float)completedTasksOfType) / totalTasksOfType)
              < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
      return Long.MAX_VALUE;
    }

    long result =  statistics == null
        ? Long.MAX_VALUE
        : (long)statistics.outlier(slowTaskRelativeTresholds.get(job));
    return result;
  }
{code}
{code:title=DataStatistics.java|borderStyle=solid}
  public synchronized double outlier(float sigma) {
    if (count != 0.0) {
      return mean() + std() * sigma;
    }

    return 0.0;
  }
{code}

StartEndTimesBase.contextualize reads the
mapreduce.job.speculative.slowtaskthreshold parameter value and stores it per
job; it is later passed to the outlier method as the sigma argument.
{code:title=StartEndTimesBase.java|borderStyle=solid}
  public void contextualize(Configuration conf, AppContext context) {
    this.context = context;

    Map<JobId, Job> allJobs = context.getAllJobs();

    for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
      final Job job = entry.getValue();
      mapperStatistics.put(job, new DataStatistics());
      reducerStatistics.put(job, new DataStatistics());
      slowTaskRelativeTresholds.put
          (job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
    }
  }
{code}

I think the value returned by outlier can hardly ever reach Long.MAX_VALUE, no
matter what mapreduce.job.speculative.slowtaskthreshold is set to: it is just
mean() + std() * sigma over task runtimes measured in milliseconds. So the
parameter cannot affect the return value of the DefaultSpeculator.speculationValue
method.
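
To make this concrete, here is a small self-contained sketch (the runtime
numbers are invented for illustration) that mimics the mean() + std() * sigma
computation in DataStatistics.outlier. Even raising sigma from 1 to 10 only
moves the threshold by a few seconds; it never gets anywhere near
Long.MAX_VALUE:

{code:title=OutlierSketch.java|borderStyle=solid}
public class OutlierSketch {
  // Hypothetical estimated runtimes (in ms) of completed map tasks.
  private static final double[] RUNTIMES = {60000, 62000, 58000, 61000, 59000};

  public static void main(String[] args) {
    double sum = 0, sumSquares = 0;
    for (double r : RUNTIMES) {
      sum += r;
      sumSquares += r * r;
    }
    double count = RUNTIMES.length;
    double mean = sum / count;
    // std as DataStatistics computes it: sqrt(E[x^2] - E[x]^2).
    double std = Math.sqrt(sumSquares / count - mean * mean);

    for (float sigma : new float[] {1.0f, 10.0f}) {
      // This is what DataStatistics.outlier(sigma) returns.
      long threshold = (long) (mean + std * sigma);
      System.out.println("sigma=" + sigma + " -> threshold=" + threshold
          + " ms (Long.MAX_VALUE=" + Long.MAX_VALUE + ")");
    }
  }
}
{code}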
To verify, I ran a test with this parameter. The test source code is as below.

{code:title=TestSpeculativeTask.java|borderStyle=solid}
package test.speculation;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestSpeculativeTask extends Configured implements Tool {

  public TestSpeculativeTask() {

  }

  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, args[0]);
    job.setMapperClass(SpeculativeTestMapper.class);
    job.setNumReduceTasks(0);
    job.setJarByClass(SpeculativeTestMapper.class);
    Path output = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, output);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.waitForCompletion(true);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res =
        ToolRunner.run(new Configuration(), new TestSpeculativeTask(), args);
    System.exit(res);
  }
}
{code}

{code:title=SpeculativeTestMapper.java|borderStyle=solid}
package test.speculation;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SpeculativeTestMapper<K> extends
    Mapper<K, Text, Text, NullWritable> {
  private Log log = LogFactory.getLog(this.getClass());
  public void map(K key, Text value, Context context) throws IOException,
      InterruptedException {
    String text = value.toString();
    log.info("processing " + text);
    try {
      Thread.sleep(60 * 1000);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    context.write(new Text(text), NullWritable.get());
  }
}
{code}
The input path contains 10 files in HDFS: one file has 10 lines, and each of
the other files has a single line.
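
For reference, a minimal sketch that produces such an input layout (the
directory and file names are placeholders, not part of the original test):

{code:title=GenerateInput.java|borderStyle=solid}
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GenerateInput {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path dir = new Path("/tmp/speculation-input");  // hypothetical input dir
    for (int i = 0; i < 10; i++) {
      int lines = (i == 0) ? 10 : 1;  // file 0 gets 10 lines, the rest 1 line
      try (PrintWriter out = new PrintWriter(new OutputStreamWriter(
          fs.create(new Path(dir, "part-" + i)), StandardCharsets.UTF_8))) {
        for (int j = 0; j < lines; j++) {
          out.println("file-" + i + "-line-" + j);
        }
      }
    }
  }
}
{code}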
The mapper task processing the 10-line file is therefore bound to be much
slower than the others and should trigger a speculative task attempt.
The test result is that the speculative task's start time shows no obvious
difference between mapreduce.job.speculative.slowtaskthreshold=1 and
mapreduce.job.speculative.slowtaskthreshold=10.
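
For completeness, the two runs were driven along these lines (a sketch only;
the paths are placeholders, map speculation is enabled explicitly, and the
threshold value is the only thing changed between the two runs):

{code:title=RunTest.java|borderStyle=solid}
package test.speculation;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.ToolRunner;

public class RunTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
    // First run: 1.0f. Second run: 10.0f. The speculative attempt
    // start times observed in the two runs were nearly identical.
    conf.setFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f);
    int res = ToolRunner.run(conf, new TestSpeculativeTask(),
        new String[] {"/tmp/speculation-input", "/tmp/speculation-output"});
    System.exit(res);
  }
}
{code}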

Has anyone else seen the same issue? Or have I misunderstood the
mapreduce.job.speculative.slowtaskthreshold parameter?



