This InputFormat reads a FASTA file (see the example below).
The format is a header line starting with '>'
followed by N lines of sequence data.
The projects at
https://code.google.com/p/distributed-tools/
have other samples of more complex input formats.
>YDR356W SPC110 SGDID:S000002764, Chr IV from 1186099-1188933, Verified
ORF, "Inner plaque spindle pole body (SPB) component, ortholog of human
kendrin; involved in connecting nuclear microtubules to SPB; interacts with
Tub4p-complex and calmodulin; phosphorylated by Mps1p in cell
cycle-dependent manner"
MDEASHLPNGSLKNMEFTPVGFIKSKRNTTQTQVVSPTKVPNANNGDENEGPVKKRQRRS
IDDTIDSTRLFSEASQFDDSFPEIKANIPPSPRSGNVDKSRKRNLIDDLKKDVPMSQPLK
EQEVREHQMKKERFDRALESKLLGKRHITYANSDISNKELYINEIKSLKHEIKELRKEKN
DTLNNYDTLEEETDDLKNRLQALEKELDAKNKIVNSRKVDDHSGCIEEREQMERKLAELE
RRLRLDTRKGEHSLNISLPDDDELDRDYYNSHVYTRYHDYEYPLRFNLNRRGPYFERRLS
FKTVALLVLACVRMKRIAFYRRSDDNRLRILRDRIESSSGRISW
>YLR244C MAP1 SGDID:S000004234, Chr XII from 626333-625170, reverse
complement, Verified ORF, "Methionine aminopeptidase, catalyzes the
cotranslational removal of N-terminal methionine from nascent polypeptides;
function is partially redundant with that of Map2p"
MSTATTTVTTSDQASHPTKIYCSGLQCGRETSSQMKCPVCLKQGIVSIFCDTSCYENNYK
AHKALHNAKDGLEGAYDPFPKFKYSGKVKASYPLTPRRYVPEDIPKPDWAANGLPVSEQR
NDRLNNIPIYKKDQIKKIRKACMLGREVLDIAAAHVRPGITTDELDEIVHNETIKRGAYP
SPLNYYNFPKSLCTSVNEVICHGVPDKTVLKEGDIVNLDVSLYYQGYHADLNETYYVGEN
ISKEALNTTETSRECLKLAIKMCKPGTTFQELGDHIEKHATENKCSVVRTYCGHGVGEFF
HCSPNIPHYAKNRTPGVMKPGMVFTIEPMINEGTWKDMTWPDDWTSTTQDGKLSAQFEHT
LLVTEHGVEILTARNKKSPGGPRQRIK
>REV1_YJL076W NET1 SGDID:S000003612, Chr X from 295162-298731, Verified
ORF, "Core subunit of the RENT complex, which is a complex involved in
nucleolar silencing and telophase exit; stimulates transcription by RNA
polymerase I and regulates nucleolar structure"
MYKNPLLQSSEAITPGYGFQIPMTAQLSPPVLVVQLRLNAYQLSADGASQAMNTRSQNFYSPTFSVNASRFRKTFLLFKPDIIEDSLNLLTNTKECKVLFDPDLDCGSNDQLSLIEIDEQLSPYMKVINNVNFVDRLIVKYLSVPASDDLDIENKVSKRSKLVGSSSPIQQQPQVSQPSGNNLRAIKKRPITTTTTTGTPRMSGNTASRALPTSVRSSPPPYIQKEGIDEDEDDSNNSVIRIPPSQPQTPPPLFSRGADIGSSIKKIKSVIDEEVISSRDPDVTASKTKQQRNPTMTSMIPTGSLLRQGTLTVRHAHESVVKNIDQATVAATGGNAFSSSSASASFVLENRKPVPTVPRLMGSTIKIPIPREIESIKL
SSDSVSDSSSNSDSDSSSEDDSSSPAKGDDSSDGSDDSDSESKASIFSKGLAASASKKKKPILSAFGGSKFDKKK
>YJL077W-A YJL077W-A SGDID:S000028661, Chr X from 294716-294802, Dubious
ORF, "Identified by gene-trapping, microarray-based expression analysis,
and genome-wide homology searching"
MPGIAFKGKDMVKAIQFLEIVVPCHCTT
> Some Comment
On Tue, Oct 28, 2014 at 2:08 PM, John Dison <[email protected]> wrote:
> Hello!
>
> I have a file in the following format:
> +++++ InvoiceNo=1
> some
> text1
> +++++ InvoiceNo=2
> some
> more
> text2
> <...>
>
> Each record starts with a line beginning with five "+" characters,
> followed by the invoice number.
> Then come several lines of text.
> I want the invoice number to become a key for Map operation, and the text
> to become a value.
>
> As far as I understand, I need to implement some kind of custom
> RecordReader class to parse that format. But all examples I found on the
> Internet deal with formats where there is some mark at the end of the
> record, but in my case I can only tell that a record has ended after
> reading the first line of the next record.
>
> I would be very thankful for any help with implementing such a
> RecordReader.
>
> Thanks in advance,
> John.
>
--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
package com.lordjoe.distributed.input;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;
import java.io.*;
import java.util.*;
/**
* com.lordjoe.distributed.input.FastaInputFormat
* User: Steve
* Date: 9/24/2014
*/
/**
 * Hadoop InputFormat for FASTA files.
 * <p>
 * A FASTA record is a comment line beginning with '&gt;' followed by one or
 * more lines of sequence data.  The record KEY is the comment line (with the
 * leading '&gt;' stripped) and the VALUE is the concatenation of the sequence
 * lines that follow it, up to the next '&gt;' line or end of file.
 */
