Author: amarrk Date: Mon Jun 6 05:40:38 2011 New Revision: 1132529 URL: http://svn.apache.org/viewvc?rev=1132529&view=rev Log: MAPREDUCE-2104. [Rumen] Add Cpu, Memory and Heap usages to TraceBuilder's output. (amarrk)
Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz (with props) hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1132529&r1=1132528&r2=1132529&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jun 6 05:40:38 2011 @@ -25,6 +25,9 @@ Trunk (unreleased changes) IMPROVEMENTS + MAPREDUCE-2104. [Rumen] Add Cpu, Memory and Heap usages to + TraceBuilder's output. (amarrk) + MAPREDUCE-2554. [Gridmix] Add distributed cache emulation system tests to Gridmix. (Vinay Kumar Thota via amarrk) Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1132529&r1=1132528&r2=1132529&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Mon Jun 6 05:40:38 2011 @@ -33,11 +33,13 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MapReduceTestUtil; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat; @@ -230,7 +232,7 @@ public class TestRumenJobTraces { parser = new Hadoop20JHParser(ris); ArrayList<String> seenEvents = new ArrayList<String>(150); - getHistoryEvents(parser, seenEvents); // get events into seenEvents + getHistoryEvents(parser, seenEvents, null); // get events into seenEvents // Validate the events seen by history parser from // history file v20-single-input-log.gz @@ -512,7 +514,11 @@ public class TestRumenJobTraces { // Test if the JobHistoryParserFactory can detect the parser correctly parser = JobHistoryParserFactory.getParser(ris); - getHistoryEvents(parser, seenEvents); // get events into seenEvents + // create a job builder + JobBuilder builder = new JobBuilder(id.toString()); + + // get events into seenEvents and also process them using builder + getHistoryEvents(parser, seenEvents, builder); // Check against the gold standard System.out.println("testCurrentJHParser validating using gold std "); @@ -523,6 +529,26 @@ public class TestRumenJobTraces { }; validateSeenHistoryEvents(seenEvents, goldLinesExpected); + + // validate resource usage metrics + // get the job counters + Counters counters = job.getTaskReports(TaskType.MAP)[0].getTaskCounters(); + + // get the logged job + LoggedJob loggedJob = builder.build(); + // get the logged attempts + LoggedTaskAttempt attempt = + loggedJob.getMapTasks().get(0).getAttempts().get(0); + // get the resource usage metrics + ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics(); + + // check with the actual values + testResourceUsageMetricViaDeepCompare(metrics, + counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue(), + counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).getValue(), + counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue(), + counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES).getValue(), + true); } finally { // stop the MR cluster mrCluster.shutdown(); @@ -687,6 +713,141 @@ public class TestRumenJobTraces { } + /** + * Test {@link ResourceUsageMetrics}. + */ + @Test + public void testResourceUsageMetrics() throws Exception { + final long cpuUsage = 100; + final long pMemUsage = 200; + final long vMemUsage = 300; + final long heapUsage = 400; + + // test ResourceUsageMetrics's setters + ResourceUsageMetrics metrics = new ResourceUsageMetrics(); + metrics.setCumulativeCpuUsage(cpuUsage); + metrics.setPhysicalMemoryUsage(pMemUsage); + metrics.setVirtualMemoryUsage(vMemUsage); + metrics.setHeapUsage(heapUsage); + // test cpu usage value + assertEquals("Cpu usage values mismatch via set", cpuUsage, + metrics.getCumulativeCpuUsage()); + // test pMem usage value + assertEquals("Physical memory usage values mismatch via set", pMemUsage, + metrics.getPhysicalMemoryUsage()); + // test vMem usage value + assertEquals("Virtual memory usage values mismatch via set", vMemUsage, + metrics.getVirtualMemoryUsage()); + // test heap usage value + assertEquals("Heap usage values mismatch via set", heapUsage, + metrics.getHeapUsage()); + + // test deepCompare() (pass case) + testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, + pMemUsage, heapUsage, true); + + // test deepCompare (fail case) + // test cpu usage mismatch + testResourceUsageMetricViaDeepCompare(metrics, 0, vMemUsage, pMemUsage, + heapUsage, false); + // test pMem usage mismatch + testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 0, + heapUsage, false); + // test vMem usage mismatch + testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, 0, pMemUsage, + heapUsage, false); + // test heap usage mismatch + testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, + pMemUsage, 0, false); + + // define a metric with a fixed value of size() + ResourceUsageMetrics metrics2 = new ResourceUsageMetrics() { + @Override + public int size() { + return -1; + } + }; + metrics2.setCumulativeCpuUsage(cpuUsage); + metrics2.setPhysicalMemoryUsage(pMemUsage); + metrics2.setVirtualMemoryUsage(vMemUsage); + metrics2.setHeapUsage(heapUsage); + + // test with size mismatch + testResourceUsageMetricViaDeepCompare(metrics2, cpuUsage, vMemUsage, + pMemUsage, heapUsage, false); + } + + // test ResourceUsageMetric's deepCompare() method + private static void testResourceUsageMetricViaDeepCompare( + ResourceUsageMetrics metrics, long cpuUsage, + long vMemUsage, long pMemUsage, long heapUsage, + boolean shouldPass) { + ResourceUsageMetrics testMetrics = new ResourceUsageMetrics(); + testMetrics.setCumulativeCpuUsage(cpuUsage); + testMetrics.setPhysicalMemoryUsage(pMemUsage); + testMetrics.setVirtualMemoryUsage(vMemUsage); + testMetrics.setHeapUsage(heapUsage); + + Boolean passed = null; + try { + metrics.deepCompare(testMetrics, new TreePath(null, "<root>")); + passed = true; + } catch (DeepInequalityException die) { + passed = false; + } + + assertEquals("ResourceUsageMetrics deepCompare() failed!", + shouldPass, passed); + } + + /** + * Testing {@link ResourceUsageMetrics} using {@link HadoopLogsAnalyzer}. + */ + @Test + @SuppressWarnings("deprecation") + public void testResourceUsageMetricsWithHadoopLogsAnalyzer() + throws IOException { + Configuration conf = new Configuration(); + // get the input trace file + Path rootInputDir = + new Path(System.getProperty("test.tools.input.dir", "")); + Path rootInputSubFolder = new Path(rootInputDir, "rumen/small-trace-test"); + Path traceFile = new Path(rootInputSubFolder, "v20-resource-usage-log.gz"); + + FileSystem lfs = FileSystem.getLocal(conf); + + // define the root test directory + Path rootTempDir = + new Path(System.getProperty("test.build.data", "/tmp")); + + // define output directory + Path outputDir = + new Path(rootTempDir, "testResourceUsageMetricsWithHadoopLogsAnalyzer"); + lfs.delete(outputDir, true); + lfs.deleteOnExit(outputDir); + + // run HadoopLogsAnalyzer + HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer(); + analyzer.setConf(conf); + Path traceOutput = new Path(outputDir, "trace.json"); + analyzer.run(new String[] {"-write-job-trace", traceOutput.toString(), + "-v1", traceFile.toString()}); + + // test HadoopLogsAnalyzer's output w.r.t ResourceUsageMetrics + // get the logged job + JsonObjectMapperParser<LoggedJob> traceParser = + new JsonObjectMapperParser<LoggedJob>(traceOutput, LoggedJob.class, + conf); + + // get the logged job from the output trace file + LoggedJob job = traceParser.getNext(); + LoggedTaskAttempt attempt = job.getMapTasks().get(0).getAttempts().get(0); + ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics(); + + // test via deepCompare() + testResourceUsageMetricViaDeepCompare(metrics, 200, 100, 75, 50, true); + } + @Test public void testTopologyBuilder() throws Exception { final TopologyBuilder subject = new TopologyBuilder(); @@ -795,12 +956,15 @@ public class TestRumenJobTraces { * @throws IOException */ private void getHistoryEvents(JobHistoryParser parser, - ArrayList<String> events) throws IOException { + ArrayList<String> events, JobBuilder builder) throws IOException { HistoryEvent e; while ((e = parser.nextEvent()) != null) { String eventString = e.getClass().getSimpleName(); System.out.println(eventString); events.add(eventString); + if (builder != null) { + builder.process(e); + } } } Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz?rev=1132529&view=auto ============================================================================== Binary file - no diff available. Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=1132529&r1=1132528&r2=1132529&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Mon Jun 6 05:40:38 2011 @@ -1208,6 +1208,38 @@ public class HadoopLogsAnalyzer extends attempt.spilledRecords = val; } }, counterString, "SPILLED_RECORDS"); + + // incorporate CPU usage + incorporateCounter(new SetField(attempt2) { + @Override + void set(long val) { + attempt.getResourceUsageMetrics().setCumulativeCpuUsage(val); + } + }, counterString, "CPU_MILLISECONDS"); + + // incorporate virtual memory usage + incorporateCounter(new SetField(attempt2) { + @Override + void set(long val) { + attempt.getResourceUsageMetrics().setVirtualMemoryUsage(val); + } + }, counterString, "VIRTUAL_MEMORY_BYTES"); + + // incorporate physical memory usage + incorporateCounter(new SetField(attempt2) { + @Override + void set(long val) { + attempt.getResourceUsageMetrics().setPhysicalMemoryUsage(val); + } + }, counterString, "PHYSICAL_MEMORY_BYTES"); + + // incorporate heap usage + incorporateCounter(new SetField(attempt2) { + @Override + void set(long val) { + attempt.getResourceUsageMetrics().setHeapUsage(val); + } + }, counterString, "COMMITTED_HEAP_BYTES"); } private ParsedHost getAndRecordParsedHost(String hostName) { Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=1132529&r1=1132528&r2=1132529&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Mon Jun 6 05:40:38 2011 @@ -28,7 +28,6 @@ import org.codehaus.jackson.annotate.Jso // the Jackson implementation of JSON doesn't handle a // superclass-valued field. -import org.apache.hadoop.mapreduce.jobhistory.Events; import org.apache.hadoop.mapreduce.jobhistory.JhCounter; import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup; import org.apache.hadoop.mapreduce.jobhistory.JhCounters; @@ -69,6 +68,9 @@ public class LoggedTaskAttempt implement LoggedLocation location; + // Initialize to default object for backward compatibility + ResourceUsageMetrics metrics = new ResourceUsageMetrics(); + LoggedTaskAttempt() { super(); } @@ -354,8 +356,50 @@ public class LoggedTaskAttempt implement attempt.spilledRecords = val; } }, counters, "SPILLED_RECORDS"); + + // incorporate CPU usage + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + metrics.setCumulativeCpuUsage(val); + } + }, counters, "CPU_MILLISECONDS"); + + // incorporate virtual memory usage + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + metrics.setVirtualMemoryUsage(val); + } + }, counters, "VIRTUAL_MEMORY_BYTES"); + + // incorporate physical memory usage + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + metrics.setPhysicalMemoryUsage(val); + } + }, counters, "PHYSICAL_MEMORY_BYTES"); + + // incorporate heap usage + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + metrics.setHeapUsage(val); + } + }, counters, "COMMITTED_HEAP_BYTES"); } + // Get the resource usage metrics + public ResourceUsageMetrics getResourceUsageMetrics() { + return metrics; + } + + // Set the resource usage metrics + void setResourceUsageMetrics(ResourceUsageMetrics metrics) { + this.metrics = metrics; + } + private static String canonicalizeCounterName(String nonCanonicalName) { String result = nonCanonicalName.toLowerCase(); Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java?rev=1132529&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java Mon Jun 6 05:40:38 2011 @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Captures the resource usage metrics. + */ +public class ResourceUsageMetrics implements Writable, DeepCompare { + private long cumulativeCpuUsage; + private long virtualMemoryUsage; + private long physicalMemoryUsage; + private long heapUsage; + + public ResourceUsageMetrics() { + } + + /** + * Get the cumulative CPU usage. + */ + public long getCumulativeCpuUsage() { + return cumulativeCpuUsage; + } + + /** + * Set the cumulative CPU usage. + */ + public void setCumulativeCpuUsage(long usage) { + cumulativeCpuUsage = usage; + } + + /** + * Get the virtual memory usage. + */ + public long getVirtualMemoryUsage() { + return virtualMemoryUsage; + } + + /** + * Set the virtual memory usage. + */ + public void setVirtualMemoryUsage(long usage) { + virtualMemoryUsage = usage; + } + + /** + * Get the physical memory usage. + */ + public long getPhysicalMemoryUsage() { + return physicalMemoryUsage; + } + + /** + * Set the physical memory usage. + */ + public void setPhysicalMemoryUsage(long usage) { + physicalMemoryUsage = usage; + } + + /** + * Get the total heap usage. + */ + public long getHeapUsage() { + return heapUsage; + } + + /** + * Set the total heap usage. + */ + public void setHeapUsage(long usage) { + heapUsage = usage; + } + + /** + * Returns the size of the serialized data + */ + public int size() { + int size = 0; + size += WritableUtils.getVIntSize(cumulativeCpuUsage); // long #1 + size += WritableUtils.getVIntSize(virtualMemoryUsage); // long #2 + size += WritableUtils.getVIntSize(physicalMemoryUsage); // long #3 + size += WritableUtils.getVIntSize(heapUsage); // long #4 + return size; + } + + @Override + public void readFields(DataInput in) throws IOException { + cumulativeCpuUsage = WritableUtils.readVLong(in); // long #1 + virtualMemoryUsage = WritableUtils.readVLong(in); // long #2 + physicalMemoryUsage = WritableUtils.readVLong(in); // long #3 + heapUsage = WritableUtils.readVLong(in); // long #4 + } + + @Override + public void write(DataOutput out) throws IOException { + //TODO Write resources version no too + WritableUtils.writeVLong(out, cumulativeCpuUsage); // long #1 + WritableUtils.writeVLong(out, virtualMemoryUsage); // long #2 + WritableUtils.writeVLong(out, physicalMemoryUsage); // long #3 + WritableUtils.writeVLong(out, heapUsage); // long #4 + } + + private static void compareMetric(long m1, long m2, TreePath loc) + throws DeepInequalityException { + if (m1 != m2) { + throw new DeepInequalityException("Value miscompared:" + loc.toString(), + loc); + } + } + + private static void compareSize(ResourceUsageMetrics m1, + ResourceUsageMetrics m2, TreePath loc) + throws DeepInequalityException { + if (m1.size() != m2.size()) { + throw new DeepInequalityException("Size miscompared: " + loc.toString(), + loc); + } + } + + @Override + public void deepCompare(DeepCompare other, TreePath loc) + throws DeepInequalityException { + if (!(other instanceof ResourceUsageMetrics)) { + throw new DeepInequalityException("Comparand has wrong type", loc); + } + + ResourceUsageMetrics metrics2 = (ResourceUsageMetrics) other; + compareMetric(getCumulativeCpuUsage(), metrics2.getCumulativeCpuUsage(), + new TreePath(loc, "cumulativeCpu")); + compareMetric(getVirtualMemoryUsage(), metrics2.getVirtualMemoryUsage(), + new TreePath(loc, "virtualMemory")); + compareMetric(getPhysicalMemoryUsage(), metrics2.getPhysicalMemoryUsage(), + new TreePath(loc, "physicalMemory")); + compareMetric(getHeapUsage(), metrics2.getHeapUsage(), + new TreePath(loc, "heapUsage")); + compareSize(this, metrics2, new TreePath(loc, "size")); + } +} + Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=1132529&r1=1132528&r2=1132529&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Mon Jun 6 05:40:38 2011 @@ -23,14 +23,22 @@ public class TaskInfo { private final long bytesOut; private final int recsOut; private final long maxMemory; + private final ResourceUsageMetrics metrics; public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, long maxMemory) { + this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, + new ResourceUsageMetrics()); + } + + public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, + long maxMemory, ResourceUsageMetrics metrics) { this.bytesIn = bytesIn; this.recsIn = recsIn; this.bytesOut = bytesOut; this.recsOut = recsOut; this.maxMemory = maxMemory; + this.metrics = metrics; } /** @@ -70,4 +78,10 @@ public class TaskInfo { return maxMemory; } + /** + * @return Resource usage metrics + */ + public ResourceUsageMetrics getResourceUsageMetrics() { + return metrics; + } } Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=1132529&r1=1132528&r2=1132529&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Mon Jun 6 05:40:38 2011 @@ -636,6 +636,7 @@ public class ZombieJob implements JobSto long outputBytes = -1; long outputRecords = -1; long heapMegabytes = -1; + ResourceUsageMetrics metrics = new ResourceUsageMetrics(); Values type = loggedTask.getTaskType(); if ((type != Values.MAP) && (type != Values.REDUCE)) { @@ -670,12 +671,15 @@ public class ZombieJob implements JobSto (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job .getHeapMegabytes(); } + // set the resource usage metrics + metrics = attempt.getResourceUsageMetrics(); break; } TaskInfo taskInfo = new TaskInfo(inputBytes, (int) inputRecords, outputBytes, - (int) outputRecords, (int) heapMegabytes); + (int) outputRecords, (int) heapMegabytes, + metrics); return taskInfo; }