I changed the following line in my test code, and now it works: FileInputFormat.addInputPath(job, new Path(args[0])); =>
MyFileInputFormat.addInputPath(job, new Path(args[0]));

Presumably the stock FileInputFormat in this Hadoop version stores input paths under a different configuration key than the "mapreduce.input.fileinputformat.inputdir" key used by the copied MyFileInputFormat, so MyFileInputFormat.getInputPaths() never saw the path added by FileInputFormat.addInputPath() and listStatus() failed with "No input paths specified in job".

On Tue, Dec 24, 2013 at 9:27 AM, ch huang <[email protected]> wrote: > hi,maillist: > i try to understand how the FileInputFormat work,and i do the > following things,but it seems not work,and i do not know why,hope anyone > can shed light on it > > i change the wordcount code add one line on run function , and > MyTextInputFormat just a copy of TextInputFormat > > job.setInputFormatClass(MyTextInputFormat.class); > > and i also modify the MyTextInputFormat.java , and MyFileInputFormat just > another copy from FileInputFormat,so i can modify it freely and observe > > public class MyTextInputFormat extends FileInputFormat<LongWritable, > Text> => public class MyTextInputFormat extends > MyFileInputFormat<LongWritable, Text> > > finally i compile and run the program ,but get error like this, i do not > know why > > # hadoop yarndemo/BmAlphaToNum /alex/messages /alex/output8 > 13/12/24 09:10:15 WARN conf.Configuration: session.id is deprecated. > Instead, use dfs.metrics.session-id > 13/12/24 09:10:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with > processName=JobTracker, sessionId= > 13/12/24 09:10:15 WARN mapred.JobClient: Use GenericOptionsParser for > parsing the arguments. Applications should implement Tool for the same. > 13/12/24 09:10:15 WARN mapred.JobClient: No job jar file set. User > classes may not be found. See JobConf(Class) or JobConf#setJar(String). > 13/12/24 09:10:15 INFO yarndemo.MyFileInputFormat: in getsplits > 13/12/24 09:10:15 INFO yarndemo.MyFileInputFormat: jumpppppppppp into > liststatus!! 
> 13/12/24 09:10:15 INFO mapred.JobClient: Cleaning up the staging area > file:/data/temp/mapred/staging/root866865048/.staging/job_local866865048_0001 > 13/12/24 09:10:15 ERROR security.UserGroupInformation: > PriviledgedActionException as:root (auth:SIMPLE) cause:java.io.IOException: > No input paths specified in job > Exception in thread "main" java.io.IOException: No input paths specified > in job > at > yarndemo.MyFileInputFormat.listStatus(MyFileInputFormat.java:214) > at yarndemo.MyFileInputFormat.getSplits(MyFileInputFormat.java:284) > at > org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1063) > at > org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1080) > at > org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174) > at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:992) > at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:945) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:415) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) > at > org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:945) > at org.apache.hadoop.mapreduce.Job.submit(Job.java:566) > at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:596) > at yarndemo.BmAlphaToNum.run(BmAlphaToNum.java:74) > at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) > at yarndemo.BmAlphaToNum.main(BmAlphaToNum.java:78) > > here is my java file content > > package yarndemo; > import java.io.IOException; > import java.util.StringTokenizer; > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.conf.Configured; > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.io.IntWritable; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.mapreduce.Job; > import org.apache.hadoop.mapreduce.Mapper; > import org.apache.hadoop.mapreduce.Reducer; > import 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; > import org.apache.hadoop.util.Tool; > import org.apache.hadoop.util.ToolRunner; > public class BmAlphaToNum extends Configured implements Tool { > public static class TokenizerMapper extends > Mapper<Object, Text, Text, IntWritable> { > private final static IntWritable one = new IntWritable(1); > private Text word = new Text(); > public void map(Object key, Text value, Context context) > throws IOException, InterruptedException { > StringTokenizer itr = new > StringTokenizer(value.toString()); > while (itr.hasMoreTokens()) { > word.set(itr.nextToken()); > context.write(word, one); > } > } > } > public static class IntSumReducer extends > Reducer<Text, IntWritable, Text, IntWritable> { > private IntWritable result = new IntWritable(); > public void reduce(Text key, Iterable<IntWritable> values, > Context context) throws IOException, > InterruptedException { > int sum = 0; > for (IntWritable val : values) { > sum += val.get(); > } > result.set(sum); > context.write(key, result); > } > } > private static void usage() throws IOException { > System.err.println("teragen <num rows> <output dir>"); > } > public int run(String[] args) throws IOException, > InterruptedException, > ClassNotFoundException { > Job job = Job.getInstance(getConf()); > if (args.length != 2) { > usage(); > return 2; > } > job.setJobName("wordcount"); > job.setJarByClass(BmAlphaToNum.class); > job.setMapperClass(TokenizerMapper.class); > job.setCombinerClass(IntSumReducer.class); > job.setReducerClass(IntSumReducer.class); > job.setOutputKeyClass(Text.class); > job.setOutputValueClass(IntWritable.class); > job.setInputFormatClass(MyTextInputFormat.class); > FileInputFormat.addInputPath(job, new Path(args[0])); > FileOutputFormat.setOutputPath(job, new Path(args[1])); > return job.waitForCompletion(true) ? 
0 : 1; > } > public static void main(String[] args) throws Exception { > int res = ToolRunner.run(new Configuration(), new > BmAlphaToNum(), args); > System.exit(res); > } > } > > package yarndemo; > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import org.apache.flume.annotations.InterfaceAudience; > import org.apache.flume.annotations.InterfaceStability; > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.io.LongWritable; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.io.compress.CompressionCodec; > import org.apache.hadoop.io.compress.CompressionCodecFactory; > import org.apache.hadoop.io.compress.SplittableCompressionCodec; > import org.apache.hadoop.mapreduce.InputFormat; > import org.apache.hadoop.mapreduce.InputSplit; > import org.apache.hadoop.mapreduce.JobContext; > import org.apache.hadoop.mapreduce.RecordReader; > import org.apache.hadoop.mapreduce.TaskAttemptContext; > import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; > import org.python.google.common.base.Charsets; > /** > * An {@link InputFormat} for plain text files. Files are broken into > lines. > * Either linefeed or carriage-return are used to signal end of line. Keys > are > * the position in the file, and values are the line of text.. 
> */ > @InterfaceAudience.Public > @InterfaceStability.Stable > public class MyTextInputFormat extends MyFileInputFormat<LongWritable, > Text> { > private static final Log LOG = > LogFactory.getLog(MyTextInputFormat.class); > @Override > public RecordReader<LongWritable, Text> createRecordReader( > InputSplit split, TaskAttemptContext context) { > String delimiter = context.getConfiguration().get( > "textinputformat.record.delimiter"); > byte[] recordDelimiterBytes = null; > if (null != delimiter) > recordDelimiterBytes = > delimiter.getBytes(Charsets.UTF_8); > return new LineRecordReader(recordDelimiterBytes); > } > @Override > protected boolean isSplitable(JobContext context, Path file) { > final CompressionCodec codec = new CompressionCodecFactory( > context.getConfiguration()).getCodec(file); > if (null == codec) { > return true; > } > LOG.info("i split readddddddddd record"); > return codec instanceof SplittableCompressionCodec; > } > } > package yarndemo; > import java.io.IOException; > import java.util.ArrayList; > import java.util.List; > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import org.apache.flume.annotations.InterfaceAudience; > import org.apache.flume.annotations.InterfaceStability; > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.fs.BlockLocation; > import org.apache.hadoop.fs.FileStatus; > import org.apache.hadoop.fs.FileSystem; > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.fs.PathFilter; > import org.apache.hadoop.mapreduce.InputFormat; > import org.apache.hadoop.mapreduce.InputSplit; > import org.apache.hadoop.mapreduce.Job; > import org.apache.hadoop.mapreduce.JobContext; > import org.apache.hadoop.mapreduce.Mapper; > import org.apache.hadoop.mapreduce.lib.input.FileSplit; > import org.apache.hadoop.mapreduce.lib.input.InvalidInputException; > import org.apache.hadoop.mapreduce.security.TokenCache; > import org.apache.hadoop.util.ReflectionUtils; > import 
org.apache.hadoop.util.StringUtils; > /** > * A base class for file-based {@link InputFormat}s. > * > * <p> > * <code>FileInputFormat</code> is the base class for all file-based > * <code>InputFormat</code>s. This provides a generic implementation of > * {@link #getSplits(JobContext)}. Subclasses of > <code>FileInputFormat</code> > * can also override the {@link #isSplitable(JobContext, Path)} method to > ensure > * input-files are not split-up and are processed as a whole by > {@linkMapper}s. > */ > @InterfaceAudience.Public > @InterfaceStability.Stable > public abstract class MyFileInputFormat<K, V> extends InputFormat<K, V> { > public static final String INPUT_DIR = > "mapreduce.input.fileinputformat.inputdir"; > public static final String SPLIT_MAXSIZE = > "mapreduce.input.fileinputformat.split.maxsize"; > public static final String SPLIT_MINSIZE = > "mapreduce.input.fileinputformat.split.minsize"; > public static final String PATHFILTER_CLASS = > "mapreduce.input.pathFilter.class"; > public static final String NUM_INPUT_FILES = > "mapreduce.input.fileinputformat.numinputfiles"; > private static final Log LOG = > LogFactory.getLog(MyFileInputFormat.class); > private static final double SPLIT_SLOP = 1.1; // 10% slop > private static final PathFilter hiddenFileFilter = new PathFilter(){ > public boolean accept(Path p){ > String name = p.getName(); > return !name.startsWith("_") && !name.startsWith("."); > } > }; > /** > * Proxy PathFilter that accepts a path only if all filters given in the > * constructor do. Used by the listPaths() to apply the built-in > * hiddenFileFilter together with a user provided one (if any). 
> */ > private static class MultiPathFilter implements PathFilter { > private List<PathFilter> filters; > public MultiPathFilter(List<PathFilter> filters) { > this.filters = filters; > } > public boolean accept(Path path) { > for (PathFilter filter : filters) { > if (!filter.accept(path)) { > return false; > } > } > return true; > } > } > /** > * Get the lower bound on split size imposed by the format. > * > * @return the number of bytes of the minimal split for this format > */ > protected long getFormatMinSplitSize() { > return 1; > } > /** > * Is the given filename splitable? Usually, true, but if the file > is stream > * compressed, it will not be. > * > * <code>FileInputFormat</code> implementations can override this > and return > * <code>false</code> to ensure that individual input files are > never > * split-up so that {@link Mapper}s process entire files. > * > * @param context > * the job context > * @param filename > * the file name to check > * @return is this file splitable? > */ > protected boolean isSplitable(JobContext context, Path filename) { > return true; > } > /** > * Set a PathFilter to be applied to the input paths for the > map-reduce job. > * > * @param job > * the job to modify > * @param filter > * the PathFilter class use for filtering the input > paths. > */ > public static void setInputPathFilter(Job job, > Class<? 
extends PathFilter> > filter) { > job.getConfiguration().setClass(PATHFILTER_CLASS, filter, > PathFilter.class); > } > /** > * Set the minimum input split size > * > * @param job > * the job to modify > * @param size > * the minimum size > */ > public static void setMinInputSplitSize(Job job, long size) { > job.getConfiguration().setLong(SPLIT_MINSIZE, size); > } > /** > * Get the minimum split size > * > * @param job > * the job > * @return the minimum number of bytes that can be in a split > */ > public static long getMinSplitSize(JobContext job) { > return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); > } > /** > * Set the maximum split size > * > * @param job > * the job to modify > * @param size > * the maximum split size > */ > public static void setMaxInputSplitSize(Job job, long size) { > job.getConfiguration().setLong(SPLIT_MAXSIZE, size); > } > /** > * Get the maximum split size. > * > * @param context > * the job to look at. > * @return the maximum number of bytes a split can include > */ > public static long getMaxSplitSize(JobContext context) { > return context.getConfiguration() > .getLong(SPLIT_MAXSIZE, Long.MAX_VALUE); > } > /** > * Get a PathFilter instance of the filter set for the input paths. > * > * @return the PathFilter instance set for the job, NULL if none has > been set. > */ > public static PathFilter getInputPathFilter(JobContext context) { > Configuration conf = context.getConfiguration(); > Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, > null, > PathFilter.class); > return (filterClass != null) ? > (PathFilter) ReflectionUtils > .newInstance(filterClass, conf) : null; > } > /** List input directories. > * Subclasses may override to, e.g., select only files matching a regular > * expression. > * > * @param job the job to list input paths for > * @return array of FileStatus objects > * @throws IOException if zero items. 
> */ > protected List<FileStatus> listStatus(JobContext job) throws > IOException { > List<FileStatus> result = new ArrayList<FileStatus>(); > Path[] dirs = getInputPaths(job); > LOG.info("jumpppppppppp into liststatus!!"); > if (dirs.length == 0) { > throw new IOException("No input paths specified in job"); > } > // get tokens for all the required FileSystems.. > TokenCache.obtainTokensForNamenodes(job.getCredentials(), > dirs, > job.getConfiguration()); > List<IOException> errors = new ArrayList<IOException>(); > // creates a MultiPathFilter with the hiddenFileFilter and the > // user provided one (if any). > List<PathFilter> filters = new ArrayList<PathFilter>(); > filters.add(hiddenFileFilter); > PathFilter jobFilter = getInputPathFilter(job); > if (jobFilter != null) { > filters.add(jobFilter); > } > PathFilter inputFilter = new MultiPathFilter(filters); > for (int i = 0; i < dirs.length; ++i) { > Path p = dirs[i]; > FileSystem fs = > p.getFileSystem(job.getConfiguration()); > FileStatus[] matches = fs.globStatus(p, inputFilter); > if (matches == null) { > errors.add(new IOException("Input path does not exist: " + p)); > } else if (matches.length == 0) { > errors.add(new IOException("Input Pattern " + p + " matches 0 > files")); > } else { > for (FileStatus globStat: matches) { > if (globStat.isDirectory()) { > for(FileStatus stat: fs.listStatus(globStat.getPath(), > inputFilter)) { > result.add(stat); > } > } else { > result.add(globStat); > } > } > } > } > if (!errors.isEmpty()) { > throw new InvalidInputException(errors); > } > LOG.info("tttttTotal input paths to process : " + result.size()); > return result; > } > /** > * A factory that makes the split for this class. It can be overridden > * by sub-classes to make sub-types > */ > protected FileSplit makeSplit(Path file, long start, long length, > String[] hosts) { > return new FileSplit(file, start, length, hosts); > } > /** > * Generate the list of files and make them into FileSplits. 
> * > * @param job > * the job context > * @throws IOException > */ > public List<InputSplit> getSplits(JobContext job) throws > IOException { > long minSize = Math.max(getFormatMinSplitSize(), > getMinSplitSize(job)); > long maxSize = getMaxSplitSize(job); > LOG.info("in getsplits"); > // generate splits > List<InputSplit> splits = new ArrayList<InputSplit>(); > List<FileStatus> files = listStatus(job); > for (FileStatus file: files) { > Path path = file.getPath(); > long length = file.getLen(); > if (length != 0) { > FileSystem fs = > path.getFileSystem(job.getConfiguration()); > BlockLocation[] blkLocations = > fs.getFileBlockLocations(file, > 0, length); > if (isSplitable(job, path)) { > long blockSize = > file.getBlockSize(); > long splitSize = > computeSplitSize(blockSize, minSize, > maxSize); > long bytesRemaining = length; > while (((double) bytesRemaining) / > splitSize > SPLIT_SLOP) { > int blkIndex = > getBlockIndex(blkLocations, length > - > bytesRemaining); > splits.add(makeSplit(path, > length - bytesRemaining, > splitSize, > blkLocations[blkIndex].getHosts())); > bytesRemaining -= > splitSize; > } > if (bytesRemaining != 0) { > int blkIndex = > getBlockIndex(blkLocations, length > - > bytesRemaining); > splits.add(makeSplit(path, > length - bytesRemaining, > > bytesRemaining, > > blkLocations[blkIndex].getHosts())); > } > } else { // not splitable > splits.add(makeSplit(path, 0, > length, > > blkLocations[0].getHosts())); > } > } else { > //Create empty hosts array for zero length files > splits.add(makeSplit(path, 0, length, new String[0])); > } > } > // Save the number of input files for metrics/loadgen > job.getConfiguration().setLong(NUM_INPUT_FILES, > files.size()); > LOG.debug("Total # of splits: " + splits.size()); > return splits; > } > protected long computeSplitSize(long blockSize, long minSize, long > maxSize) { > return Math.max(minSize, Math.min(maxSize, blockSize)); > } > protected int getBlockIndex(BlockLocation[] blkLocations, > long 
offset) { > for (int i = 0 ; i < blkLocations.length; i++) { > // is the offset inside this block? > if ((blkLocations[i].getOffset() <= offset) && > (offset < blkLocations[i].getOffset() + > blkLocations[i].getLength())){ > return i; > } > } > BlockLocation last = blkLocations[blkLocations.length -1]; > long fileLength = last.getOffset() + last.getLength() -1; > throw new IllegalArgumentException("Offset " + offset + > " is outside of file (0.." + > fileLength + ")"); > } > /** > * Sets the given comma separated paths as the list of inputs for > the > * map-reduce job. > * > * @param job > * the job > * @param commaSeparatedPaths > * Comma separated paths to be set as the list of > inputs for the > * map-reduce job. > */ > public static void setInputPaths(Job job, String > commaSeparatedPaths) > throws IOException { > setInputPaths(job, > StringUtils.stringToPath( > getPathStrings(commaSeparatedPaths))); > } > /** > * Add the given comma separated paths to the list of inputs for > the > * map-reduce job. > * > * @param job > * The job to modify > * @param commaSeparatedPaths > * Comma separated paths to be added to the list of > inputs for > * the map-reduce job. > */ > public static void addInputPaths(Job job, String > commaSeparatedPaths) > throws IOException { > for (String str : getPathStrings(commaSeparatedPaths)) { > addInputPath(job, new Path(str)); > } > } > /** > * Set the array of {@link Path}s as the list of inputs for the > map-reduce > * job. > * > * @param job > * The job to modify > * @param inputPaths > * the {@link Path}s of the input directories/files > for the > * map-reduce job. > */ > public static void setInputPaths(Job job, Path... 
inputPaths) > throws IOException { > Configuration conf = job.getConfiguration(); > Path path = > inputPaths[0].getFileSystem(conf).makeQualified( > inputPaths[0]); > StringBuffer str = new > StringBuffer(StringUtils.escapeString(path.toString())); > for(int i = 1; i < inputPaths.length;i++) { > str.append(StringUtils.COMMA_STR); > path = > inputPaths[i].getFileSystem(conf).makeQualified( > inputPaths[i]); > str.append(StringUtils.escapeString(path.toString())); > } > conf.set(INPUT_DIR, str.toString()); > } > /** > * Add a {@link Path} to the list of inputs for the map-reduce > job. > * > * @param job > * The {@link Job} to modify > * @param path > * {@link Path} to be added to the list of inputs for > the > * map-reduce job. > */ > public static void addInputPath(Job job, Path path) throws > IOException { > Configuration conf = job.getConfiguration(); > path = path.getFileSystem(conf).makeQualified(path); > String dirStr = StringUtils.escapeString(path.toString()); > String dirs = conf.get(INPUT_DIR); > conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + > dirStr); > } > // This method escapes commas in the glob pattern of the given paths. 
> private static String[] getPathStrings(String commaSeparatedPaths) { > int length = commaSeparatedPaths.length(); > int curlyOpen = 0; > int pathStart = 0; > boolean globPattern = false; > List<String> pathStrings = new ArrayList<String>(); > for (int i=0; i<length; i++) { > char ch = commaSeparatedPaths.charAt(i); > switch(ch) { > case '{' : { > curlyOpen++; > if (!globPattern) { > globPattern = true; > } > break; > } > case '}' : { > curlyOpen--; > if (curlyOpen == 0 && globPattern) { > globPattern = false; > } > break; > } > case ',' : { > if (!globPattern) { > pathStrings.add(commaSeparatedPaths.substring(pathStart, i)); > pathStart = i + 1 ; > } > break; > } > default: > continue; // nothing special to do for this character > } > } > pathStrings.add(commaSeparatedPaths.substring(pathStart, length)); > return pathStrings.toArray(new String[0]); > } > /** > * Get the list of input {@link Path}s for the map-reduce job. > * > * @param context > * The job > * @return the list of input {@link Path}s for the map-reduce job. > */ > public static Path[] getInputPaths(JobContext context) { > String dirs = context.getConfiguration().get(INPUT_DIR, > ""); > String [] list = StringUtils.split(dirs); > Path[] result = new Path[list.length]; > for (int i = 0; i < list.length; i++) { > result[i] = new Path(StringUtils.unEscapeString(list[i])); > } > return result; > } > } > >