public class FastaInputFormat extends FileInputFormat<String, String> implements Serializable {

    public static final boolean FORCE_ONE_MAPPER = false;
    // todo run off a parameter
    // setting this small forces many mappers
    @SuppressWarnings("UnusedDeclaration")
    public static final int SPLIT_BLOCK_SIZE = 10 * 1024 * 1024;
    public static final int MIN_BLOCK_SIZE = 10 * 1024;
    private static final double SPLIT_SLOP = 1.1; // 10% slop allowed in the last split

    // files are accepted when they end with this extension (or extension + ".gz")
    private String m_Extension = "fasta";

    public FastaInputFormat() {
    }

    /**
     * @return the file extension (without dot) this format accepts; "fasta" by default
     */
    public String getExtension() {
        return m_Extension;
    }

    @SuppressWarnings("UnusedDeclaration")
    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }

    /**
     * @return true when the split points at a file this format should read
     */
    public boolean isSplitReadable(InputSplit split) {
        if (!(split instanceof FileSplit))
            return true;
        FileSplit fsplit = (FileSplit) split;
        Path path1 = fsplit.getPath();
        return isPathAcceptable(path1);
    }

    /**
     * Filter on acceptable files: reducer outputs ("part-r-*"), files ending
     * with the configured extension (optionally gzipped), or everything when
     * no extension is configured.
     */
    protected boolean isPathAcceptable(final Path pPath1) {
        // BUG FIX: the original tested startsWith("part-r-") against the FULL
        // path string, which begins with a filesystem scheme such as
        // "hdfs://...", so reducer outputs were never recognized.  The
        // "part-r-" prefix must be tested against the file NAME.
        String fileName = pPath1.getName().toLowerCase();
        if (fileName.startsWith("part-r-"))
            return true;
        String extension = getExtension();
        if (extension == null)
            return true; // no extension configured - accept everything
        String lowerCaseExtension = extension.toLowerCase();
        return fileName.endsWith(lowerCaseExtension) ||
                fileName.endsWith(lowerCaseExtension + ".gz");
    }

    @Override
    public RecordReader<String, String> createRecordReader(InputSplit split,
                                                           TaskAttemptContext context) {
        if (isSplitReadable(split))
            return new FastaFileReader();
        else
            return NullRecordReader.INSTANCE; // do not read
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        String fname = file.getName().toLowerCase();
        if (fname.endsWith(".gz"))
            return false; // gzip streams cannot be split
        //noinspection RedundantIfStatement
        if (FORCE_ONE_MAPPER)
            return false;
        return true;
    }

    /**
     * Generate the list of files and make them into FileSplits.
     * This needs to be copied (from FileInputFormat) to insert a filter on
     * acceptable data via {@link #isPathAcceptable(Path)}.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
        // optional knob: how many mappers the caller would like for a single input file
        long desiredMappers = job.getConfiguration().getLong("org.systemsbiology.jxtandem.DesiredDatabaseMappers", 0);

        //      maxSize = SPLIT_BLOCK_SIZE; // force more mappers

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        Path[] paths = getInputPaths(job);
        //noinspection ForLoopReplaceableByForEach
        for (int i = 0; i < paths.length; i++) {
            Path path = paths[i];
            System.err.println("Input path " + path.toString());
        }

        List<FileStatus> fileStatuses = listStatus(job);
        // if there is only one file we may force more than the default mappers
        boolean forceNumberMappers = fileStatuses.size() == 1;
        for (FileStatus file : fileStatuses) {
            Path path = file.getPath();
            if (!isPathAcceptable(path)) // filter acceptable data
                continue;
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            long length = file.getLen();
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                // use desired mappers to force more splits for a single large file
                if (forceNumberMappers && desiredMappers > 0) {
                    final long ms1 = length / desiredMappers;
                    final long ms2 = Math.max(MIN_BLOCK_SIZE, ms1);
                    maxSize = Math.min(maxSize, ms2);
                }

                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                long bytesRemaining = length;
                // emit full-size splits while more than SPLIT_SLOP * splitSize remains
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) { // trailing (possibly oversized) split
                    splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkLocations.length - 1].getHosts()));
                }
            }
            else if (length != 0) { // unsplittable (e.g. gzip) - one split for the whole file
                splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
            }
            else {
                //Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }
        //     LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    /**
     * RecordReader for FASTA records.
     * Value is the fasta record minus the comment line; Key is the comment line.
     * NOTE(review): this is a non-static inner class; making it static would
     * avoid the hidden outer-instance reference, but could break external
     * callers that instantiate it via an enclosing instance, so it is left as is.
     */
    public class FastaFileReader extends RecordReader<String, String> {
        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;    // start of this split
        private long m_End;      // end of this split
        private long m_Current;  // current position
        private String m_Key;
        private String m_Value;
        private final Text m_Line = new Text(); // reused buffer for the current line
        private int m_MaxLineLength;
        private StringBuilder m_Data = new StringBuilder(); // accumulates sequence lines
        private String m_CurrentLine;
        private FSDataInputStream m_FileIn; // input stream needed for position
        private LineReader m_Input;         // current reader

        /**
         * Open the file, seek to the start of the split and, when not at the
         * beginning of the file, skip the (probably partial) first line so the
         * previous split owns it.
         */
        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_MaxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                    Integer.MAX_VALUE);
            Text buffer = new Text();
            m_Data.setLength(0);
            m_Start = split.getStart();
            m_End = m_Start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);
            boolean skipFirstLine = false;

            // open the file and seek to the m_Start of the split
            FileSystem fs = file.getFileSystem(job);
            m_FileIn = fs.open(split.getPath());
            if (codec != null) {
                m_Input = new LineReader(codec.createInputStream(m_FileIn), job);
                // compressed files are never split - read to end of stream
                m_End = Long.MAX_VALUE;
            }
            else {
                if (m_Start != 0) {
                    skipFirstLine = true;
                    --m_Start;
                    m_FileIn.seek(m_Start);
                }
                m_Input = new LineReader(m_FileIn, job);
            }
            // not at the beginning so go to first line
            if (skipFirstLine) { // skip first line and re-establish "start".
                m_Start += m_Input.readLine(buffer, 0,
                        (int) Math.min((long) Integer.MAX_VALUE, m_End - m_Start));
            }
            m_Current = m_Start;
            m_Key = split.getPath().getName();
        }

        /**
         * look for a line starting with > and read until the record closes
         * (next '>' line or end of file)
         *
         * @return true if there is data
         * @throws java.io.IOException on read failure
         */
        public boolean nextKeyValue() throws IOException {
            if (m_Current > m_End) { // we are at the end of the split
                m_Key = null;
                m_Value = null;
                return false;
            }
            // read more data
            if (m_CurrentLine == null) {
                m_CurrentLine = readNextLine();
                if (m_CurrentLine == null) { // end of file
                    m_Key = null;
                    m_Value = null;
                    return false;
                }
            }
            // lines starting with > are a new record in FASTA files; skip
            // forward to the next record start within this split.
            // NOTE(review): getPos() is the position in the UNDERLYING stream;
            // for compressed input this is the compressed offset - confirm
            // against the codec behavior.
            while (m_FileIn.getPos() < m_End && !m_CurrentLine.startsWith(">")) {
                m_CurrentLine = readNextLine();
            }
            if (m_CurrentLine == null || !m_CurrentLine.startsWith(">")) { // we are at the end of data
                m_Key = null;
                m_Value = null;
                return false;
            }
            // label = key - drop the >
            m_Key = m_CurrentLine.substring(1);
            m_Data.setLength(0); // clear the buffer
            m_CurrentLine = readNextLine();
            // keep reading sequence lines until the next record (or EOF)
            while (m_CurrentLine != null && !m_CurrentLine.startsWith(">")) {
                m_Data.append(m_CurrentLine);
                m_CurrentLine = readNextLine();
            }
            if (m_Data.length() == 0) { // header with no sequence - cannot read
                m_Key = null;
                m_Value = null;
                return false;
            }
            m_Value = m_Data.toString();
            m_Data.setLength(0); // clear the buffer
            m_Current = m_FileIn.getPos();
            return true;
        }

        /**
         * @return the next line of text, or null at end of input
         */
        protected String readNextLine() throws IOException {
            int newSize = m_Input.readLine(m_Line, m_MaxLineLength,
                    Math.max(Math.min(Integer.MAX_VALUE, (int) (m_End - m_Current)),
                            m_MaxLineLength));
            if (newSize == 0)
                return null;
            m_Current += newSize; // new position
            return m_Line.toString();
        }

        @Override
        public String getCurrentKey() {
            return m_Key;
        }

        @Override
        public String getCurrentValue() {
            return m_Value;
        }

        /**
         * Get the progress within the split, clamped to [0, 1].
         */
        public float getProgress() {
            long totalBytes = m_End - m_Start;
            // BUG FIX: guard the zero-length split (divide by zero -> NaN) and
            // clamp to 1.0 since m_Current can run past m_End after the reader
            // finishes the last record of the split.
            if (totalBytes <= 0)
                return 1.0f;
            long totalHandled = m_Current - m_Start;
            return Math.min(1.0f, ((float) totalHandled) / totalBytes);
        }

        public synchronized void close() throws IOException {
            // BUG FIX: the original nulled m_FileIn without closing it and left
            // m_Input set after close; close both defensively (idempotent).
            try {
                if (m_Input != null) {
                    m_Input.close();
                }
            }
            finally {
                m_Input = null;
                if (m_FileIn != null) {
                    m_FileIn.close();
                    m_FileIn = null;
                }
            }
        }
    }
}