[
https://issues.apache.org/jira/browse/MAPREDUCE-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhihai xu updated MAPREDUCE-6259:
---------------------------------
Description:
A job submit time of -1 causes an IllegalArgumentException when the job history
file name is parsed, and JOB_INIT_FAILED is what leaves the -1 job submit time
in JobIndexInfo.
We found the following job history file name, which causes an
IllegalArgumentException when the job status is parsed out of the file name:
{code}
job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
{code}
The stack trace for the IllegalArgumentException is:
{code}
2015-02-10 04:54:01,863 WARN org.apache.hadoop.mapreduce.v2.hs.PartialJob: Exception while parsing job state. Defaulting to KILLED
java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.mapreduce.v2.api.records.JobState.0
    at java.lang.Enum.valueOf(Enum.java:236)
    at org.apache.hadoop.mapreduce.v2.api.records.JobState.valueOf(JobState.java:21)
    at org.apache.hadoop.mapreduce.v2.hs.PartialJob.getState(PartialJob.java:82)
    at org.apache.hadoop.mapreduce.v2.hs.PartialJob.<init>(PartialJob.java:59)
    at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getAllPartialJobs(CachedHistoryStorage.java:159)
    at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getPartialJobs(CachedHistoryStorage.java:173)
    at org.apache.hadoop.mapreduce.v2.hs.JobHistory.getPartialJobs(JobHistory.java:284)
    at org.apache.hadoop.mapreduce.v2.hs.webapp.HsWebServices.getJobs(HsWebServices.java:212)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
    at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$TypeOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:185)
    at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
    at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:288)
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1469)
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1400)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1349)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1339)
    at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:416)
    at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:537)
    at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:886)
    at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
    at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
    at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
    at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
    at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1223)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
    at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
    at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
    at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767)
    at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
    at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
    at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
    at org.mortbay.jetty.Server.handle(Server.java:326)
    at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
    at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
    at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
    at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
    at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
    at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
    at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
{code}
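To see why the parser ends up calling JobState.valueOf with "0": the done-file name is a "-"-separated list of fields (jobId, submitTime, user, jobName, finishTime, numMaps, numReduces, jobStatus, queueName, jobStartTime; order assumed from the file name above). The snippet below is only an illustration of that split, not the actual FileNameIndexUtils code; the extra "-" contributed by the -1 submit time shifts every later field by one, so the slot the parser reads the job status from holds a digit.
{code}
// Illustration only -- not the actual FileNameIndexUtils#getIndexInfo code.
public class JhistNameSplitDemo {
  public static void main(String[] args) {
    String name = "job_1418398645407_115853--1-worun-"
        + "kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D"
        + "-1423572836007-0-0-FAILED-root.journaling-1423572836007";
    String[] fields = name.split("-");
    // The "--1" produces an empty token, so the field at index 7 (where the
    // job status is expected) is "0" instead of "FAILED". JobState.valueOf("0")
    // then throws: No enum constant ...JobState.0
    System.out.println("jobStatus field = " + fields[7]);
  }
}
{code}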
When an IOException happens in JobImpl#setup, the job submit time in
JobHistoryEventHandler#MetaInfo#JobIndexInfo is never updated, so it keeps its
[initial value
-1|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java#L1185]:
{code}
this.jobIndexInfo =
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
queueName);
{code}
The following is the sequence of events that leads to a -1 job submit time:
1.
A job is created in MRAppMaster#serviceStart; after creation the new job is in
state JobStateInternal.NEW.
{code}
job = createJob(getConfig(), forcedState, shutDownMessage);
{code}
2.
JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart
{code}
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
// Send init to the job (this does NOT trigger job execution)
// This is a synchronous call, not an event through dispatcher. We want
// job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent);
{code}
3.
After JobImpl receives JobEventType.JOB_INIT, it calls InitTransition#transition:
{code}
.addTransition
(JobStateInternal.NEW,
EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
JobEventType.JOB_INIT,
new InitTransition())
{code}
4.
The exception is thrown from setup(job) in InitTransition#transition before the
JobSubmittedEvent is handled.
The JobSubmittedEvent is what updates the job submit time, so because of the
exception the submit time is still the initial value -1.
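For context, the submit time in JobIndexInfo is only filled in when JobHistoryEventHandler handles the JobSubmittedEvent, roughly as in the sketch below (paraphrased from memory, not an exact copy of the trunk code); if no JobSubmittedEvent is ever generated, the -1 is never overwritten.
{code}
// Rough sketch of the JOB_SUBMITTED branch in
// JobHistoryEventHandler#handleEvent (paraphrased; may differ from trunk).
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
  JobSubmittedEvent jobSubmittedEvent =
      (JobSubmittedEvent) event.getHistoryEvent();
  // This is the only place the submit time in JobIndexInfo gets set.
  mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
  mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
}
{code}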
This is the code of InitTransition#transition:
{code}
public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(job.conf,
job.oldJobId);
}
try {
setup(job);
job.fs = job.getFileSystem(job.conf);
//log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.queueName,
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
getWorkflowAdjacencies(job.conf),
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
job.mapWeight = 0.9f;
} else {
job.mapWeight = job.reduceWeight = 0.45f;
}
checkTaskLimits();
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
job.makeUberDecision(inputLength);
job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
job.numMapTasks + job.numReduceTasks + 10);
job.mapAttemptCompletionEvents =
new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
job.numMapTasks + job.numReduceTasks + 10);
job.allowedMapFailuresPercent =
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
job.allowedReduceFailuresPercent =
job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
// create the Tasks but don't start them yet
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
job.metrics.endPreparingJob(job);
return JobStateInternal.INITED;
} catch (Exception e) {
LOG.warn("Job init failed", e);
job.metrics.endPreparingJob(job);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
// Leave job in the NEW state. The MR AM will detect that the state is
// not INITED and send a JOB_INIT_FAILED event.
return JobStateInternal.NEW;
}
}
{code}
This is the code of JobImpl#setup:
{code}
protected void setup(JobImpl job) throws IOException {
String oldJobIDString = job.oldJobId.toString();
String user =
UserGroupInformation.getCurrentUser().getShortUserName();
Path path = MRApps.getStagingAreaDir(job.conf, user);
if(LOG.isDebugEnabled()) {
LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
}
job.remoteJobSubmitDir =
FileSystem.get(job.conf).makeQualified(
new Path(path, oldJobIDString));
job.remoteJobConfFile =
new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
JobTokenIdentifier identifier =
new JobTokenIdentifier(new Text(oldJobIDString));
job.jobToken =
new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
job.jobToken.setService(identifier.getJobId());
// Add it to the jobTokenSecretManager so that TaskAttemptListener server
// can authenticate containers(tasks)
job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
LOG.info("Adding job token for " + oldJobIDString
+ " to jobTokenSecretManager");
// If the job client did not setup the shuffle secret then reuse
// the job token secret for the shuffle.
if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
LOG.warn("Shuffle secret key missing from job credentials."
+ " Using job token secret as shuffle secret.");
TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
job.jobCredentials);
}
}
{code}
5.
Because of the IOException from JobImpl#setup, the new job is still in state
JobStateInternal.NEW:
{code}
} catch (Exception e) {
LOG.warn("Job init failed", e);
job.metrics.endPreparingJob(job);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
// Leave job in the NEW state. The MR AM will detect that the state is
// not INITED and send a JOB_INIT_FAILED event.
return JobStateInternal.NEW;
}
{code}
In the following code in MRAppMaster#serviceStart, the MR AM detects that the
state is not INITED and sends a JOB_INIT_FAILED event:
{code}
// If job is still not initialized, an error happened during
// initialization. Must complete starting all of the services so failure
// events can be processed.
initFailed = (((JobImpl)job).getInternalState() !=
JobStateInternal.INITED);
if (initFailed) {
JobEvent initFailedEvent = new JobEvent(job.getID(),
JobEventType.JOB_INIT_FAILED);
jobEventDispatcher.handle(initFailedEvent);
} else {
// All components have started, start the job.
startJobs();
}
{code}
6.
After JobImpl receives JOB_INIT_FAILED, it calls
InitFailedTransition#transition and enters state JobStateInternal.FAIL_ABORT:
{code}
.addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
JobEventType.JOB_INIT_FAILED,
new InitFailedTransition())
{code}
7.
JobImpl sends a CommitterJobAbortEvent from InitFailedTransition#transition:
{code}
public void transition(JobImpl job, JobEvent event) {
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
{code}
8.
The CommitterJobAbortEvent is handled by CommitterEventHandler#handleJobAbort,
which sends a JobAbortCompletedEvent (JobEventType.JOB_ABORT_COMPLETED):
{code}
protected void handleJobAbort(CommitterJobAbortEvent event) {
cancelJobCommit();
try {
committer.abortJob(event.getJobContext(), event.getFinalState());
} catch (Exception e) {
LOG.warn("Could not abort job", e);
}
context.getEventHandler().handle(new JobAbortCompletedEvent(
event.getJobID(), event.getFinalState()));
}
{code}
9.
After JobImpl receives JOB_ABORT_COMPLETED, it calls
JobAbortCompletedTransition#transition and enters state JobStateInternal.FAILED:
{code}
.addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
JobEventType.JOB_ABORT_COMPLETED,
new JobAbortCompletedTransition())
{code}
10.
JobAbortCompletedTransition#transition calls JobImpl#unsuccessfulFinish,
which sends a JobUnsuccessfulCompletionEvent carrying the finish time:
{code}
public void transition(JobImpl job, JobEvent event) {
JobStateInternal finalState = JobStateInternal.valueOf(
((JobAbortCompletedEvent) event).getFinalState().name());
job.unsuccessfulFinish(finalState);
}
private void unsuccessfulFinish(JobStateInternal finalState) {
if (finishTime == 0) setFinishTime();
cleanupProgress = 1.0f;
JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
new JobUnsuccessfulCompletionEvent(oldJobId,
finishTime,
succeededMapTaskCount,
succeededReduceTaskCount,
finalState.toString(),
diagnostics);
eventHandler.handle(new JobHistoryEvent(jobId,
unsuccessfulJobEvent));
finished(finalState);
}
{code}
11.
The JobUnsuccessfulCompletionEvent is handled by
JobHistoryEventHandler#handleEvent with type EventType.JOB_FAILED.
In the following code you can see that JobIndexInfo#finishTime is set
correctly, but JobIndexInfo#submitTime and JobIndexInfo#jobStartTime are still
-1:
{code}
if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
|| event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
try {
JobUnsuccessfulCompletionEvent jucEvent =
(JobUnsuccessfulCompletionEvent) event
.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
{code}
The bad job history file name in our log is
"job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist"
From the file name you can see that submitTime is -1, finishTime is
1423572836007, and jobStartTime is 1423572836007.
jobStartTime is not -1 and is the same as finishTime because jobStartTime is
handled specially in FileNameIndexUtils#getDoneFileName:
{code}
//JobStartTime
if (indexInfo.getJobStartTime() >= 0) {
sb.append(indexInfo.getJobStartTime());
} else {
sb.append(indexInfo.getFinishTime());
}
{code}
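By contrast, as far as I can tell the submit time has no such fallback in FileNameIndexUtils#getDoneFileName; it is appended as-is (paraphrased sketch below, not the exact code), which is how the -1 ends up in the file name:
{code}
// Paraphrased sketch of the submit-time handling in
// FileNameIndexUtils#getDoneFileName -- unlike jobStartTime above, there is
// no "< 0" guard, so a -1 submit time is written straight into the name.
sb.append(indexInfo.getSubmitTime());
sb.append(DELIMITER);
{code}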
> -1 job submit time cause IllegalArgumentException when parse the Job history
> file name and JOB_INIT_FAILED cause -1 job submit time in JobIndexInfo.
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: MAPREDUCE-6259
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-6259
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: jobhistoryserver
> Reporter: zhihai xu
> Assignee: zhihai xu
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)