You can find an example/inspiration in the *initialize(InputSplit genericSplit, TaskAttemptContext context)* method of the *org.apache.hadoop.mapreduce.lib.input.LineRecordReader* class.
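
To make the boundary rule concrete, here is a minimal plain-Java sketch of the idea as I understand it from LineRecordReader. It deliberately uses no Hadoop classes: the class SplitBoundaryDemo and the methods nextLineStart and readSplit are hypothetical names made up for this illustration, and it assumes '\n' is the record delimiter. A split that does not start at byte 0 throws away everything up to and including the first newline, and every split keeps reading while a line starts at or before its nominal end, so the line that straddles a boundary is read exactly once by the previous split.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: simulates split handling on an in-memory byte array.
class SplitBoundaryDemo {

    // Index just past the first '\n' at or after 'from' (or data.length if none).
    static int nextLineStart(byte[] data, int from) {
        int i = from;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return Math.min(i + 1, data.length);
    }

    // Returns the records that the split [start, start + length) is responsible for.
    static List<String> readSplit(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // Not the first split: the first (possibly partial) line belongs to
            // the previous split, so discard it and adjust the real start.
            pos = nextLineStart(data, start);
        }
        List<String> records = new ArrayList<>();
        // Note "<=": a line that starts at or before 'end' is still ours,
        // even if it finishes beyond the nominal end of the split.
        while (pos <= end && pos < data.length) {
            int next = nextLineStart(data, pos);
            int lineEnd = (data[next - 1] == '\n') ? next - 1 : next;
            records.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "a,1\nb,2\nc,3\n".getBytes(StandardCharsets.UTF_8);
        int splitSize = 5; // arbitrary size that cuts through the middle of a line
        for (int start = 0; start < data.length; start += splitSize) {
            int length = Math.min(splitSize, data.length - start);
            System.out.println("split@" + start + " -> " + readSplit(data, start, length));
        }
    }
}
```

With a split size of 5 the second split starts in the middle of "b,2"; that line is read by the first split and skipped by the second, so no record is lost or duplicated. A reader that trusts split.getStart() and stops strictly at start + length does not have this property.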

2013/11/23 Azuryy Yu <[email protected]>

> There is a problem in 'initialize'. In general, we cannot treat
> split.start as the real start, because FileSplit cannot split exactly at
> the end of a line, so you need to adjust the start in 'initialize'
> to the start of a line if start is not equal to '0'.
>
> Also, end = start + split.length is not the real end, because it may
> not fall on the end of a line.
>
> So the reader MUST adjust the real start and end in 'initialize';
> otherwise, it may miss some records.
>
> > Sure,
> >
> > our FileInputFormat implementation:
> >
> > public class CVSInputFormat extends
> >         FileInputFormat<FileValidatorDescriptor, Text> {
> >
> >     /*
> >      * (non-Javadoc)
> >      *
> >      * @see
> >      * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
> >      * .hadoop.mapreduce.InputSplit,
> >      * org.apache.hadoop.mapreduce.TaskAttemptContext)
> >      */
> >     @Override
> >     public RecordReader<FileValidatorDescriptor, Text> createRecordReader(
> >             InputSplit split, TaskAttemptContext context) {
> >         String delimiter = context.getConfiguration().get(
> >                 "textinputformat.record.delimiter");
> >         byte[] recordDelimiterBytes = null;
> >         if (null != delimiter)
> >             recordDelimiterBytes = delimiter.getBytes();
> >         return new CVSLineRecordReader(recordDelimiterBytes);
> >     }
> >
> >     /*
> >      * (non-Javadoc)
> >      *
> >      * @see
> >      * org.apache.hadoop.mapreduce.lib.input.FileInputFormat#isSplitable(org
> >      * .apache.hadoop.mapreduce.JobContext, org.apache.hadoop.fs.Path)
> >      */
> >     @Override
> >     protected boolean isSplitable(JobContext context, Path file) {
> >         CompressionCodec codec = new CompressionCodecFactory(
> >                 context.getConfiguration()).getCodec(file);
> >         return codec == null;
> >     }
> > }
> >
> > the recordReader:
> >
> > public class CVSLineRecordReader extends
> >         RecordReader<FileValidatorDescriptor, Text> {
> >     private static final Log LOG = LogFactory.getLog(CVSLineRecordReader.class);
> >
> >     public static final String CVS_FIRST_LINE = "file.first.line";
> >
> >     private long start;
> >     private long pos;
> >     private long end;
> >     private LineReader in;
> >     private int maxLineLength;
> >     private FileValidatorDescriptor key = null;
> >     private Text value = null;
> >     private Text data = null;
> >     private byte[] recordDelimiterBytes;
> >
> >     public CVSLineRecordReader(byte[] recordDelimiter) {
> >         this.recordDelimiterBytes = recordDelimiter;
> >     }
> >
> >     @Override
> >     public void initialize(InputSplit genericSplit, TaskAttemptContext context)
> >             throws IOException {
> >         Properties properties = new Properties();
> >         Configuration configuration = context.getConfiguration();
> >
> >         Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
> >                 .getConfiguration());
> >         for (Path cacheFile : cacheFiles) {
> >             if (cacheFile.toString().endsWith(
> >                     context.getConfiguration().get(VALIDATOR_CONF_PATH))) {
> >                 properties.load(new FileReader(cacheFile.toString()));
> >             }
> >         }
> >
> >         FileSplit split = (FileSplit) genericSplit;
> >         Configuration job = context.getConfiguration();
> >         this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
> >                 Integer.MAX_VALUE);
> >         start = split.getStart();
> >         end = start + split.getLength();
> >         pos = start;
> >         final Path file = split.getPath();
> >
> >         // open the file and seek to the start of the split
> >         FileSystem fs = file.getFileSystem(job);
> >         FSDataInputStream fileIn = fs.open(split.getPath());
> >
> >         this.in = generateReader(fileIn, job);
> >
> >         // if CVS_FIRST_LINE does not exist in conf then the csv file first line
> >         // is the header
> >         if (properties.containsKey(CVS_FIRST_LINE)) {
> >             configuration.set(CVS_FIRST_LINE, properties.get(CVS_FIRST_LINE)
> >                     .toString());
> >         } else {
> >             readData();
> >             configuration.set(CVS_FIRST_LINE, data.toString());
> >             if (start != 0) {
> >                 fileIn.seek(start);
> >                 in = generateReader(fileIn, job);
> >                 pos = start;
> >             }
> >         }
> >
> >         key = new FileValidatorDescriptor();
> >         key.setFileName(split.getPath().getName());
> >         context.getConfiguration().set("file.name", key.getFileName());
> >
> >     }
> >
> >     @Override
> >     public boolean nextKeyValue() throws IOException {
> >         int newSize = readData();
> >         if (newSize == 0) {
> >             key = null;
> >             value = null;
> >             return false;
> >         } else {
> >             key.setOffset(this.pos);
> >             value = data;
> >             return true;
> >         }
> >     }
> >
> >     private LineReader generateReader(FSDataInputStream fileIn,
> >             Configuration job) throws IOException {
> >         if (null == this.recordDelimiterBytes) {
> >             return new LineReader(fileIn, job);
> >         } else {
> >             return new LineReader(fileIn, job, this.recordDelimiterBytes);
> >         }
> >
> >     }
> >
> >     private int readData() throws IOException {
> >         if (data == null) {
> >             data = new Text();
> >         }
> >         int newSize = 0;
> >         while (pos < end) {
> >             newSize = in.readLine(data, maxLineLength,
> >                     Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
> >                             maxLineLength));
> >             if (newSize == 0) {
> >                 break;
> >             }
> >             pos += newSize;
> >             if (newSize < maxLineLength) {
> >                 break;
> >             }
> >
> >             // line too long. try again
> >             LOG.info("Skipped line of size " + newSize + " at pos "
> >                     + (pos - newSize));
> >         }
> >         return newSize;
> >     }
> >
> >     @Override
> >     public FileValidatorDescriptor getCurrentKey() {
> >         return key;
> >     }
> >
> >     @Override
> >     public Text getCurrentValue() {
> >         return value;
> >     }
> >
> >     @Override
> >     public float getProgress() {
> >         if (start == end) {
> >             return 0.0f;
> >         } else {
> >             return Math.min(1.0f, (pos - start) / (float) (end - start));
> >         }
> >     }
> >
> >     @Override
> >     public synchronized void close() throws IOException {
> >         if (in != null) {
> >             in.close();
> >         }
> >     }
> > }
> >
> > Thanks.
> >
> From: Azuryy Yu <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Friday, 22 November 2013 12:19
> To: "[email protected]" <[email protected]>
> Subject: Re: Missing records from HDFS
>
> I do think this is because of your RecordReader. Can you paste your
> code here, and give a sample of the data?
>
> Please use pastebin if you want.
>
>
> On Fri, Nov 22, 2013 at 7:16 PM, ZORAIDA HIDALGO SANCHEZ
> <[email protected]> wrote:
>
>> One more thing,
>>
>> if we split the files then all the records are processed. The files are
>> 70.5 MB.
>>
>> Thanks,
>>
>> Zoraida.-
>>
>> From: zoraida <[email protected]>
>> Date: Friday, 22 November 2013 08:59
>>
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Missing records from HDFS
>>
>> Thanks for your response Azuryy.
>>
>> My hadoop version: 2.0.0-cdh4.3.0
>> InputFormat: a custom class that extends FileInputFormat (csv input
>> format)
>> These files are under the same directory, different files.
>> My input path is configured using oozie through the property
>> mapred.input.dir.
>>
>>
>> Same code and input running on Hadoop 2.0.0-cdh4.2.1 works fine. It does
>> not discard any record.
>>
>> Thanks.
>>
>> From: Azuryy Yu <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Thursday, 21 November 2013 07:31
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Missing records from HDFS
>>
>> What's your hadoop version, and which InputFormat are you using?
>>
>> Are these files under one directory, or are there lots of subdirectories?
>> How did you configure the input path in your main?
>>
>>
>>
>> On Thu, Nov 21, 2013 at 12:25 AM, ZORAIDA HIDALGO SANCHEZ <[email protected]
>> > wrote:
>>
>>> Hi all,
>>>
>>> my job is not reading all the input records. In the input directory I
>>> have a set of files containing a total of 6000000 records but only
>>> 5997000 are processed. The Map Input Records counter says 5997000.
>>> I have tried downloading the files with a getmerge to check how many
>>> records it would return, and the correct number is returned (6000000).
>>>
>>> Do you have any suggestion?
>>>
>>> Thanks.
>>>
>>> ------------------------------
>>>
>>> This message is intended exclusively for its addressee. We only send and
>>> receive email on the basis of the terms set out at:
>>> http://www.tid.es/ES/PAGINAS/disclaimer.aspx
