Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java Fri Apr 12 23:05:28 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.api.impl.pb.client;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -101,7 +102,8 @@ import org.apache.hadoop.yarn.exceptions
 
 import com.google.protobuf.ServiceException;
 
-public class MRClientProtocolPBClientImpl implements MRClientProtocol {
+public class MRClientProtocolPBClientImpl implements MRClientProtocol,
+    Closeable {
 
   protected MRClientProtocolPB proxy;
 
@@ -118,6 +120,13 @@ public class MRClientProtocolPBClientImp
   }
 
   @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
+  @Override
   public GetJobReportResponse getJobReport(GetJobReportRequest request)
       throws YarnRemoteException {
     GetJobReportRequestProto requestProto = ((GetJobReportRequestPBImpl)request).getProto();
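[Editorial note] With the class now implementing Closeable, callers can release the underlying RPC proxy deterministically instead of leaking it. A minimal sketch of caller-side usage -- hypothetical code, not part of this commit; the constructor arguments mirror the class's existing (long, InetSocketAddress, Configuration) signature and are placeholders:

    // Hypothetical caller; clientVersion, addr and conf are assumed to exist.
    MRClientProtocolPBClientImpl client =
        new MRClientProtocolPBClientImpl(clientVersion, addr, conf);
    try {
      GetJobReportResponse response = client.getJobReport(request);
      // ... consume the job report ...
    } finally {
      client.close(); // stops the underlying RPC proxy via RPC.stopProxy()
    }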
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Fri Apr 12 23:05:28 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import com.google.common.base.Joiner;
@@ -525,4 +526,19 @@ public class JobHistoryUtils {
     sb.append(jobId.toString());
     return sb.toString();
   }
+
+  public static Path getPreviousJobHistoryPath(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    String jobId =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+            .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
+    Path histDirPath = FileContext.getFileContext(conf).makeQualified(
+        new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+        histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
+  }
 }
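[Editorial note] The new getPreviousJobHistoryPath() resolves the staging-directory history file written by the previous application attempt, which is what an application master needs when recovering a restarted job. A minimal sketch of the intended call pattern -- hypothetical caller code, only the helper itself is part of this commit; it assumes attemptId.getAttemptId() >= 2 so a previous attempt exists:

    // Hypothetical recovery step: replay the history of attempt N-1.
    Path prevHistoryFile =
        JobHistoryUtils.getPreviousJobHistoryPath(conf, attemptId);
    FSDataInputStream in = FileContext
        .getFileContext(prevHistoryFile.toUri(), conf).open(prevHistoryFile);
    try {
      JobHistoryParser.JobInfo prior = new JobHistoryParser(in).parse();
      // ... re-seed job state from 'prior' ...
    } finally {
      in.close();
    }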
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Fri Apr 12 23:05:28 2013
@@ -95,7 +95,8 @@
           {"name": "workflowId", "type": "string"},
           {"name": "workflowName", "type": "string"},
           {"name": "workflowNodeName", "type": "string"},
-          {"name": "workflowAdjacencies", "type": "string"}
+          {"name": "workflowAdjacencies", "type": "string"},
+          {"name": "workflowTags", "type": "string"}
       ]
      },
 
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Fri Apr 12 23:05:28 2013
@@ -138,15 +138,6 @@ import org.apache.hadoop.util.ToolRunner
 public class JobClient extends CLI {
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
-  /* notes that get delegation token was called. Again this is hack for oozie
-   * to make sure we add history server delegation tokens to the credentials
-   * for the job. Since the api only allows one delegation token to be returned,
-   * we have to add this hack.
-   */
-  private boolean getDelegationTokenCalled = false;
-  /* do we need a HS delegation token for this client */
-  static final String HS_DELEGATION_TOKEN_REQUIRED
-      = "mapreduce.history.server.delegationtoken.required";
 
   static{
     ConfigUtil.loadResources();
@@ -569,10 +560,6 @@ public class JobClient extends CLI {
     try {
       conf.setBooleanIfUnset("mapred.mapper.new-api", false);
       conf.setBooleanIfUnset("mapred.reducer.new-api", false);
-      if (getDelegationTokenCalled) {
-        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
-        getDelegationTokenCalled = false;
-      }
       Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
         @Override
         public Job run() throws IOException, ClassNotFoundException,
@@ -1173,7 +1160,6 @@ public class JobClient extends CLI {
    */
   public Token<DelegationTokenIdentifier>
     getDelegationToken(final Text renewer) throws IOException, InterruptedException {
-    getDelegationTokenCalled = true;
     return clientUgi.doAs(new
         PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
       public Token<DelegationTokenIdentifier> run() throws IOException,
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Apr 12 23:05:28 2013
@@ -103,4 +103,15 @@ public interface MRConfig {
     "mapreduce.ifile.readahead.bytes";
 
   public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
-      4 * 1024 * 1024;}
+      4 * 1024 * 1024;
+
+  /**
+   * Whether users are explicitly trying to control resource monitoring
+   * configuration for the MiniMRCluster. Disabled by default.
+   */
+  public static final String MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING
+    = "mapreduce.minicluster.control-resource-monitoring";
+  public static final boolean
+    DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING = false;
+}
+
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Apr 12 23:05:28 2013
@@ -422,6 +422,7 @@ public interface MRJobConfig {
   /** Enable job recovery.*/
   public static final String MR_AM_JOB_RECOVERY_ENABLE =
     MR_AM_PREFIX + "job.recovery.enable";
+  public static final boolean MR_AM_JOB_RECOVERY_ENABLE_DEFAULT = true;
 
   /**
    * Limit on the number of reducers that can be preempted to ensure that at
@@ -664,6 +665,8 @@ public interface MRJobConfig {
   public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
       "^mapreduce\\.workflow\\.adjacency\\..+";
 
+  public static final String WORKFLOW_TAGS = "mapreduce.workflow.tags";
+
   /**
    * The maximum number of application attempts.
    * It is a application-specific setting.
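[Editorial note] The two keys added above are plain Configuration switches. A hedged sketch of how a submitter might set them -- the key constants are from this commit, the values and surrounding code are illustrative assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    Configuration conf = new Configuration();
    // Comma-separated tags, recorded in the job's JobSubmittedEvent
    // (the new workflowTags field of Events.avpr above).
    conf.set(MRJobConfig.WORKFLOW_TAGS, "etl,nightly");
    // Opt in to explicit resource-monitoring control for a MiniMRCluster;
    // false by default per the DEFAULT_* constant.
    conf.setBoolean(
        MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true);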
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Fri Apr 12 23:05:28 2013
@@ -75,6 +75,31 @@ public class JobSubmittedEvent implement
       Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
       String workflowId, String workflowName, String workflowNodeName,
       String workflowAdjacencies) {
+    this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
+        jobQueueName, workflowId, workflowName, workflowNodeName,
+        workflowAdjacencies, "");
+  }
+
+  /**
+   * Create an event to record job submission
+   * @param id The job Id of the job
+   * @param jobName Name of the job
+   * @param userName Name of the user who submitted the job
+   * @param submitTime Time of submission
+   * @param jobConfPath Path of the Job Configuration file
+   * @param jobACLs The configured acls for the job.
+   * @param jobQueueName The job-queue to which this job was submitted to
+   * @param workflowId The Id of the workflow
+   * @param workflowName The name of the workflow
+   * @param workflowNodeName The node name of the workflow
+   * @param workflowAdjacencies The adjacencies of the workflow
+   * @param workflowTags Comma-separated tags for the workflow
+   */
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath,
+      Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
+      String workflowId, String workflowName, String workflowNodeName,
+      String workflowAdjacencies, String workflowTags) {
     datum.jobid = new Utf8(id.toString());
     datum.jobName = new Utf8(jobName);
     datum.userName = new Utf8(userName);
@@ -101,6 +126,9 @@ public class JobSubmittedEvent implement
     if (workflowAdjacencies != null) {
       datum.workflowAdjacencies = new Utf8(workflowAdjacencies);
     }
+    if (workflowTags != null) {
+      datum.workflowTags = new Utf8(workflowTags);
+    }
   }
 
   JobSubmittedEvent() {}
@@ -168,6 +196,13 @@ public class JobSubmittedEvent implement
     }
     return null;
   }
+  /** Get the workflow tags */
+  public String getWorkflowTags() {
+    if (datum.workflowTags != null) {
+      return datum.workflowTags.toString();
+    }
+    return null;
+  }
   /** Get the event type */
   public EventType getEventType() { return EventType.JOB_SUBMITTED; }
 
Propchange: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1462626-1467500
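[Editorial note] The widened JobSubmittedEvent constructor simply threads the tags through to the Avro record, while the older eleven-argument overload now delegates with an empty tag string. An illustrative construction with the new argument -- all values are made up for the example:

    // Hypothetical event construction; IDs, names and paths are placeholders.
    JobSubmittedEvent event = new JobSubmittedEvent(
        JobID.forName("job_1365807426265_0001"), "wordcount", "alice",
        System.currentTimeMillis(), "/staging/job.xml",
        new HashMap<JobACL, AccessControlList>(), "default",
        "wf-42", "nightly-etl", "count-node", "", "etl,nightly");
    assert "etl,nightly".equals(event.getWorkflowTags());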
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Fri Apr 12 23:05:28 2013
@@ -869,4 +869,9 @@ public class HistoryFileManager extends
       }
     }
   }
+  // for test
+  @VisibleForTesting
+  void setMaxHistoryAge(long newValue){
+    maxHistoryAge=newValue;
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/JobInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/JobInfo.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/JobInfo.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/JobInfo.java Fri Apr 12 23:05:28 2013
@@ -287,7 +287,7 @@ public class JobInfo {
         avgShuffleTime += (attempt.getShuffleFinishTime() - attempt
             .getLaunchTime());
         avgMergeTime += attempt.getSortFinishTime()
-            - attempt.getLaunchTime();
+            - attempt.getShuffleFinishTime();
         avgReduceTime += (attempt.getFinishTime() - attempt
             .getShuffleFinishTime());
       }
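[Editorial note] The JobInfo change fixes the merge-time average: the merge phase of a reduce runs from shuffle finish to sort finish, so measuring it from the attempt's launch time double-counted the entire shuffle. The per-attempt arithmetic after the fix, sketched with the getters used in the surrounding code (a fragment reusing that code's attempt variable, not standalone):

    // Reduce-attempt phase durations as JobInfo now accumulates them.
    long shuffle = attempt.getShuffleFinishTime() - attempt.getLaunchTime();
    long merge   = attempt.getSortFinishTime() - attempt.getShuffleFinishTime();
    long reduce  = attempt.getFinishTime() - attempt.getShuffleFinishTime();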
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java Fri Apr 12 23:05:28 2013
@@ -21,46 +21,75 @@ package org.apache.hadoop.mapreduce.v2.h
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.hs.CompletedTask;
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
 
 public class TestCompletedTask{
 
-  @Test
+  @Test (timeout=5000)
   public void testTaskStartTimes() {
 
-    TaskId taskId = Mockito.mock(TaskId.class);
-    TaskInfo taskInfo = Mockito.mock(TaskInfo.class);
+    TaskId taskId = mock(TaskId.class);
+    TaskInfo taskInfo = mock(TaskInfo.class);
     Map<TaskAttemptID, TaskAttemptInfo> taskAttempts
       = new TreeMap<TaskAttemptID, TaskAttemptInfo>();
 
     TaskAttemptID id = new TaskAttemptID("0", 0, TaskType.MAP, 0, 0);
-    TaskAttemptInfo info = Mockito.mock(TaskAttemptInfo.class);
-    Mockito.when(info.getAttemptId()).thenReturn(id);
-    Mockito.when(info.getStartTime()).thenReturn(10l);
+    TaskAttemptInfo info = mock(TaskAttemptInfo.class);
+    when(info.getAttemptId()).thenReturn(id);
+    when(info.getStartTime()).thenReturn(10l);
    taskAttempts.put(id, info);
 
     id = new TaskAttemptID("1", 0, TaskType.MAP, 1, 1);
-    info = Mockito.mock(TaskAttemptInfo.class);
-    Mockito.when(info.getAttemptId()).thenReturn(id);
-    Mockito.when(info.getStartTime()).thenReturn(20l);
+    info = mock(TaskAttemptInfo.class);
+    when(info.getAttemptId()).thenReturn(id);
+    when(info.getStartTime()).thenReturn(20l);
     taskAttempts.put(id, info);
 
-    Mockito.when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts);
+    when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts);
     CompletedTask task = new CompletedTask(taskId, taskInfo);
     TaskReport report = task.getReport();
 
     // Make sure the startTime returned by report is the lesser of the
     // attempy launch times
-    Assert.assertTrue(report.getStartTime() == 10);
+    assertTrue(report.getStartTime() == 10);
+  }
+  /**
+   * test some methods of CompletedTaskAttempt
+   */
+  @Test (timeout=5000)
+  public void testCompletedTaskAttempt(){
+
+    TaskAttemptInfo attemptInfo= mock(TaskAttemptInfo.class);
+    when(attemptInfo.getRackname()).thenReturn("Rackname");
+    when(attemptInfo.getShuffleFinishTime()).thenReturn(11L);
+    when(attemptInfo.getSortFinishTime()).thenReturn(12L);
+    when(attemptInfo.getShufflePort()).thenReturn(10);
+
+    JobID jobId= new JobID("12345",0);
+    TaskID taskId =new TaskID(jobId,TaskType.REDUCE, 0);
+    TaskAttemptID taskAttemptId= new TaskAttemptID(taskId, 0);
+    when(attemptInfo.getAttemptId()).thenReturn(taskAttemptId);
+
+
+    CompletedTaskAttempt taskAttemt= new CompletedTaskAttempt(null,attemptInfo);
+    assertEquals( "Rackname",   taskAttemt.getNodeRackName());
+    assertEquals( Phase.CLEANUP, taskAttemt.getPhase());
+    assertTrue( taskAttemt.isFinished());
+    assertEquals( 11L, taskAttemt.getShuffleFinishTime());
+    assertEquals( 12L, taskAttemt.getSortFinishTime());
+    assertEquals( 10, taskAttemt.getShufflePort());
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Fri Apr 12 23:05:28 2013
@@ -45,7 +45,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 @RunWith(value = Parameterized.class)
@@ -79,7 +81,7 @@ public class TestJobHistoryEntities {
   }
 
   /* Verify some expected values based on the history file */
-  @Test (timeout=10000)
+  @Test (timeout=100000)
   public void testCompletedJob() throws Exception {
     HistoryFileInfo info = mock(HistoryFileInfo.class);
     when(info.getConfFile()).thenReturn(fullConfPath);
@@ -92,11 +94,11 @@ public class TestJobHistoryEntities {
     assertEquals(1, completedJob.getAMInfos().size());
     assertEquals(10, completedJob.getCompletedMaps());
     assertEquals(1, completedJob.getCompletedReduces());
-    assertEquals(11, completedJob.getTasks().size());
+    assertEquals(12, completedJob.getTasks().size());
     //Verify tasks loaded at this point.
     assertEquals(true, completedJob.tasksLoaded.get());
     assertEquals(10, completedJob.getTasks(TaskType.MAP).size());
-    assertEquals(1, completedJob.getTasks(TaskType.REDUCE).size());
+    assertEquals(2, completedJob.getTasks(TaskType.REDUCE).size());
     assertEquals("user", completedJob.getUserName());
     assertEquals(JobState.SUCCEEDED, completedJob.getState());
     JobReport jobReport = completedJob.getReport();
@@ -117,7 +119,7 @@ public class TestJobHistoryEntities {
     Map<TaskId, Task> mapTasks = completedJob.getTasks(TaskType.MAP);
     Map<TaskId, Task> reduceTasks = completedJob.getTasks(TaskType.REDUCE);
     assertEquals(10, mapTasks.size());
-    assertEquals(1, reduceTasks.size());
+    assertEquals(2, reduceTasks.size());
 
     Task mt1 = mapTasks.get(mt1Id);
     assertEquals(1, mt1.getAttempts().size());
@@ -132,7 +134,7 @@ public class TestJobHistoryEntities {
     assertEquals(TaskState.SUCCEEDED, rt1Report.getTaskState());
     assertEquals(rt1Id, rt1Report.getTaskId());
   }
-  
+
   @Test (timeout=10000)
   public void testCompletedTaskAttempt() throws Exception {
     HistoryFileInfo info = mock(HistoryFileInfo.class);
@@ -168,4 +170,45 @@ public class TestJobHistoryEntities {
     assertEquals(45454, rta1Report.getNodeManagerPort());
     assertEquals(9999, rta1Report.getNodeManagerHttpPort());
   }
+  /**
+   * Simple test of some methods of CompletedJob
+   * @throws Exception
+   */
+  @Test (timeout=30000)
+  public void testGetTaskAttemptCompletionEvent() throws Exception{
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
+    completedJob =
+      new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
+          info, jobAclsManager);
+    TaskCompletionEvent[] events= completedJob.getMapAttemptCompletionEvents(0,1000);
+    assertEquals(10, completedJob.getMapAttemptCompletionEvents(0,10).length);
+    int currentEventId=0;
+    for (TaskCompletionEvent taskAttemptCompletionEvent : events) {
+      int eventId= taskAttemptCompletionEvent.getEventId();
+      assertTrue(eventId>=currentEventId);
+      currentEventId=eventId;
+    }
+    assertNull(completedJob.loadConfFile() );
+    // job name
+    assertEquals("Sleep job",completedJob.getName());
+    // queue name
+    assertEquals("default",completedJob.getQueueName());
+    // progress
+    assertEquals(1.0, completedJob.getProgress(),0.001);
+    // 12 rows in answer
+    assertEquals(12,completedJob.getTaskAttemptCompletionEvents(0,1000).length);
+    // select first 10 rows
+    assertEquals(10,completedJob.getTaskAttemptCompletionEvents(0,10).length);
+    // select 5-10 rows include 5th
+    assertEquals(7,completedJob.getTaskAttemptCompletionEvents(5,10).length);
+
+    // without errors
+    assertEquals(1,completedJob.getDiagnostics().size());
+    assertEquals("",completedJob.getDiagnostics().get(0));
+
+    assertEquals(0, completedJob.getJobACLs().size());
+
+  }
+
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Fri Apr 12 23:05:28 2013
@@ -19,6 +19,9 @@
 package org.apache.hadoop.mapreduce.v2.hs;
 
 import java.io.ByteArrayOutputStream;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Arrays;
@@ -54,6 +57,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -65,7 +71,9 @@ import org.apache.hadoop.mapreduce.v2.hs
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -80,12 +88,12 @@ public class TestJobHistoryParsing {
   private static final String RACK_NAME = "/MyRackName";
 
-  private  ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+  private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
 
   public static class MyResolver implements DNSToSwitchMapping {
     @Override
     public List<String> resolve(List<String> names) {
-      return Arrays.asList(new String[]{RACK_NAME});
+      return Arrays.asList(new String[] { RACK_NAME });
     }
 
     @Override
@@ -93,14 +101,14 @@ public class TestJobHistoryParsing {
     }
   }
 
-  @Test (timeout=50000)
+  @Test(timeout = 50000)
   public void testJobInfo() throws Exception {
     JobInfo info = new JobInfo();
     Assert.assertEquals("NORMAL", info.getPriority());
     info.printAll();
   }
 
-  @Test (timeout=50000)
+  @Test(timeout = 300000)
   public void testHistoryParsing() throws Exception {
     LOG.info("STARTING testHistoryParsing()");
     try {
@@ -109,8 +117,8 @@ public class TestJobHistoryParsing {
       LOG.info("FINISHED testHistoryParsing()");
     }
   }
-  
-  @Test (timeout=50000)
+
+  @Test(timeout = 50000)
   public void testHistoryParsingWithParseErrors() throws Exception {
     LOG.info("STARTING testHistoryParsingWithParseErrors()");
     try {
@@ -119,18 +127,18 @@ public class TestJobHistoryParsing {
      LOG.info("FINISHED testHistoryParsingWithParseErrors()");
     }
   }
-  
-  private static String getJobSummary(FileContext fc, Path path) throws IOException {
+
+  private static String getJobSummary(FileContext fc, Path path)
+      throws IOException {
     Path qPath = fc.makeQualified(path);
     FSDataInputStream in = fc.open(qPath);
     String jobSummaryString = in.readUTF();
     in.close();
     return jobSummaryString;
   }
-  
+
   private void checkHistoryParsing(final int numMaps, final int numReduces,
-      final int numSuccessfulMaps)
-      throws Exception {
+      final int numSuccessfulMaps) throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.USER_NAME,
         System.getProperty("user.name"));
     long amStartTimeEst = System.currentTimeMillis();
@@ -138,9 +146,8 @@
         CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
         MyResolver.class, DNSToSwitchMapping.class);
     RackResolver.init(conf);
-    MRApp app =
-        new MRAppWithHistory(numMaps, numReduces, true,
-            this.getClass().getName(), true);
+    MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
+        .getName(), true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobId jobId = job.getID();
@@ -152,7 +159,7 @@
 
     String jobhistoryDir = JobHistoryUtils
         .getHistoryIntermediateDoneDirForUser(conf);
-    
+
     FileContext fc = null;
     try {
       fc = FileContext.getFileContext(conf);
@@ -160,7 +167,7 @@
       LOG.info("Can not get FileContext", ioe);
       throw (new Exception("Can not get File Context"));
     }
-    
+
     if (numMaps == numSuccessfulMaps) {
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobId);
@@ -185,20 +192,22 @@
           Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
       Assert.assertTrue("launchTime should not be 0",
           Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
-      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
       Assert
-          .assertTrue(
-              "firstReduceTaskLaunchTime should not be 0",
-              Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+          .assertTrue(
+              "firstMapTaskLaunchTime should not be 0",
+              Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+      Assert
+          .assertTrue("firstReduceTaskLaunchTime should not be 0",
+              Long.parseLong(jobSummaryElements
+                  .get("firstReduceTaskLaunchTime")) != 0);
       Assert.assertTrue("finishTime should not be 0",
           Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
       Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
          Integer.parseInt(jobSummaryElements.get("numMaps")));
       Assert.assertEquals("Mismatch in num reduce slots", numReduces,
           Integer.parseInt(jobSummaryElements.get("numReduces")));
-      Assert.assertEquals("User does not match", System.getProperty("user.name"),
-          jobSummaryElements.get("user"));
+      Assert.assertEquals("User does not match",
+          System.getProperty("user.name"), jobSummaryElements.get("user"));
       Assert.assertEquals("Queue does not match", "default",
           jobSummaryElements.get("queue"));
       Assert.assertEquals("Status does not match", "SUCCEEDED",
@@ -210,8 +219,8 @@
     HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
     JobInfo jobInfo;
     long numFinishedMaps;
-    
-    synchronized(fileInfo) {
+
+    synchronized (fileInfo) {
       Path historyFilePath = fileInfo.getHistoryFile();
       FSDataInputStream in = null;
       LOG.info("JobHistoryFile is: " + historyFilePath);
@@ -228,11 +237,11 @@
       if (numMaps == numSuccessfulMaps) {
         reader = realReader;
       } else {
-        final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
+        final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
         Mockito.when(reader.getNextEvent()).thenAnswer(
             new Answer<HistoryEvent>() {
-              public HistoryEvent answer(InvocationOnMock invocation)
-                  throws IOException {
+            public HistoryEvent answer(InvocationOnMock invocation)
+                throws IOException {
               HistoryEvent event = realReader.getNextEvent();
               if (event instanceof TaskFinishedEvent) {
                 numFinishedEvents.incrementAndGet();
@@ -244,22 +253,20 @@
                 throw new IOException("test");
               }
             }
-          }
-        );
+          });
       }
 
       jobInfo = parser.parse(reader);
 
-      numFinishedMaps = 
-          computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
+      numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
 
       if (numFinishedMaps != numMaps) {
         Exception parseException = parser.getParseException();
-        Assert.assertNotNull("Didn't get expected parse exception", 
+        Assert.assertNotNull("Didn't get expected parse exception",
            parseException);
       }
     }
-    
+
     Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
         jobInfo.getUsername());
     Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
@@ -267,7 +274,7 @@
         jobInfo.getJobQueueName());
     Assert
         .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
-    Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, 
+    Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps,
         numFinishedMaps);
     Assert.assertEquals("incorrect finishedReduces ", numReduces,
         jobInfo.getFinishedReduces());
@@ -275,8 +282,8 @@
         jobInfo.getUberized());
     Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
     int totalTasks = allTasks.size();
-    Assert.assertEquals("total number of tasks is incorrect  ",
-        (numMaps+numReduces), totalTasks);
+    Assert.assertEquals("total number of tasks is incorrect  ",
+        (numMaps + numReduces), totalTasks);
 
     // Verify aminfo
     Assert.assertEquals(1, jobInfo.getAMInfos().size());
@@ -306,8 +313,7 @@
     // Deep compare Job and JobInfo
     for (Task task : job.getTasks().values()) {
-      TaskInfo taskInfo = allTasks.get(
-          TypeConverter.fromYarn(task.getID()));
+      TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
       Assert.assertNotNull("TaskInfo not found", taskInfo);
       for (TaskAttempt taskAttempt : task.getAttempts().values()) {
         TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
@@ -318,27 +324,32 @@
         if (numMaps == numSuccessfulMaps) {
           Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
           Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
-          
+
           // Verify rack-name
-          Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
-              .getRackname(), RACK_NAME);
+          Assert.assertEquals("rack-name is incorrect",
+              taskAttemptInfo.getRackname(), RACK_NAME);
         }
       }
     }
-    
+
     // test output for HistoryViewer
-    PrintStream stdps=System.out;
+    PrintStream stdps = System.out;
     try {
       System.setOut(new PrintStream(outContent));
       HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
           fileInfo.getHistoryFile()).toString(), conf, true);
       viewer.print();
-      
-      for (TaskInfo taskInfo : allTasks.values()) {
-        
-        String test= (taskInfo.getTaskStatus()==null?"":taskInfo.getTaskStatus())+" "+taskInfo.getTaskType()+" task list for "+taskInfo.getTaskId().getJobID();
-        Assert.assertTrue(outContent.toString().indexOf(test)>0);
-        Assert.assertTrue(outContent.toString().indexOf(taskInfo.getTaskId().toString())>0);
+
+      for (TaskInfo taskInfo : allTasks.values()) {
+
+        String test =
+            (taskInfo.getTaskStatus() == null ? "" : taskInfo
+                .getTaskStatus())
+                + " "
+                + taskInfo.getTaskType()
+                + " task list for " + taskInfo.getTaskId().getJobID();
+        Assert.assertTrue(outContent.toString().indexOf(test) > 0);
+        Assert.assertTrue(outContent.toString().indexOf(
+            taskInfo.getTaskId().toString()) > 0);
       }
     } finally {
       System.setOut(stdps);
@@ -363,186 +374,180 @@
     }
     return numFinishedMaps;
   }
-  
-  @Test (timeout=50000)
+
+  @Test(timeout = 30000)
   public void testHistoryParsingForFailedAttempts() throws Exception {
     LOG.info("STARTING testHistoryParsingForFailedAttempts");
     try {
-    Configuration conf = new Configuration();
-    conf
-        .setClass(
-            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-            MyResolver.class, DNSToSwitchMapping.class);
-    RackResolver.init(conf);
-    MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this.getClass().getName(),
-        true);
-    app.submit(conf);
-    Job job = app.getContext().getAllJobs().values().iterator().next();
-    JobId jobId = job.getID();
-    app.waitForState(job, JobState.SUCCEEDED);
-
-    // make sure all events are flushed
-    app.waitForState(Service.STATE.STOPPED);
-
-    String jobhistoryDir = JobHistoryUtils
-        .getHistoryIntermediateDoneDirForUser(conf);
-    JobHistory jobHistory = new JobHistory();
-    jobHistory.init(conf);
+      Configuration conf = new Configuration();
+      conf.setClass(
+          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          MyResolver.class, DNSToSwitchMapping.class);
+      RackResolver.init(conf);
+      MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
+          .getClass().getName(), true);
+      app.submit(conf);
+      Job job = app.getContext().getAllJobs().values().iterator().next();
+      JobId jobId = job.getID();
+      app.waitForState(job, JobState.SUCCEEDED);
+
+      // make sure all events are flushed
+      app.waitForState(Service.STATE.STOPPED);
+
+      String jobhistoryDir = JobHistoryUtils
+          .getHistoryIntermediateDoneDirForUser(conf);
+      JobHistory jobHistory = new JobHistory();
+      jobHistory.init(conf);
+
+      JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
+          .getJobIndexInfo();
+      String jobhistoryFileName = FileNameIndexUtils
+          .getDoneFileName(jobIndexInfo);
 
-    JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
-        .getJobIndexInfo();
-    String jobhistoryFileName = FileNameIndexUtils
-        .getDoneFileName(jobIndexInfo);
-    
-    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
-    FSDataInputStream in = null;
-    FileContext fc = null;
-    try {
-      fc = FileContext.getFileContext(conf);
-      in = fc.open(fc.makeQualified(historyFilePath));
-    } catch (IOException ioe) {
-      LOG.info("Can not open history file: " + historyFilePath, ioe);
-      throw (new Exception("Can not open History File"));
-    }
+      Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+      FSDataInputStream in = null;
+      FileContext fc = null;
+      try {
+        fc = FileContext.getFileContext(conf);
+        in = fc.open(fc.makeQualified(historyFilePath));
+      } catch (IOException ioe) {
+        LOG.info("Can not open history file: " + historyFilePath, ioe);
+        throw (new Exception("Can not open History File"));
+      }
 
-    JobHistoryParser parser = new JobHistoryParser(in);
-    JobInfo jobInfo = parser.parse();
-    Exception parseException = parser.getParseException();
-    Assert.assertNull("Caught an expected exception " + parseException,
-        parseException);
-    int noOffailedAttempts = 0;
-    Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
-    for (Task task : job.getTasks().values()) {
-      TaskInfo taskInfo =
 allTasks.get(TypeConverter.fromYarn(task.getID()));
-      for (TaskAttempt taskAttempt : task.getAttempts().values()) {
-        TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
-            TypeConverter.fromYarn((taskAttempt.getID())));
-        // Verify rack-name for all task attempts
-        Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
-            .getRackname(), RACK_NAME);
-        if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
-          noOffailedAttempts++;
+      JobHistoryParser parser = new JobHistoryParser(in);
+      JobInfo jobInfo = parser.parse();
+      Exception parseException = parser.getParseException();
+      Assert.assertNull("Caught an expected exception " + parseException,
+          parseException);
+      int noOffailedAttempts = 0;
+      Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
+      for (Task task : job.getTasks().values()) {
+        TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
+        for (TaskAttempt taskAttempt : task.getAttempts().values()) {
+          TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
+              TypeConverter.fromYarn((taskAttempt.getID())));
+          // Verify rack-name for all task attempts
+          Assert.assertEquals("rack-name is incorrect",
+              taskAttemptInfo.getRackname(), RACK_NAME);
+          if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
+            noOffailedAttempts++;
+          }
         }
       }
-    }
-    Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
+      Assert.assertEquals("No of Failed tasks doesn't match.", 2,
+          noOffailedAttempts);
     } finally {
       LOG.info("FINISHED testHistoryParsingForFailedAttempts");
     }
   }
-  
-  @Test (timeout=5000)
+
+  @Test(timeout = 60000)
   public void testCountersForFailedTask() throws Exception {
     LOG.info("STARTING testCountersForFailedTask");
     try {
-    Configuration conf = new Configuration();
-    conf
-        .setClass(
-            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-            MyResolver.class, DNSToSwitchMapping.class);
-    RackResolver.init(conf);
-    MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true,
-        this.getClass().getName(), true);
-    app.submit(conf);
-    Job job = app.getContext().getAllJobs().values().iterator().next();
-    JobId jobId = job.getID();
-    app.waitForState(job, JobState.FAILED);
-
-    // make sure all events are flushed
-    app.waitForState(Service.STATE.STOPPED);
+      Configuration conf = new Configuration();
+      conf.setClass(
+          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          MyResolver.class, DNSToSwitchMapping.class);
+      RackResolver.init(conf);
+      MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
+          .getClass().getName(), true);
+      app.submit(conf);
+      Job job = app.getContext().getAllJobs().values().iterator().next();
+      JobId jobId = job.getID();
+      app.waitForState(job, JobState.FAILED);
+
+      // make sure all events are flushed
+      app.waitForState(Service.STATE.STOPPED);
+
+      String jobhistoryDir = JobHistoryUtils
+          .getHistoryIntermediateDoneDirForUser(conf);
+      JobHistory jobHistory = new JobHistory();
+      jobHistory.init(conf);
+
+      JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
+          .getJobIndexInfo();
+      String jobhistoryFileName = FileNameIndexUtils
+          .getDoneFileName(jobIndexInfo);
 
-    String jobhistoryDir = JobHistoryUtils
-        .getHistoryIntermediateDoneDirForUser(conf);
-    JobHistory jobHistory = new JobHistory();
-    jobHistory.init(conf);
-    
-    JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
-        .getJobIndexInfo();
-    String jobhistoryFileName = FileNameIndexUtils
-        .getDoneFileName(jobIndexInfo);
-    
-    Path historyFilePath = new Path(jobhistoryDir,
         jobhistoryFileName);
-    FSDataInputStream in = null;
-    FileContext fc = null;
-    try {
-      fc = FileContext.getFileContext(conf);
-      in = fc.open(fc.makeQualified(historyFilePath));
-    } catch (IOException ioe) {
-      LOG.info("Can not open history file: " + historyFilePath, ioe);
-      throw (new Exception("Can not open History File"));
-    }
+      Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+      FSDataInputStream in = null;
+      FileContext fc = null;
+      try {
+        fc = FileContext.getFileContext(conf);
+        in = fc.open(fc.makeQualified(historyFilePath));
+      } catch (IOException ioe) {
+        LOG.info("Can not open history file: " + historyFilePath, ioe);
+        throw (new Exception("Can not open History File"));
+      }
 
-    JobHistoryParser parser = new JobHistoryParser(in);
-    JobInfo jobInfo = parser.parse();
-    Exception parseException = parser.getParseException();
-    Assert.assertNull("Caught an expected exception " + parseException,
-        parseException);
-    for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
-      TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
-      CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
-      Assert.assertNotNull("completed task report has null counters",
-          ct.getReport().getCounters());
-      //Make sure all the completedTask has counters, and the counters are not empty
-      Assert.assertTrue(ct.getReport().getCounters()
-          .getAllCounterGroups().size() > 0);
-    }
+      JobHistoryParser parser = new JobHistoryParser(in);
+      JobInfo jobInfo = parser.parse();
+      Exception parseException = parser.getParseException();
+      Assert.assertNull("Caught an expected exception " + parseException,
+          parseException);
+      for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
+        TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
+        CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
+        Assert.assertNotNull("completed task report has null counters", ct
+            .getReport().getCounters());
+      }
     } finally {
       LOG.info("FINISHED testCountersForFailedTask");
     }
   }
 
-  @Test (timeout=50000)
+  @Test(timeout = 50000)
   public void testScanningOldDirs() throws Exception {
     LOG.info("STARTING testScanningOldDirs");
     try {
-    Configuration conf = new Configuration();
-    conf
-        .setClass(
-            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-            MyResolver.class, DNSToSwitchMapping.class);
-    RackResolver.init(conf);
-    MRApp app =
-        new MRAppWithHistory(1, 1, true,
-            this.getClass().getName(), true);
-    app.submit(conf);
-    Job job = app.getContext().getAllJobs().values().iterator().next();
-    JobId jobId = job.getID();
-    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
-    app.waitForState(job, JobState.SUCCEEDED);
-
-    // make sure all events are flushed
-    app.waitForState(Service.STATE.STOPPED);
+      Configuration conf = new Configuration();
+      conf.setClass(
+          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          MyResolver.class, DNSToSwitchMapping.class);
+      RackResolver.init(conf);
+      MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
+          true);
+      app.submit(conf);
+      Job job = app.getContext().getAllJobs().values().iterator().next();
+      JobId jobId = job.getID();
+      LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+      app.waitForState(job, JobState.SUCCEEDED);
+
+      // make sure all events are flushed
+      app.waitForState(Service.STATE.STOPPED);
+
+      HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
+      hfm.init(conf);
+      HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
Assert.assertNotNull("Unable to locate job history", fileInfo); + + // force the manager to "forget" the job + hfm.deleteJobFromJobListCache(fileInfo); + final int msecPerSleep = 10; + int msecToSleep = 10 * 1000; + while (fileInfo.isMovePending() && msecToSleep > 0) { + Assert.assertTrue(!fileInfo.didMoveFail()); + msecToSleep -= msecPerSleep; + Thread.sleep(msecPerSleep); + } + Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); - HistoryFileManagerForTest hfm = new HistoryFileManagerForTest(); - hfm.init(conf); - HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); - Assert.assertNotNull("Unable to locate job history", fileInfo); - - // force the manager to "forget" the job - hfm.deleteJobFromJobListCache(fileInfo); - final int msecPerSleep = 10; - int msecToSleep = 10 * 1000; - while (fileInfo.isMovePending() && msecToSleep > 0) { - Assert.assertTrue(!fileInfo.didMoveFail()); - msecToSleep -= msecPerSleep; - Thread.sleep(msecPerSleep); - } - Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); - - fileInfo = hfm.getFileInfo(jobId); - Assert.assertNotNull("Unable to locate old job history", fileInfo); - } finally { + fileInfo = hfm.getFileInfo(jobId); + Assert.assertNotNull("Unable to locate old job history", fileInfo); + } finally { LOG.info("FINISHED testScanningOldDirs"); } } static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { - public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, - String testName, boolean cleanOnStart) { + public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, + boolean autoComplete, String testName, boolean cleanOnStart) { super(maps, reduces, autoComplete, testName, cleanOnStart); } - + @SuppressWarnings("unchecked") @Override protected void attemptLaunched(TaskAttemptId attemptID) { @@ -558,8 +563,8 @@ public class TestJobHistoryParsing { static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory { - public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete, - String testName, boolean cleanOnStart) { + public MRAppWithHistoryWithFailedTask(int maps, int reduces, + boolean autoComplete, String testName, boolean cleanOnStart) { super(maps, reduces, autoComplete, testName, cleanOnStart); } @@ -587,4 +592,133 @@ public class TestJobHistoryParsing { t.testHistoryParsing(); t.testHistoryParsingForFailedAttempts(); } + + /** + * Test clean old history files. Files should be deleted after 1 week by + * default. 
+   */
+  @Test(timeout = 15000)
+  public void testDeleteFileInfo() throws Exception {
+    LOG.info("STARTING testDeleteFileInfo");
+    try {
+      Configuration conf = new Configuration();
+
+      conf.setClass(
+          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          MyResolver.class, DNSToSwitchMapping.class);
+
+      RackResolver.init(conf);
+      MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
+          true);
+      app.submit(conf);
+      Job job = app.getContext().getAllJobs().values().iterator().next();
+      JobId jobId = job.getID();
+
+      app.waitForState(job, JobState.SUCCEEDED);
+
+      // make sure all events are flushed
+      app.waitForState(Service.STATE.STOPPED);
+
+      HistoryFileManager hfm = new HistoryFileManager();
+      hfm.init(conf);
+      HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
+      hfm.initExisting();
+      // wait for move files form the done_intermediate directory to the gone
+      // directory
+      while (fileInfo.isMovePending()) {
+        Thread.sleep(300);
+      }
+
+      Assert.assertNotNull(hfm.jobListCache.values());
+
+      // try to remove fileInfo
+      hfm.clean();
+      // check that fileInfo does not deleted
+      Assert.assertFalse(fileInfo.isDeleted());
+      // correct live time
+      hfm.setMaxHistoryAge(-1);
+      hfm.clean();
+      // should be deleted !
+      Assert.assertTrue("file should be deleted ", fileInfo.isDeleted());
+
+    } finally {
+      LOG.info("FINISHED testDeleteFileInfo");
+    }
+  }
+
+  /**
+   * Simple test some methods of JobHistory
+   */
+  @Test(timeout = 20000)
+  public void testJobHistoryMethods() throws Exception {
+    LOG.info("STARTING testJobHistoryMethods");
+    try {
+      Configuration configuration = new Configuration();
+      configuration.setClass(
+          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          MyResolver.class, DNSToSwitchMapping.class);
+
+      RackResolver.init(configuration);
+      MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
+          true);
+      app.submit(configuration);
+      Job job = app.getContext().getAllJobs().values().iterator().next();
+      JobId jobId = job.getID();
+      LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+      app.waitForState(job, JobState.SUCCEEDED);
+
+      JobHistory jobHistory = new JobHistory();
+      jobHistory.init(configuration);
+      // Method getAllJobs
+      Assert.assertEquals(1, jobHistory.getAllJobs().size());
+      // and with ApplicationId
+      Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size());
+
+      JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default",
+          0L, System.currentTimeMillis() + 1, 0L,
+          System.currentTimeMillis() + 1, JobState.SUCCEEDED);
+
+      Assert.assertEquals(1, jobsinfo.getJobs().size());
+      Assert.assertNotNull(jobHistory.getApplicationAttemptId());
+      // test Application Id
+      Assert.assertEquals("application_0_0000", jobHistory.getApplicationID()
+          .toString());
+      Assert
+          .assertEquals("Job History Server", jobHistory.getApplicationName());
+      // method does not work
+      Assert.assertNull(jobHistory.getEventHandler());
+      // method does not work
+      Assert.assertNull(jobHistory.getClock());
+      // method does not work
+      Assert.assertNull(jobHistory.getClusterInfo());
+
+    } finally {
+      LOG.info("FINISHED testJobHistoryMethods");
+    }
+  }
+
+  /**
+   * Simple test PartialJob
+   */
+  @Test(timeout = 1000)
+  public void testPartialJob() throws Exception {
+    JobId jobId = new JobIdPBImpl();
+    jobId.setId(0);
+    JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user",
+        "jobName", jobId, 3, 2, "JobStatus");
+    PartialJob test = new PartialJob(jii, jobId);
+
+    Assert.assertEquals(1.0f, test.getProgress(), 0.001f);
+    assertNull(test.getAllCounters());
+    assertNull(test.getTasks());
+    assertNull(test.getTasks(TaskType.MAP));
+    assertNull(test.getTask(new TaskIdPBImpl()));
+
+    assertNull(test.getTaskAttemptCompletionEvents(0, 100));
+    assertNull(test.getMapAttemptCompletionEvents(0, 100));
+    assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null));
+    assertNull(test.getAMInfos());
+
+  }
 }