[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

smarthan updated MAPREDUCE-7074:
--------------------------------
    Description: 
When a MR job like this:
 - MR job with many map tasks, such as 10000 or more
 - a few map output were lost or corrupted after map task complete successfully 
and before shuffle start
 - mapreduce.task.timeout was set to 0 and 
mapreduce.task.progress-report.interval was not set

the shuffle of reduce task will get stuck in fetch failures loop for a long 
time, several or even dozens of hours.

It was caused by MAPREDUCE-6740, it releate mapreduce.task.timeout with 
mapreduce.task.progress-report.interval by 
MRJobConfUtil.getTaskProgressReportInterval()
{code:java}
  public static long getTaskProgressReportInterval(final Configuration conf) {
    long taskHeartbeatTimeOut = conf.getLong(
        MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
    return conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL,
        (long) (TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO * taskHeartbeatTimeOut));
  }
{code}
When mapreduce.task.timeout was set to 0 and 
mapreduce.task.progress-report.interval was not set, 
getTaskProgressReportInterval will retrun 0L.
 In the class TaskReporter which is used to report task progress and status to 
AM, it set taskProgressInterval= MRJobConfUtil.getTaskProgressReportInterval(), 
and lock.wait(taskProgressInterval) before every progress report.
{code:java}
 public void run() {
      ...skip...
      long taskProgressInterval = MRJobConfUtil.
          getTaskProgressReportInterval(conf);
      while (!taskDone.get()) {
        ...skip...
        try {
          // sleep for a bit
          synchronized(lock) {
            if (taskDone.get()) {
              break;
            }
            lock.wait(taskProgressInterval);
          }
          if (taskDone.get()) {
            break;
          }
          if (sendProgress) {
            // we need to send progress update
            updateCounters();
            taskStatus.statusUpdate(taskProgress.get(),
                                    taskProgress.toString(), 
                                    counters);
            taskFound = umbilical.statusUpdate(taskId, taskStatus);
            taskStatus.clearStatus();
          }
          ...skip...
        } 
        ...skip...
      }
   }
{code}
When mapreduce.task.timeout was set to 0, lock.wait(taskProgressInterval) will 
be lock.wait(0), and because there is no operation to notify it ,the reporter 
will wait all the time and don't report anything to AM. 
 So, when fetch failures happend in shuffle, TaskReporter will not report fetch 
failures to AM , although the log of reducer show message"Reporting fetch 
failure...", and the fetch failures loop will not stop util reduce task failed 
for exceeded MAX_FAILED_UNIQUE_FETCHES.

So, it's necessary to set a TASK_PROGRESS_REPORT_INTERVAL_MAX value (such as 
30s) when the taskProgressInterval return by 
MRJobConfUtil.getTaskProgressReportInterval() equals 0 or beyond the max value, 
set the taskProgressInterval = TASK_PROGRESS_REPORT_INTERVAL_MAX. 

  was:
When a MR job like this:
 - MR job with many map tasks, such as 10000 or more
 - a few map output were lost or corrupted after map task complete successfully 
and before shuffle start
 - mapreduce.task.timeout was set to 0 and 
mapreduce.task.progress-report.interval was not set

the shuffle of reduce task will get stuck in fetch failures loop for a long 
time, several or even dozens of hours.

It was caused by MAPREDUCE-6740, it releate mapreduce.task.timeout with 
mapreduce.task.progress-report.interval by 
MRJobConfUtil.getTaskProgressReportInterval()
{code:java}
  public static long getTaskProgressReportInterval(final Configuration conf) {
    long taskHeartbeatTimeOut = conf.getLong(
        MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
    return conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL,
        (long) (TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO * taskHeartbeatTimeOut));
  }
{code}
When mapreduce.task.timeout was set to 0 and 
mapreduce.task.progress-report.interval was not set, 
getTaskProgressReportInterval will retrun 0L.
 In the class TaskReporter which is used to report task progress and status to 
