Author: shv
Date: Fri Jun 4 02:08:30 2010
New Revision: 951238

URL: http://svn.apache.org/viewvc?rev=951238&view=rev
Log:
MAPREDUCE-1832. Merge -r 951232:951233 from trunk to branch-0.20.
Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Fri Jun 4 02:08:30 2010
@@ -39,6 +39,8 @@ Release 0.20.3 - Unreleased
     MAPREDUCE-1407. Update javadoc in mapreduce.{Mapper,Reducer} to match
     actual usage. (Benoit Sigoure via cdouglas)
 
+    MAPREDUCE-1832. Allow file sizes less than 1MB in DFSIO benchmark. (shv)
+
 Release 0.20.2 - 2010-2-19
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Fri Jun 4 02:08:30 2010
@@ -22,12 +22,8 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
 
 /**
  * Reducer that accumulates values based on their type.
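The hunks that follow replace the hard-coded "s:", "f:" and "l:" key prefixes with named VALUE_TYPE_* constants and move the reducer from the deprecated UTF8 type to Text. The prefixes are a small key protocol between the benchmark mappers and this reducer: "l:" keys are summed as longs, "f:" as floats, and "s:" values are concatenated. A minimal sketch of the emitting side of that protocol (StatEmitter is a hypothetical illustration, not part of this patch; it sits in org.apache.hadoop.fs so it can see the package-private constants):

    package org.apache.hadoop.fs;

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;

    // Emits one measurement under each value-type prefix; AccumulatingReducer
    // sums the "l:" and "f:" entries across all map tasks.
    class StatEmitter {
      static void emit(OutputCollector<Text, Text> out, long size,
                       long execTimeMs, float rateMbSec) throws IOException {
        out.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
                    new Text(String.valueOf(size)));
        out.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
                    new Text(String.valueOf(execTimeMs)));
        out.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
                    new Text(String.valueOf(rateMbSec)));
      }
    }

Keeping the type tag in the key lets a single reducer aggregate heterogeneous counters without a custom Writable.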
@@ -45,14 +41,17 @@ import org.apache.hadoop.mapred.Reporter
  * </ul>
  *
  */
+@SuppressWarnings("deprecation")
 public class AccumulatingReducer extends MapReduceBase
-    implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+    implements Reducer<Text, Text, Text, Text> {
+  static final String VALUE_TYPE_LONG = "l:";
+  static final String VALUE_TYPE_FLOAT = "f:";
+  static final String VALUE_TYPE_STRING = "s:";
   private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class);
 
   protected String hostName;
 
   public AccumulatingReducer () {
-    LOG.info("Starting AccumulatingReducer !!!");
     try {
       hostName = java.net.InetAddress.getLocalHost().getHostName();
     } catch(Exception e) {
@@ -61,9 +60,9 @@ public class AccumulatingReducer extends
     LOG.info("Starting AccumulatingReducer on " + hostName);
   }
 
-  public void reduce(UTF8 key,
-                     Iterator<UTF8> values,
-                     OutputCollector<UTF8, UTF8> output,
+  public void reduce(Text key,
+                     Iterator<Text> values,
+                     OutputCollector<Text, Text> output,
                      Reporter reporter
                      ) throws IOException {
     String field = key.toString();
@@ -71,30 +70,30 @@ public class AccumulatingReducer extends
     reporter.setStatus("starting " + field + " ::host = " + hostName);
 
     // concatenate strings
-    if (field.startsWith("s:")) {
-      String sSum = "";
+    if (field.startsWith(VALUE_TYPE_STRING)) {
+      StringBuffer sSum = new StringBuffer();
       while (values.hasNext())
-        sSum += values.next().toString() + ";";
-      output.collect(key, new UTF8(sSum));
+        sSum.append(values.next().toString()).append(";");
+      output.collect(key, new Text(sSum.toString()));
       reporter.setStatus("finished " + field + " ::host = " + hostName);
       return;
     }
     // sum long values
-    if (field.startsWith("f:")) {
+    if (field.startsWith(VALUE_TYPE_FLOAT)) {
       float fSum = 0;
       while (values.hasNext())
         fSum += Float.parseFloat(values.next().toString());
-      output.collect(key, new UTF8(String.valueOf(fSum)));
+      output.collect(key, new Text(String.valueOf(fSum)));
       reporter.setStatus("finished " + field + " ::host = " + hostName);
       return;
     }
     // sum long values
-    if (field.startsWith("l:")) {
+    if (field.startsWith(VALUE_TYPE_LONG)) {
       long lSum = 0;
       while (values.hasNext()) {
         lSum += Long.parseLong(values.next().toString());
       }
-      output.collect(key, new UTF8(String.valueOf(lSum)));
+      output.collect(key, new Text(String.valueOf(lSum)));
     }
     reporter.setStatus("finished " + field + " ::host = " + hostName);
   }

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java Fri Jun 4 02:08:30 2010
@@ -18,18 +18,26 @@ package org.apache.hadoop.fs;
 
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
 import java.util.Date;
 import java.util.StringTokenizer;
 
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
 
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; /** * Distributed i/o benchmark. @@ -60,6 +68,7 @@ import org.apache.hadoop.conf.*; */ public class DFSCIOTest extends TestCase { // Constants + private static final Log LOG = LogFactory.getLog(DFSCIOTest.class); private static final int TEST_TYPE_READ = 0; private static final int TEST_TYPE_WRITE = 1; private static final int TEST_TYPE_CLEANUP = 2; @@ -67,7 +76,6 @@ public class DFSCIOTest extends TestCase private static final String BASE_FILE_NAME = "test_io_"; private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log"; - private static final Log LOG = FileInputFormat.LOG; private static Configuration fsConfig = new Configuration(); private static final long MEGA = 0x100000; private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest"); @@ -124,9 +132,9 @@ public class DFSCIOTest extends TestCase SequenceFile.Writer writer = null; try { writer = SequenceFile.createWriter(fs, fsConfig, controlFile, - UTF8.class, LongWritable.class, + Text.class, LongWritable.class, CompressionType.NONE); - writer.append(new UTF8(name), new LongWritable(fileSize)); + writer.append(new Text(name), new LongWritable(fileSize)); } catch(Exception e) { throw new IOException(e.getLocalizedMessage()); } finally { @@ -154,26 +162,30 @@ public class DFSCIOTest extends TestCase * <li>i/o rate squared</li> * </ul> */ - private abstract static class IOStatMapper extends IOMapperBase { + private abstract static class IOStatMapper extends IOMapperBase<Long> { IOStatMapper() { - super(fsConfig); } - void collectStats(OutputCollector<UTF8, UTF8> output, + void collectStats(OutputCollector<Text, Text> output, String name, long execTime, - Object objSize) throws IOException { - long totalSize = ((Long)objSize).longValue(); + Long objSize) throws IOException { + long totalSize = objSize.longValue(); float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); LOG.info("Number of bytes processed = " + totalSize); LOG.info("Exec time = " + execTime); LOG.info("IO rate = " + ioRateMbSec); - output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1))); - output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize))); - output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime))); - output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000))); - output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), + new Text(String.valueOf(1))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), + new Text(String.valueOf(totalSize))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), + new Text(String.valueOf(execTime))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), + new Text(String.valueOf(ioRateMbSec*1000))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), + new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); } } @@ -188,7 +200,7 @@ public class DFSCIOTest extends TestCase buffer[i] = (byte)('0' + i % 50); } - public Object doIO(Reporter reporter, + public Long doIO(Reporter reporter, String name, long totalSize ) throws IOException { @@ -274,8 +286,8 @@ public class DFSCIOTest 
extends TestCase job.setReducerClass(AccumulatingReducer.class); FileOutputFormat.setOutputPath(job, outputDir); - job.setOutputKeyClass(UTF8.class); - job.setOutputValueClass(UTF8.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); job.setNumReduceTasks(1); JobClient.runJob(job); } @@ -289,7 +301,7 @@ public class DFSCIOTest extends TestCase super(); } - public Object doIO(Reporter reporter, + public Long doIO(Reporter reporter, String name, long totalSize ) throws IOException { Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=951238&r1=951237&r2=951238&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original) +++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Fri Jun 4 02:08:30 2010 @@ -18,20 +18,28 @@ package org.apache.hadoop.fs; -import java.io.*; - -import junit.framework.TestCase; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; import java.util.Date; import java.util.StringTokenizer; import java.util.TreeSet; import java.util.Vector; -import org.apache.commons.logging.*; +import junit.framework.TestCase; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.io.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; /** * Distributed checkup of the file system consistency. 
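DFSCIOTest above and DistributedFSCheck below build their map inputs the same way: a SequenceFile keyed by Text (previously UTF8) with a LongWritable payload, which is a file size for the IO benchmarks and a block offset for the file-system check. A minimal stand-alone sketch of writing one such control file (path, name and size are illustrative, not taken from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;

    public class ControlFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path controlFile = new Path("/benchmarks/demo/io_control/in_file_test_io_0");
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
            controlFile, Text.class, LongWritable.class, CompressionType.NONE);
        try {
          // One record per task: file name -> long payload. TestDFSIO now
          // stores a size in bytes here (500KB below), which is what lets it
          // express sub-megabyte files.
          writer.append(new Text("test_io_0"), new LongWritable(500 * 1024));
        } finally {
          writer.close();
        }
      }
    }

CompressionType.NONE keeps the control files trivially inspectable with "hadoop fs -text".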
@@ -45,6 +53,7 @@ import org.apache.hadoop.conf.*; */ public class DistributedFSCheck extends TestCase { // Constants + private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class); private static final int TEST_TYPE_READ = 0; private static final int TEST_TYPE_CLEANUP = 2; private static final int DEFAULT_BUFFER_SIZE = 1000000; @@ -52,7 +61,6 @@ public class DistributedFSCheck extends private static final long MEGA = 0x100000; private static Configuration fsConfig = new Configuration(); - private static final Log LOG = FileInputFormat.LOG; private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck")); private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input"); private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); @@ -92,7 +100,7 @@ public class DistributedFSCheck extends Path inputFile = new Path(MAP_INPUT_DIR, "in_file"); SequenceFile.Writer writer = SequenceFile.createWriter(fs, fsConfig, inputFile, - UTF8.class, LongWritable.class, CompressionType.NONE); + Text.class, LongWritable.class, CompressionType.NONE); try { nrFiles = 0; @@ -112,7 +120,7 @@ public class DistributedFSCheck extends long blockSize = fs.getDefaultBlockSize(); long fileLength = fs.getLength(rootFile); for(long offset = 0; offset < fileLength; offset += blockSize) - writer.append(new UTF8(rootFile.toString()), new LongWritable(offset)); + writer.append(new Text(rootFile.toString()), new LongWritable(offset)); return; } @@ -126,10 +134,9 @@ public class DistributedFSCheck extends /** * DistributedFSCheck mapper class. */ - public static class DistributedFSCheckMapper extends IOMapperBase { + public static class DistributedFSCheckMapper extends IOMapperBase<Object> { public DistributedFSCheckMapper() { - super(fsConfig); } public Object doIO(Reporter reporter, @@ -163,14 +170,17 @@ public class DistributedFSCheck extends return new Long(actualSize); } - void collectStats(OutputCollector<UTF8, UTF8> output, + void collectStats(OutputCollector<Text, Text> output, String name, long execTime, Object corruptedBlock) throws IOException { - output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "blocks"), + new Text(String.valueOf(1))); if (corruptedBlock.getClass().getName().endsWith("String")) { - output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock)); + output.collect( + new Text(AccumulatingReducer.VALUE_TYPE_STRING + "badBlocks"), + new Text((String)corruptedBlock)); return; } long totalSize = ((Long)corruptedBlock).longValue(); @@ -179,9 +189,12 @@ public class DistributedFSCheck extends LOG.info("Exec time = " + execTime); LOG.info("IO rate = " + ioRateMbSec); - output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize))); - output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime))); - output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), + new Text(String.valueOf(totalSize))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), + new Text(String.valueOf(execTime))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), + new Text(String.valueOf(ioRateMbSec*1000))); } } @@ -195,8 +208,8 @@ public class DistributedFSCheck extends job.setReducerClass(AccumulatingReducer.class); FileOutputFormat.setOutputPath(job, READ_DIR); - job.setOutputKeyClass(UTF8.class); - 
job.setOutputValueClass(UTF8.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
     job.setNumReduceTasks(1);
     JobClient.runJob(job);
   }

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java Fri Jun 4 02:08:30 2010
@@ -19,16 +19,10 @@ package org.apache.hadoop.fs;
 
 import java.io.IOException;
 import java.net.InetAddress;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
 
 /**
  * Base mapper class for IO operations.
@@ -39,16 +33,20 @@ import org.apache.hadoop.mapred.Reporter
  * statistics data to be collected by subsequent reducers.
  *
  */
-public abstract class IOMapperBase extends Configured
-    implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
+@SuppressWarnings("deprecation")
+public abstract class IOMapperBase<T> extends Configured
+    implements Mapper<Text, LongWritable, Text, Text> {
 
   protected byte[] buffer;
   protected int bufferSize;
   protected FileSystem fs;
   protected String hostName;
 
-  public IOMapperBase(Configuration conf) {
-    super(conf);
+  public IOMapperBase() {
+  }
+
+  public void configure(JobConf conf) {
+    setConf(conf);
     try {
       fs = FileSystem.get(conf);
     } catch (Exception e) {
@@ -63,10 +61,6 @@ public abstract class IOMapperBase exten
     }
   }
 
-  public void configure(JobConf job) {
-    setConf(job);
-  }
-
   public void close() throws IOException {
   }
 
@@ -80,7 +74,7 @@ public abstract class IOMapperBase exten
   * {@link #collectStats(OutputCollector,String,long,Object)}
   * @throws IOException
   */
-  abstract Object doIO(Reporter reporter,
+  abstract T doIO(Reporter reporter,
                        String name,
                        long value) throws IOException;
 
@@ -93,10 +87,10 @@ public abstract class IOMapperBase exten
   * @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
   * @throws IOException
   */
-  abstract void collectStats(OutputCollector<UTF8, UTF8> output,
+  abstract void collectStats(OutputCollector<Text, Text> output,
                              String name,
                              long execTime,
-                             Object doIOReturnValue) throws IOException;
+                             T doIOReturnValue) throws IOException;
 
   /**
   * Map file name and offset into statistical data.
@@ -111,9 +105,9 @@ public abstract class IOMapperBase exten
   * {@link #collectStats(OutputCollector,String,long,Object)}
   * is called to prepare stat data for a subsequent reducer.
*/ - public void map(UTF8 key, + public void map(Text key, LongWritable value, - OutputCollector<UTF8, UTF8> output, + OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String name = key.toString(); long longValue = value.get(); @@ -121,7 +115,7 @@ public abstract class IOMapperBase exten reporter.setStatus("starting " + name + " ::host = " + hostName); long tStart = System.currentTimeMillis(); - Object statValue = doIO(reporter, name, longValue); + T statValue = doIO(reporter, name, longValue); long tEnd = System.currentTimeMillis(); long execTime = tEnd - tStart; collectStats(output, name, execTime, statValue); Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=951238&r1=951237&r2=951238&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java (original) +++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java Fri Jun 4 02:08:30 2010 @@ -18,19 +18,31 @@ package org.apache.hadoop.fs; -import java.io.*; - -import junit.framework.TestCase; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintStream; import java.util.Date; import java.util.StringTokenizer; -import org.apache.commons.logging.*; +import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.conf.*; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; /** * Distributed i/o benchmark. 
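The IOMapperBase rewrite above does two things: FileSystem setup moves from the constructor into configure(JobConf), so subclasses no longer have to smuggle in a static Configuration through super(...), and the class becomes generic in the value handed from doIO() to collectStats(), removing the Object downcasts. A minimal concrete mapper under the new contract (FileLengthMapper is hypothetical; it must live in org.apache.hadoop.fs because the two abstract methods are package-private):

    package org.apache.hadoop.fs;

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class FileLengthMapper extends IOMapperBase<Long> {
      @Override
      Long doIO(Reporter reporter, String name, long value) throws IOException {
        // fs and hostName were initialized in IOMapperBase.configure(JobConf).
        return fs.getFileStatus(new Path(name)).getLen();
      }

      @Override
      void collectStats(OutputCollector<Text, Text> output, String name,
                        long execTime, Long length) throws IOException {
        // Typed Long parameter: no ((Long)obj).longValue() cast needed.
        output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
                       new Text(String.valueOf(length)));
      }
    }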
@@ -59,23 +71,91 @@ import org.apache.hadoop.conf.*; * <li>standard deviation of i/o rate </li> * </ul> */ -public class TestDFSIO extends TestCase { +public class TestDFSIO extends TestCase implements Tool { // Constants + private static final Log LOG = LogFactory.getLog(TestDFSIO.class); private static final int TEST_TYPE_READ = 0; private static final int TEST_TYPE_WRITE = 1; private static final int TEST_TYPE_CLEANUP = 2; + private static final int TEST_TYPE_APPEND = 3; private static final int DEFAULT_BUFFER_SIZE = 1000000; private static final String BASE_FILE_NAME = "test_io_"; private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; - - private static final Log LOG = FileInputFormat.LOG; - private static Configuration fsConfig = new Configuration(); - private static final long MEGA = 0x100000; - private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO"); - private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); - private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); - private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); - private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); + private static final long MEGA = ByteMultiple.MB.value(); + private static final String USAGE = + "Usage: " + TestDFSIO.class.getSimpleName() + + " [genericOptions]" + + " -read | -write | -append | -clean [-nrFiles N]" + + " [-fileSize Size[B|KB|MB|GB|TB]]" + + " [-resFile resultFileName] [-bufferSize Bytes]" + + " [-rootDir]"; + + private Configuration config; + + static{ + Configuration.addDefaultResource("hdfs-default.xml"); + Configuration.addDefaultResource("hdfs-site.xml"); + Configuration.addDefaultResource("mapred-default.xml"); + Configuration.addDefaultResource("mapred-site.xml"); + } + + static enum ByteMultiple { + B(1L), + KB(0x400L), + MB(0x100000L), + GB(0x40000000L), + TB(0x10000000000L); + + private long multiplier; + + private ByteMultiple(long mult) { + multiplier = mult; + } + + long value() { + return multiplier; + } + + static ByteMultiple parseString(String sMultiple) { + if(sMultiple == null || sMultiple.isEmpty()) // MB by default + return MB; + String sMU = sMultiple.toUpperCase(); + if(B.name().toUpperCase().endsWith(sMU)) + return B; + if(KB.name().toUpperCase().endsWith(sMU)) + return KB; + if(MB.name().toUpperCase().endsWith(sMU)) + return MB; + if(GB.name().toUpperCase().endsWith(sMU)) + return GB; + if(TB.name().toUpperCase().endsWith(sMU)) + return TB; + throw new IllegalArgumentException("Unsupported ByteMultiple "+sMultiple); + } + } + + public TestDFSIO() { + this.config = new Configuration(); + } + + private static String getBaseDir(Configuration conf) { + return conf.get("test.build.data","/benchmarks/TestDFSIO"); + } + private static Path getControlDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_control"); + } + private static Path getWriteDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_write"); + } + private static Path getReadDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_read"); + } + private static Path getAppendDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_append"); + } + private static Path getDataDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_data"); + } /** * Run the test with default parameters. 
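The ByteMultiple enum above backs the new -fileSize argument: parseSize() (shown further below) splits the argument at the first non-digit, maps the remaining suffix onto a multiplier via ByteMultiple.parseString(), and defaults to MB for a bare number. Sizes therefore travel through the benchmark in bytes, which is what makes sub-megabyte files, the point of MAPREDUCE-1832, expressible. A small demonstration, assuming exactly the parseSize()/ByteMultiple code in this patch (the demo class is hypothetical and shares the package to reach the package-private method):

    package org.apache.hadoop.fs;

    public class ParseSizeDemo {
      public static void main(String[] args) {
        System.out.println(TestDFSIO.parseSize("500KB")); // 500 * 0x400    = 512000
        System.out.println(TestDFSIO.parseSize("100"));   // default MB     = 104857600
        System.out.println(TestDFSIO.parseSize("1GB"));   // 1 * 0x40000000 = 1073741824
      }
    }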
@@ -83,7 +163,8 @@ public class TestDFSIO extends TestCase * @throws Exception */ public void testIOs() throws Exception { - testIOs(10, 10); + TestDFSIO bench = new TestDFSIO(); + bench.testIOs(1, 4); } /** @@ -93,35 +174,54 @@ public class TestDFSIO extends TestCase * @param nrFiles number of files * @throws IOException */ - public static void testIOs(int fileSize, int nrFiles) + public void testIOs(int fileSize, int nrFiles) throws IOException { + config.setBoolean("dfs.support.append", true); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster(config, 2, true, null); + FileSystem fs = cluster.getFileSystem(); - FileSystem fs = FileSystem.get(fsConfig); + createControlFile(fs, fileSize, nrFiles); + long tStart = System.currentTimeMillis(); + writeTest(fs); + long execTime = System.currentTimeMillis() - tStart; + analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME); - createControlFile(fs, fileSize, nrFiles); - writeTest(fs); - readTest(fs); - cleanup(fs); + tStart = System.currentTimeMillis(); + readTest(fs); + execTime = System.currentTimeMillis() - tStart; + analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME); + + tStart = System.currentTimeMillis(); + appendTest(fs); + execTime = System.currentTimeMillis() - tStart; + analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME); + + cleanup(fs); + } finally { + if(cluster != null) cluster.shutdown(); + } } - private static void createControlFile( - FileSystem fs, - int fileSize, // in MB - int nrFiles - ) throws IOException { - LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); + private void createControlFile(FileSystem fs, + long fileSize, // in bytes + int nrFiles + ) throws IOException { + LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files"); - fs.delete(CONTROL_DIR, true); + Path controlDir = getControlDir(config); + fs.delete(controlDir, true); for(int i=0; i < nrFiles; i++) { String name = getFileName(i); - Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); + Path controlFile = new Path(controlDir, "in_file_" + name); SequenceFile.Writer writer = null; try { - writer = SequenceFile.createWriter(fs, fsConfig, controlFile, - UTF8.class, LongWritable.class, + writer = SequenceFile.createWriter(fs, config, controlFile, + Text.class, LongWritable.class, CompressionType.NONE); - writer.append(new UTF8(name), new LongWritable(fileSize)); + writer.append(new Text(name), new LongWritable(fileSize)); } catch(Exception e) { throw new IOException(e.getLocalizedMessage()); } finally { @@ -149,48 +249,51 @@ public class TestDFSIO extends TestCase * <li>i/o rate squared</li> * </ul> */ - private abstract static class IOStatMapper extends IOMapperBase { + private abstract static class IOStatMapper<T> extends IOMapperBase<T> { IOStatMapper() { - super(fsConfig); } - void collectStats(OutputCollector<UTF8, UTF8> output, + void collectStats(OutputCollector<Text, Text> output, String name, long execTime, - Object objSize) throws IOException { - long totalSize = ((Long)objSize).longValue(); + Long objSize) throws IOException { + long totalSize = objSize.longValue(); float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); LOG.info("Number of bytes processed = " + totalSize); LOG.info("Exec time = " + execTime); LOG.info("IO rate = " + ioRateMbSec); - output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1))); - output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize))); - output.collect(new UTF8("l:time"), 
new UTF8(String.valueOf(execTime))); - output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000))); - output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), + new Text(String.valueOf(1))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), + new Text(String.valueOf(totalSize))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), + new Text(String.valueOf(execTime))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), + new Text(String.valueOf(ioRateMbSec*1000))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), + new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); } } /** * Write mapper class. */ - public static class WriteMapper extends IOStatMapper { + public static class WriteMapper extends IOStatMapper<Long> { public WriteMapper() { - super(); for(int i=0; i < bufferSize; i++) buffer[i] = (byte)('0' + i % 50); } - public Object doIO(Reporter reporter, + @Override + public Long doIO(Reporter reporter, String name, - long totalSize - ) throws IOException { + long totalSize // in bytes + ) throws IOException { // create file - totalSize *= MEGA; OutputStream out; - out = fs.create(new Path(DATA_DIR, name), true, bufferSize); + out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize); try { // write to the file @@ -205,57 +308,98 @@ public class TestDFSIO extends TestCase } finally { out.close(); } - return new Long(totalSize); + return Long.valueOf(totalSize); } } - private static void writeTest(FileSystem fs) - throws IOException { - - fs.delete(DATA_DIR, true); - fs.delete(WRITE_DIR, true); + private void writeTest(FileSystem fs) throws IOException { + Path writeDir = getWriteDir(config); + fs.delete(getDataDir(config), true); + fs.delete(writeDir, true); - runIOTest(WriteMapper.class, WRITE_DIR); + runIOTest(WriteMapper.class, writeDir); } - private static void runIOTest( Class<? extends Mapper> mapperClass, - Path outputDir - ) throws IOException { - JobConf job = new JobConf(fsConfig, TestDFSIO.class); + @SuppressWarnings("deprecation") + private void runIOTest( + Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, + Path outputDir) throws IOException { + JobConf job = new JobConf(config, TestDFSIO.class); - FileInputFormat.setInputPaths(job, CONTROL_DIR); + FileInputFormat.setInputPaths(job, getControlDir(config)); job.setInputFormat(SequenceFileInputFormat.class); job.setMapperClass(mapperClass); job.setReducerClass(AccumulatingReducer.class); FileOutputFormat.setOutputPath(job, outputDir); - job.setOutputKeyClass(UTF8.class); - job.setOutputValueClass(UTF8.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); job.setNumReduceTasks(1); JobClient.runJob(job); } /** + * Append mapper class. + */ + public static class AppendMapper extends IOStatMapper<Long> { + + public AppendMapper() { + for(int i=0; i < bufferSize; i++) + buffer[i] = (byte)('0' + i % 50); + } + + public Long doIO(Reporter reporter, + String name, + long totalSize // in bytes + ) throws IOException { + // create file + OutputStream out; + out = fs.append(new Path(getDataDir(getConf()), name), bufferSize); + + try { + // write to the file + long nrRemaining; + for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (bufferSize < nrRemaining) ? 
bufferSize : (int)nrRemaining; + out.write(buffer, 0, curSize); + reporter.setStatus("writing " + name + "@" + + (totalSize - nrRemaining) + "/" + totalSize + + " ::host = " + hostName); + } + } finally { + out.close(); + } + return Long.valueOf(totalSize); + } + } + + private void appendTest(FileSystem fs) throws IOException { + Path appendDir = getAppendDir(config); + fs.delete(appendDir, true); + runIOTest(AppendMapper.class, appendDir); + } + + /** * Read mapper class. */ - public static class ReadMapper extends IOStatMapper { + public static class ReadMapper extends IOStatMapper<Long> { public ReadMapper() { - super(); } - public Object doIO(Reporter reporter, + public Long doIO(Reporter reporter, String name, - long totalSize - ) throws IOException { - totalSize *= MEGA; + long totalSize // in bytes + ) throws IOException { // open file - DataInputStream in = fs.open(new Path(DATA_DIR, name)); + DataInputStream in = fs.open(new Path(getDataDir(getConf()), name)); + long actualSize = 0; try { - long actualSize = 0; - for(int curSize = bufferSize; curSize == bufferSize;) { + for(int curSize = bufferSize; + curSize == bufferSize && actualSize < totalSize;) { curSize = in.read(buffer, 0, bufferSize); + if(curSize < 0) break; actualSize += curSize; reporter.setStatus("reading " + name + "@" + actualSize + "/" + totalSize @@ -264,55 +408,73 @@ public class TestDFSIO extends TestCase } finally { in.close(); } - return new Long(totalSize); + return Long.valueOf(actualSize); } } - private static void readTest(FileSystem fs) throws IOException { - fs.delete(READ_DIR, true); - runIOTest(ReadMapper.class, READ_DIR); + private void readTest(FileSystem fs) throws IOException { + Path readDir = getReadDir(config); + fs.delete(readDir, true); + runIOTest(ReadMapper.class, readDir); } - private static void sequentialTest( - FileSystem fs, - int testType, - int fileSize, - int nrFiles - ) throws Exception { - IOStatMapper ioer = null; + private void sequentialTest(FileSystem fs, + int testType, + long fileSize, // in bytes + int nrFiles + ) throws IOException { + IOStatMapper<Long> ioer = null; if (testType == TEST_TYPE_READ) ioer = new ReadMapper(); else if (testType == TEST_TYPE_WRITE) ioer = new WriteMapper(); + else if (testType == TEST_TYPE_APPEND) + ioer = new AppendMapper(); else return; for(int i=0; i < nrFiles; i++) ioer.doIO(Reporter.NULL, BASE_FILE_NAME+Integer.toString(i), - MEGA*fileSize); + fileSize); } public static void main(String[] args) { + TestDFSIO bench = new TestDFSIO(); + int res = -1; + try { + res = ToolRunner.run(bench, args); + } catch(Exception e) { + System.err.print(StringUtils.stringifyException(e)); + res = -2; + } + if(res == -1) + System.err.print(USAGE); + System.exit(res); + } + + @Override // Tool + public int run(String[] args) throws IOException { int testType = TEST_TYPE_READ; int bufferSize = DEFAULT_BUFFER_SIZE; - int fileSize = 1; + long fileSize = 1*MEGA; int nrFiles = 1; String resFileName = DEFAULT_RES_FILE_NAME; boolean isSequential = false; + String version = TestDFSIO.class.getSimpleName() + ".0.0.6"; - String version="TestFDSIO.0.0.4"; - String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] "; - - System.out.println(version); + LOG.info(version); if (args.length == 0) { - System.err.println(usage); - System.exit(-1); + System.err.println("Missing arguments."); + return -1; } + for (int i = 0; i < args.length; i++) { // parse command line if (args[i].startsWith("-read")) { 
testType = TEST_TYPE_READ; } else if (args[i].equals("-write")) { testType = TEST_TYPE_WRITE; + } else if (args[i].equals("-append")) { + testType = TEST_TYPE_APPEND; } else if (args[i].equals("-clean")) { testType = TEST_TYPE_CLEANUP; } else if (args[i].startsWith("-seq")) { @@ -320,83 +482,120 @@ public class TestDFSIO extends TestCase } else if (args[i].equals("-nrFiles")) { nrFiles = Integer.parseInt(args[++i]); } else if (args[i].equals("-fileSize")) { - fileSize = Integer.parseInt(args[++i]); + fileSize = parseSize(args[++i]); } else if (args[i].equals("-bufferSize")) { bufferSize = Integer.parseInt(args[++i]); } else if (args[i].equals("-resFile")) { resFileName = args[++i]; + } else { + System.err.println("Illegal argument: " + args[i]); + return -1; } } LOG.info("nrFiles = " + nrFiles); - LOG.info("fileSize (MB) = " + fileSize); + LOG.info("fileSize (MB) = " + toMB(fileSize)); LOG.info("bufferSize = " + bufferSize); - - try { - fsConfig.setInt("test.io.file.buffer.size", bufferSize); - FileSystem fs = FileSystem.get(fsConfig); + LOG.info("baseDir = " + getBaseDir(config)); - if (isSequential) { - long tStart = System.currentTimeMillis(); - sequentialTest(fs, testType, fileSize, nrFiles); - long execTime = System.currentTimeMillis() - tStart; - String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; - LOG.info(resultLine); - return; - } - if (testType == TEST_TYPE_CLEANUP) { - cleanup(fs); - return; - } - createControlFile(fs, fileSize, nrFiles); + config.setInt("test.io.file.buffer.size", bufferSize); + config.setBoolean("dfs.support.append", true); + FileSystem fs = FileSystem.get(config); + + if (isSequential) { long tStart = System.currentTimeMillis(); - if (testType == TEST_TYPE_WRITE) - writeTest(fs); - if (testType == TEST_TYPE_READ) - readTest(fs); + sequentialTest(fs, testType, fileSize, nrFiles); long execTime = System.currentTimeMillis() - tStart; - - analyzeResult(fs, testType, execTime, resFileName); - } catch(Exception e) { - System.err.print(StringUtils.stringifyException(e)); - System.exit(-1); + String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; + LOG.info(resultLine); + return 0; + } + if (testType == TEST_TYPE_CLEANUP) { + cleanup(fs); + return 0; } - } + createControlFile(fs, fileSize, nrFiles); + long tStart = System.currentTimeMillis(); + if (testType == TEST_TYPE_WRITE) + writeTest(fs); + if (testType == TEST_TYPE_READ) + readTest(fs); + if (testType == TEST_TYPE_APPEND) + appendTest(fs); + long execTime = System.currentTimeMillis() - tStart; - private static void analyzeResult( FileSystem fs, - int testType, - long execTime, - String resFileName - ) throws IOException { + analyzeResult(fs, testType, execTime, resFileName); + return 0; + } + + @Override // Configurable + public Configuration getConf() { + return this.config; + } + + @Override // Configurable + public void setConf(Configuration conf) { + this.config = conf; + } + + /** + * Returns size in bytes. 
+ * + * @param arg = {d}[B|KB|MB|GB|TB] + * @return + */ + static long parseSize(String arg) { + String[] args = arg.split("\\D", 2); // get digits + assert args.length <= 2; + long fileSize = Long.parseLong(args[0]); + String bytesMult = arg.substring(args[0].length()); // get byte multiple + return fileSize * ByteMultiple.parseString(bytesMult).value(); + } + + static float toMB(long bytes) { + return ((float)bytes)/MEGA; + } + + private void analyzeResult( FileSystem fs, + int testType, + long execTime, + String resFileName + ) throws IOException { Path reduceFile; if (testType == TEST_TYPE_WRITE) - reduceFile = new Path(WRITE_DIR, "part-00000"); - else - reduceFile = new Path(READ_DIR, "part-00000"); - DataInputStream in; - in = new DataInputStream(fs.open(reduceFile)); - - BufferedReader lines; - lines = new BufferedReader(new InputStreamReader(in)); + reduceFile = new Path(getWriteDir(config), "part-00000"); + else if (testType == TEST_TYPE_APPEND) + reduceFile = new Path(getAppendDir(config), "part-00000"); + else // if (testType == TEST_TYPE_READ) + reduceFile = new Path(getReadDir(config), "part-00000"); long tasks = 0; long size = 0; long time = 0; float rate = 0; float sqrate = 0; - String line; - while((line = lines.readLine()) != null) { - StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); - String attr = tokens.nextToken(); - if (attr.endsWith(":tasks")) - tasks = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":size")) - size = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":time")) - time = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":rate")) - rate = Float.parseFloat(tokens.nextToken()); - else if (attr.endsWith(":sqrate")) - sqrate = Float.parseFloat(tokens.nextToken()); + DataInputStream in = null; + BufferedReader lines = null; + try { + in = new DataInputStream(fs.open(reduceFile)); + lines = new BufferedReader(new InputStreamReader(in)); + String line; + while((line = lines.readLine()) != null) { + StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); + String attr = tokens.nextToken(); + if (attr.endsWith(":tasks")) + tasks = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":size")) + size = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":time")) + time = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":rate")) + rate = Float.parseFloat(tokens.nextToken()); + else if (attr.endsWith(":sqrate")) + sqrate = Float.parseFloat(tokens.nextToken()); + } + } finally { + if(in != null) in.close(); + if(lines != null) lines.close(); } double med = rate / 1000 / tasks; @@ -404,27 +603,32 @@ public class TestDFSIO extends TestCase String resultLines[] = { "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : (testType == TEST_TYPE_READ) ? "read" : + (testType == TEST_TYPE_APPEND) ? 
"append" : "unknown"), " Date & time: " + new Date(System.currentTimeMillis()), " Number of files: " + tasks, - "Total MBytes processed: " + size/MEGA, + "Total MBytes processed: " + toMB(size), " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), "Average IO rate mb/sec: " + med, " IO rate std deviation: " + stdDev, " Test exec time sec: " + (float)execTime / 1000, "" }; - PrintStream res = new PrintStream( - new FileOutputStream( - new File(resFileName), true)); - for(int i = 0; i < resultLines.length; i++) { - LOG.info(resultLines[i]); - res.println(resultLines[i]); + PrintStream res = null; + try { + res = new PrintStream(new FileOutputStream(new File(resFileName), true)); + for(int i = 0; i < resultLines.length; i++) { + LOG.info(resultLines[i]); + res.println(resultLines[i]); + } + } finally { + if(res != null) res.close(); } } - private static void cleanup(FileSystem fs) throws IOException { + private void cleanup(FileSystem fs) + throws IOException { LOG.info("Cleaning up test files"); - fs.delete(new Path(TEST_ROOT_DIR), true); + fs.delete(new Path(getBaseDir(config)), true); } }