Author: amarrk
Date: Fri Jul 8 17:53:36 2011
New Revision: 1144403

URL: http://svn.apache.org/viewvc?rev=1144403&view=rev
Log:
MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk)
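At a high level, this change wires a new Summarizer into the Gridmix pipeline: an
ExecutionSummarizer receives per-job JobStats updates and a ClusterSummarizer receives
ClusterStats snapshots; both are registered with Statistics, and their toString() output
is printed when the run completes. The following minimal sketch illustrates that listener
pattern; the StatListener interface is redeclared here only to keep the sketch
self-contained, and the CountingListener class is hypothetical, not part of this patch.

    // Sketch of the StatListener pattern used by the summarizers: Statistics
    // pushes updates to registered listeners; each listener aggregates state
    // and reports it via toString() at the end of the run.
    interface StatListener<T> {
      void update(T item);
    }

    // Hypothetical listener that accumulates a running total, mirroring how
    // ExecutionSummarizer accumulates job statistics from JobStats updates.
    class CountingListener implements StatListener<Integer> {
      private long total;   // running aggregate, reported at shutdown

      @Override
      public void update(Integer item) {
        total += item;      // called once per published stat
      }

      @Override
      public String toString() {
        return "Total: " + total;
      }
    }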
Added:
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Summarizer.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
Modified:
    hadoop/common/trunk/mapreduce/CHANGES.txt
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java

Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1144403&r1=1144402&r2=1144403&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Fri Jul 8 17:53:36 2011
@@ -35,7 +35,9 @@ Trunk (unreleased changes)

   IMPROVEMENTS

-    MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to
+    MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk)
+
+    MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to
     Gridmix. (Vinay Kumar Thota via amarrk)

     MAPREDUCE-2104. [Rumen] Add Cpu, Memory and Heap usages to

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java?rev=1144403&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java Fri Jul 8 17:53:36 2011
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
+/**
+ * Summarizes the Hadoop cluster used in this {@link Gridmix} run.
+ * Statistics that are reported are
+ * <ul>
+ *   <li>Total number of active trackers in the cluster</li>
+ *   <li>Total number of blacklisted trackers in the cluster</li>
+ *   <li>Max map task capacity of the cluster</li>
+ *   <li>Max reduce task capacity of the cluster</li>
+ * </ul>
+ *
+ * Apart from these statistics, {@link JobTracker} and {@link FileSystem}
+ * addresses are also recorded in the summary.
+ */
+class ClusterSummarizer implements StatListener<ClusterStats> {
+  static final Log LOG = LogFactory.getLog(ClusterSummarizer.class);
+
+  private int numBlacklistedTrackers;
+  private int numActiveTrackers;
+  private int maxMapTasks;
+  private int maxReduceTasks;
+  private String jobTrackerInfo = Summarizer.NA;
+  private String namenodeInfo = Summarizer.NA;
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public void update(ClusterStats item) {
+    try {
+      numBlacklistedTrackers = item.getStatus().getBlacklistedTrackers();
+      numActiveTrackers = item.getStatus().getTaskTrackers();
+      maxMapTasks = item.getStatus().getMaxMapTasks();
+      maxReduceTasks = item.getStatus().getMaxReduceTasks();
+    } catch (Exception e) {
+      long time = System.currentTimeMillis();
+      LOG.info("Error in processing cluster status at "
+               + FastDateFormat.getInstance().format(time));
+    }
+  }
+
+  /**
+   * Summarizes the cluster used for this {@link Gridmix} run.
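+   * The summary reflects the most recent {@link ClusterStats} update.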
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Cluster Summary:-");
+    builder.append("\nJobTracker: ").append(getJobTrackerInfo());
+    builder.append("\nFileSystem: ").append(getNamenodeInfo());
+    builder.append("\nNumber of blacklisted trackers: ")
+           .append(getNumBlacklistedTrackers());
+    builder.append("\nNumber of active trackers: ")
+           .append(getNumActiveTrackers());
+    builder.append("\nMax map task capacity: ")
+           .append(getMaxMapTasks());
+    builder.append("\nMax reduce task capacity: ").append(getMaxReduceTasks());
+    builder.append("\n\n");
+    return builder.toString();
+  }
+
+  void start(Configuration conf) {
+    jobTrackerInfo = conf.get(JTConfig.JT_IPC_ADDRESS);
+    namenodeInfo = conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY);
+  }
+
+  // Getters
+  protected int getNumBlacklistedTrackers() {
+    return numBlacklistedTrackers;
+  }
+
+  protected int getNumActiveTrackers() {
+    return numActiveTrackers;
+  }
+
+  protected int getMaxMapTasks() {
+    return maxMapTasks;
+  }
+
+  protected int getMaxReduceTasks() {
+    return maxReduceTasks;
+  }
+
+  protected String getJobTrackerInfo() {
+    return jobTrackerInfo;
+  }
+
+  protected String getNamenodeInfo() {
+    return namenodeInfo;
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java?rev=1144403&r1=1144402&r2=1144403&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java Fri Jul 8 17:53:36 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.io.compress.Dec
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
 import org.apache.hadoop.mapred.gridmix.GenerateData.GenDataFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -307,8 +308,8 @@ class CompressionEmulationUtil {
    *   <li>Random text word size</li>
    * </ul>
    */
-  static void publishCompressedDataStatistics(Path inputDir, Configuration conf,
-                                              long uncompressedDataSize)
+  static DataStatistics publishCompressedDataStatistics(Path inputDir,
+                          Configuration conf, long uncompressedDataSize)
   throws IOException {
     FileSystem fs = inputDir.getFileSystem(conf);
     CompressionCodecFactory compressionCodecs =
@@ -356,6 +357,8 @@ class CompressionEmulationUtil {
       // publish the compression ratio
       LOG.info("Input Data Compression Ratio : " + ratio);
     }
+
+    return new DataStatistics(compressedDataSize, numCompressedFiles, true);
   }

   /**

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java?rev=1144403&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java Fri Jul 8 17:53:36 2011
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Summarizes a {@link Gridmix} run. Statistics that are reported are
+ * <ul>
+ *   <li>Total number of jobs in the input trace</li>
+ *   <li>Trace signature</li>
+ *   <li>Total number of jobs processed from the input trace</li>
+ *   <li>Total number of jobs submitted</li>
+ *   <li>Total number of successful and failed jobs</li>
+ *   <li>Total number of map/reduce tasks launched</li>
+ *   <li>Gridmix start &amp; end time</li>
+ *   <li>Total time for the Gridmix run (data-generation and simulation)</li>
+ *   <li>Gridmix Configuration (i.e. job-type, submission-type, resolver)</li>
+ * </ul>
+ */
+class ExecutionSummarizer implements StatListener<JobStats> {
+  static final Log LOG = LogFactory.getLog(ExecutionSummarizer.class);
+  private static final FastDateFormat UTIL = FastDateFormat.getInstance();
+
+  private int numJobsInInputTrace;
+  private int totalSuccessfulJobs;
+  private int totalFailedJobs;
+  private int totalMapTasksLaunched;
+  private int totalReduceTasksLaunched;
+  private long totalSimulationTime;
+  private long totalRuntime;
+  private final String commandLineArgs;
+  private long startTime;
+  private long endTime;
+  private long simulationStartTime;
+  private String inputTraceLocation;
+  private String inputTraceSignature;
+  private String jobSubmissionPolicy;
+  private String resolver;
+  private DataStatistics dataStats;
+  private String expectedDataSize;
+
+  /**
+   * Basic constructor initialized with the runtime arguments.
+   */
+  ExecutionSummarizer(String[] args) {
+    startTime = System.currentTimeMillis();
+    // flatten the args string and store it
+    commandLineArgs =
+      org.apache.commons.lang.StringUtils.join(args, ' ');
+  }
+
+  /**
+   * Default constructor.
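+   * The command-line arguments are recorded as {@link Summarizer#NA}.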
+   */
+  ExecutionSummarizer() {
+    startTime = System.currentTimeMillis();
+    commandLineArgs = Summarizer.NA;
+  }
+
+  void start(Configuration conf) {
+    simulationStartTime = System.currentTimeMillis();
+  }
+
+  private void processJobState(JobStats stats) throws Exception {
+    Job job = stats.getJob();
+    if (job.isSuccessful()) {
+      ++totalSuccessfulJobs;
+    } else {
+      ++totalFailedJobs;
+    }
+  }
+
+  private void processJobTasks(JobStats stats) throws Exception {
+    totalMapTasksLaunched += stats.getNoOfMaps();
+    Job job = stats.getJob();
+    totalReduceTasksLaunched += job.getNumReduceTasks();
+  }
+
+  private void process(JobStats stats) {
+    try {
+      // process the job run state
+      processJobState(stats);
+
+      // process the tasks information
+      processJobTasks(stats);
+    } catch (Exception e) {
+      LOG.info("Error in processing job " + stats.getJob().getJobID() + ".");
+    }
+  }
+
+  @Override
+  public void update(JobStats item) {
+    // process only if the simulation has started
+    if (simulationStartTime > 0) {
+      process(item);
+      totalSimulationTime =
+        System.currentTimeMillis() - getSimulationStartTime();
+    }
+  }
+
+  // Generates a signature for the trace file based on
+  //   - filename
+  //   - modification time
+  //   - file length
+  //   - owner
+  protected static String getTraceSignature(String input) throws IOException {
+    Path inputPath = new Path(input);
+    FileSystem fs = inputPath.getFileSystem(new Configuration());
+    FileStatus status = fs.getFileStatus(inputPath);
+    Path qPath = fs.makeQualified(status.getPath());
+    String traceID = status.getModificationTime() + qPath.toString()
+                     + status.getOwner() + status.getLen();
+    return MD5Hash.digest(traceID).toString();
+  }
+
+  @SuppressWarnings("unchecked")
+  void finalize(JobFactory factory, String inputPath, long dataSize,
+                UserResolver userResolver, DataStatistics stats,
+                Configuration conf)
+  throws IOException {
+    numJobsInInputTrace = factory.numJobsInTrace;
+    endTime = System.currentTimeMillis();
+    Path inputTracePath = new Path(inputPath);
+    FileSystem fs = inputTracePath.getFileSystem(conf);
+    inputTraceLocation = fs.makeQualified(inputTracePath).toString();
+    inputTraceSignature = getTraceSignature(inputTraceLocation);
+    jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
+    resolver = userResolver.getClass().getName();
+    if (dataSize > 0) {
+      expectedDataSize = StringUtils.humanReadableInt(dataSize);
+    } else {
+      expectedDataSize = Summarizer.NA;
+    }
+    dataStats = stats;
+    totalRuntime = System.currentTimeMillis() - getStartTime();
+  }
+
+  /**
+   * Summarizes the current {@link Gridmix} run.
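+   * The summary is printed once, at the end of the run.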
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Execution Summary:-");
+    builder.append("\nInput trace: ").append(getInputTraceLocation());
+    builder.append("\nInput trace signature: ")
+           .append(getInputTraceSignature());
+    builder.append("\nTotal number of jobs in trace: ")
+           .append(getNumJobsInTrace());
+    builder.append("\nExpected input data size: ")
+           .append(getExpectedDataSize());
+    builder.append("\nInput data statistics: ")
+           .append(getInputDataStatistics());
+    builder.append("\nTotal number of jobs processed: ")
+           .append(getNumSubmittedJobs());
+    builder.append("\nTotal number of successful jobs: ")
+           .append(getNumSuccessfulJobs());
+    builder.append("\nTotal number of failed jobs: ")
+           .append(getNumFailedJobs());
+    builder.append("\nTotal number of map tasks launched: ")
+           .append(getNumMapTasksLaunched());
+    builder.append("\nTotal number of reduce tasks launched: ")
+           .append(getNumReduceTasksLaunched());
+    builder.append("\nGridmix start time: ")
+           .append(UTIL.format(getStartTime()));
+    builder.append("\nGridmix end time: ").append(UTIL.format(getEndTime()));
+    builder.append("\nGridmix simulation start time: ")
+           .append(UTIL.format(getSimulationStartTime()));
+    builder.append("\nGridmix runtime: ")
+           .append(StringUtils.formatTime(getRuntime()));
+    builder.append("\nTime spent in initialization (data-gen etc): ")
+           .append(StringUtils.formatTime(getInitTime()));
+    builder.append("\nTime spent in simulation: ")
+           .append(StringUtils.formatTime(getSimulationTime()));
+    builder.append("\nGridmix configuration parameters: ")
+           .append(getCommandLineArgsString());
+    builder.append("\nGridmix job submission policy: ")
+           .append(getJobSubmissionPolicy());
+    builder.append("\nGridmix resolver: ").append(getUserResolver());
+    builder.append("\n\n");
+    return builder.toString();
+  }
+
+  // Gets the stringified version of DataStatistics
+  static String stringifyDataStatistics(DataStatistics stats) {
+    if (stats != null) {
+      StringBuffer buffer = new StringBuffer();
+      String compressionStatus = stats.isDataCompressed()
"Compressed" + : "Uncompressed"; + buffer.append(compressionStatus).append(" input data size: "); + buffer.append(StringUtils.humanReadableInt(stats.getDataSize())); + buffer.append(", "); + buffer.append("Number of files: ").append(stats.getNumFiles()); + + return buffer.toString(); + } else { + return Summarizer.NA; + } + } + + // Getters + protected String getExpectedDataSize() { + return expectedDataSize; + } + + protected String getUserResolver() { + return resolver; + } + + protected String getInputDataStatistics() { + return stringifyDataStatistics(dataStats); + } + + protected String getInputTraceSignature() { + return inputTraceSignature; + } + + protected String getInputTraceLocation() { + return inputTraceLocation; + } + + protected int getNumJobsInTrace() { + return numJobsInInputTrace; + } + + protected int getNumSuccessfulJobs() { + return totalSuccessfulJobs; + } + + protected int getNumFailedJobs() { + return totalFailedJobs; + } + + protected int getNumSubmittedJobs() { + return totalSuccessfulJobs + totalFailedJobs; + } + + protected int getNumMapTasksLaunched() { + return totalMapTasksLaunched; + } + + protected int getNumReduceTasksLaunched() { + return totalReduceTasksLaunched; + } + + protected long getStartTime() { + return startTime; + } + + protected long getEndTime() { + return endTime; + } + + protected long getInitTime() { + return simulationStartTime - startTime; + } + + protected long getSimulationStartTime() { + return simulationStartTime; + } + + protected long getSimulationTime() { + return totalSimulationTime; + } + + protected long getRuntime() { + return totalRuntime; + } + + protected String getCommandLineArgsString() { + return commandLineArgs; + } + + protected String getJobSubmissionPolicy() { + return jobSubmissionPolicy; + } +} \ No newline at end of file Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1144403&r1=1144402&r2=1144403&view=diff ============================================================================== --- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original) +++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Fri Jul 8 17:53:36 2011 @@ -100,20 +100,48 @@ class GenerateData extends GridmixJob { } /** + * Represents the input data characteristics. + */ + static class DataStatistics { + private long dataSize; + private long numFiles; + private boolean isDataCompressed; + + DataStatistics(long dataSize, long numFiles, boolean isCompressed) { + this.dataSize = dataSize; + this.numFiles = numFiles; + this.isDataCompressed = isCompressed; + } + + long getDataSize() { + return dataSize; + } + + long getNumFiles() { + return numFiles; + } + + boolean isDataCompressed() { + return isDataCompressed; + } + } + + /** * Publish the data statistics. 
    */
-  static void publishDataStatistics(Path inputDir, long genBytes,
-                                    Configuration conf)
+  static DataStatistics publishDataStatistics(Path inputDir, long genBytes,
+                                              Configuration conf)
   throws IOException {
     if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
-      CompressionEmulationUtil.publishCompressedDataStatistics(inputDir,
-                                 conf, genBytes);
+      return CompressionEmulationUtil.publishCompressedDataStatistics(inputDir,
+                                        conf, genBytes);
     } else {
-      publishPlainDataStatistics(conf, inputDir);
+      return publishPlainDataStatistics(conf, inputDir);
     }
   }

-  static void publishPlainDataStatistics(Configuration conf, Path inputDir)
+  static DataStatistics publishPlainDataStatistics(Configuration conf,
+                                                   Path inputDir)
   throws IOException {
     FileSystem fs = inputDir.getFileSystem(conf);
@@ -134,6 +162,8 @@ class GenerateData extends GridmixJob {
     LOG.info("Total size of input data : "
              + StringUtils.humanReadableInt(dataSize));
     LOG.info("Total number of input data files : " + fileCount);
+
+    return new DataStatistics(dataSize, fileCount, false);
   }

   @Override

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1144403&r1=1144402&r2=1144403&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Jul 8 17:53:36 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -113,10 +114,19 @@ public class Gridmix extends Configured
   private JobSubmitter submitter;
   private JobMonitor monitor;
   private Statistics statistics;
+  private Summarizer summarizer;

   // Shutdown hook
   private final Shutdown sdh = new Shutdown();

+  Gridmix(String[] args) {
+    summarizer = new Summarizer(args);
+  }
+
+  Gridmix() {
+    summarizer = new Summarizer();
+  }
+
   // Get the input data directory for Gridmix. Input directory is
   // <io-path>/input
   static Path getGridmixInputDataPath(Path ioPath) {
@@ -205,6 +215,13 @@ public class Gridmix extends Configured
     return new ZombieJobProducer(new Path(traceIn), null, conf);
   }

+  // get the gridmix job submission policy
+  protected static GridmixJobSubmissionPolicy getJobSubmissionPolicy(
+      Configuration conf) {
+    return GridmixJobSubmissionPolicy.getPolicy(conf,
+             GridmixJobSubmissionPolicy.STRESS);
+  }
+
   /**
    * Create each component in the pipeline and start it.
    * @param conf Configuration data, no keys specific to this context
@@ -221,8 +238,7 @@ public class Gridmix extends Configured
   throws IOException {
     try {
       Path inputDir = getGridmixInputDataPath(ioPath);
-      GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
-        conf, GridmixJobSubmissionPolicy.STRESS);
+      GridmixJobSubmissionPolicy policy = getJobSubmissionPolicy(conf);
       LOG.info(" Submission policy is " + policy.name());
       statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
       monitor = createJobMonitor(statistics);
@@ -248,6 +264,10 @@ public class Gridmix extends Configured
         statistics.addClusterStatsObservers(factory);
       }

+      // add the gridmix run summarizer to the statistics
+      statistics.addJobStatsListeners(summarizer.getExecutionSummarizer());
+      statistics.addClusterStatsObservers(summarizer.getClusterSummarizer());
+
       monitor.start();
       submitter.start();
     }catch(Exception e) {
@@ -293,6 +313,10 @@ public class Gridmix extends Configured
           return runJob(conf, argv);
         }
       });
+
+    // print the run summary
+    System.out.print("\n\n");
+    System.out.println(summarizer.toString());
     return val;
   }

@@ -373,6 +397,7 @@ public class Gridmix extends Configured
   int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
             UserResolver userResolver, boolean generate)
   throws IOException, InterruptedException {
+    DataStatistics stats = null;
     InputStream trace = null;
     ioPath = ioPath.makeQualified(ioPath.getFileSystem(conf));

@@ -395,7 +420,7 @@ public class Gridmix extends Configured
         }

         // publish the data statistics
-        GenerateData.publishDataStatistics(inputDir, genbytes, conf);
+        stats = GenerateData.publishDataStatistics(inputDir, genbytes, conf);

         // scan input dir contents
         submitter.refreshFilePool();
@@ -407,6 +432,9 @@ public class Gridmix extends Configured
           return exitCode;
         }

+        // start the summarizer
+        summarizer.start(conf);
+
         factory.start();
         statistics.start();
       } catch (Throwable e) {
@@ -436,6 +464,10 @@ public class Gridmix extends Configured
         }
       }
     } finally {
+      if (factory != null) {
+        summarizer.finalize(factory, traceIn, genbytes, userResolver, stats,
+                            conf);
+      }
       IOUtils.cleanup(LOG, trace);
     }
     return 0;
@@ -567,7 +599,7 @@ public class Gridmix extends Configured
   public static void main(String[] argv) throws Exception {
     int res = -1;
     try {
-      res = ToolRunner.run(new Configuration(), new Gridmix(), argv);
+      res = ToolRunner.run(new Configuration(), new Gridmix(argv), argv);
     } finally {
       System.exit(res);
     }

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1144403&r1=1144402&r2=1144403&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Jul 8 17:53:36 2011
@@ -28,7 +28,6 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
-import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
@@ -64,6 +63,7 @@ abstract class JobFactory<T> implements
   protected volatile IOException error = null;
   protected final JobStoryProducer jobProducer;
   protected final ReentrantLock lock = new ReentrantLock(true);
+  protected int numJobsInTrace = 0;

   /**
    * Creating a new instance does not start the thread.
@@ -168,13 +168,33 @@ abstract class JobFactory<T> implements

   protected abstract Thread createReaderThread() ;

+  // gets the next job from the trace and does some bookkeeping for the same
+  private JobStory getNextJobFromTrace() throws IOException {
+    JobStory story = jobProducer.getNextJob();
+    if (story != null) {
+      ++numJobsInTrace;
+    }
+    return story;
+  }
+
   protected JobStory getNextJobFiltered() throws IOException {
-    JobStory job;
-    do {
-      job = jobProducer.getNextJob();
-    } while (job != null &&
+    JobStory job = getNextJobFromTrace();
+    while (job != null &&
            (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
-            job.getSubmissionTime() < 0));
+            job.getSubmissionTime() < 0)) {
+      if (LOG.isDebugEnabled()) {
+        String reason = "";
+        if (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS) {
+          reason = "STATE (" + job.getOutcome().name() + ") ";
+        }
+        if (job.getSubmissionTime() < 0) {
+          reason += "SUBMISSION-TIME (" + job.getSubmissionTime() + ")";
+        }
+        LOG.debug("Ignoring job " + job.getJobID() + " from the input trace."
+                  + " Reason: " + (reason.isEmpty() ? "N/A" : reason));
+      }
+      job = getNextJobFromTrace();
+    }
     return null == job ? null : new FilterJobStory(job) {
       @Override
       public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=1144403&r1=1144402&r2=1144403&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Fri Jul 8 17:53:36 2011
@@ -78,13 +78,13 @@ class JobMonitor implements Gridmix.Comp
   }

   /**
-   * Add a submission failed job , such tht it can be communicated
+   * Add a submission failed job, such that it can be communicated
    * back to serial.
    * TODO: Cleaner solution for this problem
    * @param job
    */
   public void submissionFailed(Job job) {
-    LOG.info(" Job submission failed notify if anyone is waiting " + job);
+    LOG.info("Job submission failed notification for job " + job.getJobID());
     this.statistics.add(job);
   }

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1144403&r1=1144402&r2=1144403&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Jul 8 17:53:36 2011
@@ -126,7 +126,7 @@ class JobSubmitter implements Gridmix.Co
         monitor.submissionFailed(job.getJob());
       } catch(Exception e) {
         //Due to some exception job wasnt submitted.
-        LOG.info(" Job " + job.getJob() + " submission failed " , e);
+        LOG.info(" Job " + job.getJob().getJobID() + " submission failed ", e);
         monitor.submissionFailed(job.getJob());
       } finally {
         sem.release();

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Summarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Summarizer.java?rev=1144403&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Summarizer.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Summarizer.java Fri Jul 8 17:53:36 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+
+/**
+ * Summarizes various aspects of a {@link Gridmix} run.
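+ * It delegates to an {@link ExecutionSummarizer} and a {@link ClusterSummarizer}.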
+ */
+class Summarizer {
+  private ExecutionSummarizer executionSummarizer;
+  private ClusterSummarizer clusterSummarizer;
+  protected static final String NA = "N/A";
+
+  Summarizer() {
+    this(new String[]{NA});
+  }
+
+  Summarizer(String[] args) {
+    executionSummarizer = new ExecutionSummarizer(args);
+    clusterSummarizer = new ClusterSummarizer();
+  }
+
+  ExecutionSummarizer getExecutionSummarizer() {
+    return executionSummarizer;
+  }
+
+  ClusterSummarizer getClusterSummarizer() {
+    return clusterSummarizer;
+  }
+
+  void start(Configuration conf) {
+    executionSummarizer.start(conf);
+    clusterSummarizer.start(conf);
+  }
+
+  /**
+   * This finalizes the summarizer.
+   */
+  @SuppressWarnings("unchecked")
+  void finalize(JobFactory factory, String path, long size,
+                UserResolver resolver, DataStatistics stats,
+                Configuration conf)
+  throws IOException {
+    executionSummarizer.finalize(factory, path, size, resolver, stats, conf);
+  }
+
+  /**
+   * Summarizes the current {@link Gridmix} run and the cluster used.
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(executionSummarizer.toString());
+    builder.append(clusterSummarizer.toString());
+    return builder.toString();
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java?rev=1144403&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java Fri Jul 8 17:53:36 2011
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.junit.Test;
+
+/**
+ * Test {@link ExecutionSummarizer} and {@link ClusterSummarizer}.
+ */
+public class TestGridmixSummary {
+
+  /**
+   * Test {@link DataStatistics}.
+   */
+  @Test
+  public void testDataStatistics() throws Exception {
+    // test data-statistics getters with compression enabled
+    DataStatistics stats = new DataStatistics(10, 2, true);
+    assertEquals("Data size mismatch", 10, stats.getDataSize());
+    assertEquals("Num files mismatch", 2, stats.getNumFiles());
+    assertTrue("Compression configuration mismatch", stats.isDataCompressed());
+
+    // test data-statistics getters with compression disabled
+    stats = new DataStatistics(100, 5, false);
+    assertEquals("Data size mismatch", 100, stats.getDataSize());
+    assertEquals("Num files mismatch", 5, stats.getNumFiles());
+    assertFalse("Compression configuration mismatch",
+                stats.isDataCompressed());
+
+    // test publish data stats
+    Configuration conf = new Configuration();
+    Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp"));
+    Path testDir = new Path(rootTempDir, "testDataStatistics");
+    FileSystem fs = testDir.getFileSystem(conf);
+    fs.delete(testDir, true);
+    Path testInputDir = new Path(testDir, "test");
+    fs.mkdirs(testInputDir);
+
+    // test empty folder (compression = on)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    Boolean failed = null;
+    try {
+      GenerateData.publishDataStatistics(testInputDir, 1024L, conf);
+      failed = false;
+    } catch (RuntimeException e) {
+      failed = true;
+    }
+    assertNotNull("Expected failure!", failed);
+    assertTrue("Compression data publishing error", failed);
+
+    // test with empty folder (compression = off)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    stats = GenerateData.publishDataStatistics(testInputDir, 1024L, conf);
+    assertEquals("Data size mismatch", 0, stats.getDataSize());
+    assertEquals("Num files mismatch", 0, stats.getNumFiles());
+    assertFalse("Compression configuration mismatch",
+                stats.isDataCompressed());
+
+    // test with some plain input data (compression = off)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    Path inputDataFile = new Path(testInputDir, "test");
+    long size =
+      UtilsForTests.createTmpFileDFS(fs, inputDataFile,
+          FsPermission.createImmutable((short)777), "hi hello bye").size();
+    stats = GenerateData.publishDataStatistics(testInputDir, -1, conf);
+    assertEquals("Data size mismatch", size, stats.getDataSize());
+    assertEquals("Num files mismatch", 1, stats.getNumFiles());
+    assertFalse("Compression configuration mismatch",
+                stats.isDataCompressed());
+
+    // test with some plain input data (compression = on)
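+    // expected to fail: the input is plain text, not compressed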
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    failed = null;
+    try {
+      GenerateData.publishDataStatistics(testInputDir, 1234L, conf);
+      failed = false;
+    } catch (RuntimeException e) {
+      failed = true;
+    }
+    assertNotNull("Expected failure!", failed);
+    assertTrue("Compression data publishing error", failed);
+
+    // test with some compressed input data (compression = off)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    fs.delete(inputDataFile, false);
+    inputDataFile = new Path(testInputDir, "test.gz");
+    size =
+      UtilsForTests.createTmpFileDFS(fs, inputDataFile,
+          FsPermission.createImmutable((short)777), "hi hello").size();
+    stats = GenerateData.publishDataStatistics(testInputDir, 1234L, conf);
+    assertEquals("Data size mismatch", size, stats.getDataSize());
+    assertEquals("Num files mismatch", 1, stats.getNumFiles());
+    assertFalse("Compression configuration mismatch",
+                stats.isDataCompressed());
+
+    // test with some compressed input data (compression = on)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    stats = GenerateData.publishDataStatistics(testInputDir, 1234L, conf);
+    assertEquals("Data size mismatch", size, stats.getDataSize());
+    assertEquals("Num files mismatch", 1, stats.getNumFiles());
+    assertTrue("Compression configuration mismatch", stats.isDataCompressed());
+  }
+
+  /**
+   * A fake {@link JobFactory}.
+   */
+  @SuppressWarnings("unchecked")
+  private static class FakeJobFactory extends JobFactory {
+    /**
+     * A fake {@link JobStoryProducer} for {@link FakeJobFactory}.
+     */
+    private static class FakeJobStoryProducer implements JobStoryProducer {
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public JobStory getNextJob() throws IOException {
+        return null;
+      }
+    }
+
+    FakeJobFactory(Configuration conf) {
+      super(null, new FakeJobStoryProducer(), null, conf, null, null);
+    }
+
+    @Override
+    public void update(Object item) {
+    }
+
+    @Override
+    protected Thread createReaderThread() {
+      return null;
+    }
+  }
+
+  /**
+   * Test {@link ExecutionSummarizer}.
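+   * Covers the constructors, update() bookkeeping and finalize().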
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testExecutionSummarizer() throws IOException {
+    Configuration conf = new Configuration();
+
+    ExecutionSummarizer es = new ExecutionSummarizer();
+    assertEquals("ExecutionSummarizer init failed",
+                 Summarizer.NA, es.getCommandLineArgsString());
+
+    long startTime = System.currentTimeMillis();
+    // test configuration parameters
+    String[] initArgs = new String[] {"-Xmx20m", "-Dtest.args='test'"};
+    es = new ExecutionSummarizer(initArgs);
+
+    assertEquals("ExecutionSummarizer init failed",
+                 "-Xmx20m -Dtest.args='test'",
+                 es.getCommandLineArgsString());
+
+    // test start time
+    assertTrue("Start time mismatch", es.getStartTime() >= startTime);
+    assertTrue("Start time mismatch",
+               es.getStartTime() <= System.currentTimeMillis());
+
+    // test start() of ExecutionSummarizer
+    es.update(null);
+    assertEquals("ExecutionSummarizer init failed", 0,
+                 es.getSimulationStartTime());
+    testExecutionSummarizer(0, 0, 0, 0, 0, 0, es);
+
+    long simStartTime = System.currentTimeMillis();
+    es.start(null);
+    assertTrue("Simulation start time mismatch",
+               es.getSimulationStartTime() >= simStartTime);
+    assertTrue("Simulation start time mismatch",
+               es.getSimulationStartTime() <= System.currentTimeMillis());
+
+    // test with job stats
+    JobStats stats = generateFakeJobStats(1, 10, true);
+    es.update(stats);
+    testExecutionSummarizer(1, 10, 0, 1, 1, 0, es);
+
+    // test with failed job
+    stats = generateFakeJobStats(5, 1, false);
+    es.update(stats);
+    testExecutionSummarizer(6, 11, 0, 2, 1, 1, es);
+
+    // test finalize
+    // define a fake job factory
+    JobFactory factory = new FakeJobFactory(conf);
+
+    // fake the num jobs in trace
+    factory.numJobsInTrace = 3;
+
+    Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp"));
+    Path testDir = new Path(rootTempDir, "testGridmixSummary");
+    Path testTraceFile = new Path(testDir, "test-trace.json");
+    FileSystem fs = FileSystem.getLocal(conf);
+    fs.create(testTraceFile).close();
+
+    // finalize the summarizer
+    UserResolver resolver = new RoundRobinUserResolver();
+    DataStatistics dataStats = new DataStatistics(100, 2, true);
+    String policy = GridmixJobSubmissionPolicy.REPLAY.name();
+    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
+    es.finalize(factory, testTraceFile.toString(), 1024L, resolver, dataStats,
+                conf);
+
+    // test num jobs in trace
+    assertEquals("Mismatch in num jobs in trace", 3, es.getNumJobsInTrace());
+
+    // test trace signature
+    String tid =
+      ExecutionSummarizer.getTraceSignature(testTraceFile.toString());
+    assertEquals("Mismatch in trace signature",
+                 tid, es.getInputTraceSignature());
+    // test trace location
+    Path qPath = fs.makeQualified(testTraceFile);
+    assertEquals("Mismatch in trace location",
+                 qPath.toString(), es.getInputTraceLocation());
+    // test expected data size
+    assertEquals("Mismatch in expected data size",
+                 "1.0k", es.getExpectedDataSize());
+    // test input data statistics
+    assertEquals("Mismatch in input data statistics",
+                 ExecutionSummarizer.stringifyDataStatistics(dataStats),
+                 es.getInputDataStatistics());
+    // test user resolver
+    assertEquals("Mismatch in user resolver",
+                 resolver.getClass().getName(), es.getUserResolver());
+    // test policy
+    assertEquals("Mismatch in policy", policy, es.getJobSubmissionPolicy());
+
+    // test data stringification using large data
+    es.finalize(factory, testTraceFile.toString(), 1024*1024*1024*10L,
+                resolver, dataStats, conf);
+    assertEquals("Mismatch in expected data size",
"10.0g", es.getExpectedDataSize()); + + // test trace signature uniqueness + // touch the trace file + fs.delete(testTraceFile, false); + // sleep for 1 sec + try { + Thread.sleep(1000); + } catch (InterruptedException ie) {} + fs.create(testTraceFile).close(); + es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats, + conf); + // test missing expected data size + assertEquals("Mismatch in trace signature", + Summarizer.NA, es.getExpectedDataSize()); + assertFalse("Mismatch in trace signature", + tid.equals(es.getInputTraceSignature())); + // get the new identifier + tid = ExecutionSummarizer.getTraceSignature(testTraceFile.toString()); + assertEquals("Mismatch in trace signature", + tid, es.getInputTraceSignature()); + + testTraceFile = new Path(testDir, "test-trace2.json"); + fs.create(testTraceFile).close(); + es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats, + conf); + assertFalse("Mismatch in trace signature", + tid.equals(es.getInputTraceSignature())); + // get the new identifier + tid = ExecutionSummarizer.getTraceSignature(testTraceFile.toString()); + assertEquals("Mismatch in trace signature", + tid, es.getInputTraceSignature()); + + } + + // test the ExecutionSummarizer + private static void testExecutionSummarizer(int numMaps, int numReds, + int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob, + int numFailedJobs, ExecutionSummarizer es) { + assertEquals("ExecutionSummarizer test failed [num-maps]", + numMaps, es.getNumMapTasksLaunched()); + assertEquals("ExecutionSummarizer test failed [num-reducers]", + numReds, es.getNumReduceTasksLaunched()); + assertEquals("ExecutionSummarizer test failed [num-jobs-in-trace]", + totalJobsInTrace, es.getNumJobsInTrace()); + assertEquals("ExecutionSummarizer test failed [num-submitted jobs]", + totalJobSubmitted, es.getNumSubmittedJobs()); + assertEquals("ExecutionSummarizer test failed [num-successful-jobs]", + numSuccessfulJob, es.getNumSuccessfulJobs()); + assertEquals("ExecutionSummarizer test failed [num-failed jobs]", + numFailedJobs, es.getNumFailedJobs()); + } + + // generate fake job stats + @SuppressWarnings("deprecation") + private static JobStats generateFakeJobStats(final int numMaps, + final int numReds, final boolean isSuccessful) + throws IOException { + // A fake job + Job fakeJob = new Job() { + @Override + public int getNumReduceTasks() { + return numReds; + }; + + @Override + public boolean isSuccessful() throws IOException, InterruptedException { + return isSuccessful; + }; + }; + return new JobStats(numMaps, fakeJob); + } + + /** + * Test {@link ClusterSummarizer}. 
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testClusterSummarizer() throws IOException {
+    ClusterSummarizer cs = new ClusterSummarizer();
+    Configuration conf = new Configuration();
+
+    String jt = "test-jt:1234";
+    String nn = "test-nn:5678";
+    conf.set(JTConfig.JT_IPC_ADDRESS, jt);
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, nn);
+    cs.start(conf);
+
+    assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo());
+    assertEquals("NN name mismatch", nn, cs.getNamenodeInfo());
+
+    ClusterStats cstats = ClusterStats.getClusterStats();
+    conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local");
+    JobClient jc = new JobClient(conf);
+    cstats.setClusterMetric(jc.getClusterStatus());
+
+    cs.update(cstats);
+
+    // test
+    assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks());
+    assertEquals("Cluster summary test failed!", 1, cs.getMaxReduceTasks());
+    assertEquals("Cluster summary test failed!", 1, cs.getNumActiveTrackers());
+    assertEquals("Cluster summary test failed!", 0,
+                 cs.getNumBlacklistedTrackers());
+  }
+}
\ No newline at end of file
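For reference, the trace signature reported in the execution summary can be reproduced
outside of Gridmix. The sketch below mirrors the logic of
ExecutionSummarizer.getTraceSignature() in this patch: an MD5 digest over the trace
file's modification time, qualified path, owner and length, which is why touching or
replacing the trace file yields a new signature in the uniqueness test above. The
TraceSignatureDemo class is hypothetical, written here only for illustration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MD5Hash;

    public class TraceSignatureDemo {
      public static void main(String[] args) throws Exception {
        Path trace = new Path(args[0]);
        FileSystem fs = trace.getFileSystem(new Configuration());
        FileStatus status = fs.getFileStatus(trace);
        // same ingredients, in the same order, as ExecutionSummarizer
        String traceID = status.getModificationTime()
            + fs.makeQualified(status.getPath()).toString()
            + status.getOwner() + status.getLen();
        System.out.println("Trace signature: " + MD5Hash.digest(traceID).toString());
      }
    }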