AM, it set taskProgressInterval= MRJobConfUtil.getTaskProgressReportInterval(), 
and lock.wait(taskProgressInterval) before every progress report.
{code:java}
 public void run() {
      ...skip...
      long taskProgressInterval = MRJobConfUtil.
          getTaskProgressReportInterval(conf);
      while (!taskDone.get()) {
        ...skip...
        try {
          // sleep for a bit
          synchronized(lock) {
            if (taskDone.get()) {
              break;
            }
            lock.wait(taskProgressInterval);
          }
          if (taskDone.get()) {
            break;
          }
          if (sendProgress) {
            // we need to send progress update
            updateCounters();
            taskStatus.statusUpdate(taskProgress.get(),
                                    taskProgress.toString(), 
                                    counters);
            taskFound = umbilical.statusUpdate(taskId, taskStatus);
            taskStatus.clearStatus();
          }
          ...skip...
        } 
        ...skip...
      }
   }
{code}
When mapreduce.task.timeout was set to 0, lock.wait(taskProgressInterval) will 
be lock.wait(0), and because there is no operation to notify it ,the reporter 
will wait all the time and don't report anything to AM. 
 So, when fetch failures happend in shuffle, TaskReporter will not report fetch 
failures to AM , although the log of reducer show message"Reporting fetch 
failure...", and the fetch failures loop will not stop util reduce task failed 
for exceeded MAX_FAILED_UNIQUE_FETCHES.

So, it's necessary to set a TASK_PROGRESS_REPORT_INTERVAL_MAX value (such as 
30s) when the taskProgressInterval return by 
MRJobConfUtil.getTaskProgressReportInterval() equals 0 or beyond the max value, 
set the taskProgressInterval = TASK_PROGRESS_REPORT_INTERVAL_MAX.

Exception Message:

2018-04-09 14:57:08,610 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.Fetcher: for 
url=6562/mapOutput?job=job_1523233330039_13196&reduce=0&map=attempt_1523233330039_13196_m_003652_0,attempt_1523233330039_13196_m_001331_0,attempt_1523233330039_13196_m_000342_0,attempt_1523233330039_13196_m_000105_0,attempt_1523233330039_13196_m_001211_0,attempt_1523233330039_13196_m_002219_0,attempt_1523233330039_13196_m_004747_0,attempt_1523233330039_13196_m_000062_0
 sent hash and received reply
2018-04-09 14:57:08,612 WARN [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.Fetcher: Invalid map id 
java.lang.IllegalArgumentException: TaskAttemptId string : TTP/1.1 500 Internal 
Server Error
Content-Type: text/plain; charset=UTF is not properly formed
        at 
org.apache.hadoop.mapreduce.TaskAttemptID.forName(TaskAttemptID.java:201)
        at 
org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:510)
        at 
org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:348)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:198)
2018-04-09 14:57:08,612 WARN [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.Fetcher: copyMapOutput failed for tasks 
[attempt_1523233330039_13196_m_003652_0, 
attempt_1523233330039_13196_m_001331_0, attempt_1523233330039_13196_m_000342_0, 
attempt_1523233330039_13196_m_000105_0, attempt_1523233330039_13196_m_001211_0, 
attempt_1523233330039_13196_m_002219_0, attempt_1523233330039_13196_m_004747_0, 
attempt_1523233330039_13196_m_000062_0]
2018-04-09 14:57:08,612 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_003652_0 to jobtracker.
2018-04-09 14:57:08,612 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_001331_0 to jobtracker.
2018-04-09 14:57:08,612 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_000342_0 to jobtracker.
2018-04-09 14:57:08,612 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_000105_0 to jobtracker.
2018-04-09 14:57:08,612 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_001211_0 to jobtracker.
2018-04-09 14:57:08,612 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_002219_0 to jobtracker.
2018-04-09 14:57:08,612 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_004747_0 to jobtracker.
2018-04-09 14:57:08,612 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_000062_0 to jobtracker.
...
2018-04-09 22:51:02,142 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.Fetcher: for 
url=6562/mapOutput?job=job_1523233330039_13196&reduce=0&map=attempt_1523233330039_13196_m_003652_0,attempt_1523233330039_13196_m_001331_0,attempt_1523233330039_13196_m_000342_0,attempt_1523233330039_13196_m_000105_0,attempt_1523233330039_13196_m_001211_0,attempt_1523233330039_13196_m_002219_0,attempt_1523233330039_13196_m_004747_0,attempt_1523233330039_13196_m_000062_0
 sent hash and received reply
2018-04-09 22:51:02,154 WARN [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.Fetcher: Invalid map id 
java.lang.IllegalArgumentException: TaskAttemptId string : TTP/1.1 500 Internal 
Server Error
Content-Type: text/plain; charset=UTF is not properly formed
  at org.apache.hadoop.mapreduce.TaskAttemptID.forName(TaskAttemptID.java:201)
  at 
org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:510)
  at 
org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:348)
  at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:198)
2018-04-09 22:51:02,156 WARN [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.Fetcher: copyMapOutput failed for tasks 
[attempt_1523233330039_13196_m_003652_0, 
attempt_1523233330039_13196_m_001331_0, attempt_1523233330039_13196_m_000342_0, 
attempt_1523233330039_13196_m_000105_0, attempt_1523233330039_13196_m_001211_0, 
attempt_1523233330039_13196_m_002219_0, attempt_1523233330039_13196_m_004747_0, 
attempt_1523233330039_13196_m_000062_0]
2018-04-09 22:51:02,157 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_003652_0 to jobtracker.
2018-04-09 22:51:02,157 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_001331_0 to jobtracker.
2018-04-09 22:51:02,157 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_000342_0 to jobtracker.
2018-04-09 22:51:02,157 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_000105_0 to jobtracker.
2018-04-09 22:51:02,157 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_001211_0 to jobtracker.
2018-04-09 22:51:02,157 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_002219_0 to jobtracker.
2018-04-09 22:51:02,157 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_004747_0 to jobtracker.
2018-04-09 22:51:02,157 INFO [fetcher#3] 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl: Reporting fetch 
failure for attempt_1523233330039_13196_m_000062_0 to jobtracker.


 


> Shuffle  get stuck in fetch failures loop, when a few mapoutput were lost or 
> corrupted and task timeout was set to 0
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-7074
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7074
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mrv2, task
>    Affects Versions: 2.8.0, 3.0.0
>         Environment: cdh 5.10.0 ,  apache hadoop 2.8.0
>            Reporter: smarthan
>            Priority: Major
>             Fix For: 2.8.0
>
>
> When a MR job like this:
>  - MR job with many map tasks, such as 10000 or more
>  - a few map output were lost or corrupted after map task complete 
> successfully and before shuffle start
>  - mapreduce.task.timeout was set to 0 and 
> mapreduce.task.progress-report.interval was not set
> the shuffle of reduce task will get stuck in fetch failures loop for a long 
> time, several or even dozens of hours.
> It was caused by MAPREDUCE-6740, it releate mapreduce.task.timeout with 
> mapreduce.task.progress-report.interval by 
> MRJobConfUtil.getTaskProgressReportInterval()
> {code:java}
>   public static long getTaskProgressReportInterval(final Configuration conf) {
>     long taskHeartbeatTimeOut = conf.getLong(
>         MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
>     return conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL,
>         (long) (TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO * 
> taskHeartbeatTimeOut));
>   }
> {code}
> When mapreduce.task.timeout was set to 0 and 
> mapreduce.task.progress-report.interval was not set, 
> getTaskProgressReportInterval will retrun 0L.
>  In the class TaskReporter which is used to report task progress and status 
> to AM, it set taskProgressInterval= 
> MRJobConfUtil.getTaskProgressReportInterval(), and 
> lock.wait(taskProgressInterval) before every progress report.
> {code:java}
>  public void run() {
>       ...skip...
>       long taskProgressInterval = MRJobConfUtil.
>           getTaskProgressReportInterval(conf);
>       while (!taskDone.get()) {
>         ...skip...
>         try {
>           // sleep for a bit
>           synchronized(lock) {
>             if (taskDone.get()) {
>               break;
>             }
>             lock.wait(taskProgressInterval);
>           }
>           if (taskDone.get()) {
>             break;
>           }
>           if (sendProgress) {
>             // we need to send progress update
>             updateCounters();
>             taskStatus.statusUpdate(taskProgress.get(),
>                                     taskProgress.toString(), 
>                                     counters);
>             taskFound = umbilical.statusUpdate(taskId, taskStatus);
>             taskStatus.clearStatus();
>           }
>           ...skip...
>         } 
>         ...skip...
>       }
>    }
> {code}
> When mapreduce.task.timeout was set to 0, lock.wait(taskProgressInterval) 
> will be lock.wait(0), and because there is no operation to notify it ,the 
> reporter will wait all the time and don't report anything to AM. 
>  So, when fetch failures happend in shuffle, TaskReporter will not report 
> fetch failures to AM , although the log of reducer show message"Reporting 
> fetch failure...", and the fetch failures loop will not stop util reduce task 
> failed for exceeded MAX_FAILED_UNIQUE_FETCHES.
> So, it's necessary to set a TASK_PROGRESS_REPORT_INTERVAL_MAX value (such as 
> 30s) when the taskProgressInterval return by 
> MRJobConfUtil.getTaskProgressReportInterval() equals 0 or beyond the max 
> value, set the taskProgressInterval = TASK_PROGRESS_REPORT_INTERVAL_MAX. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org

Reply via email to