Look at this sample ============================================= package org.systemsbiology.hadoop;
import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.*; import org.apache.hadoop.io.compress.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import java.io.*; import java.util.*; /** * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat * Splitter that reads scan tags from an XML file * No assumption is made about lines but tage and end tags MUST look like <MyTag </MyTag> with no embedded spaces * usually you will subclass and hard code the tag you want to split on */ public class XMLTagInputFormat extends FileInputFormat<Text, Text> { public static final XMLTagInputFormat[] EMPTY_ARRAY = {}; private static final double SPLIT_SLOP = 1.1; // 10% slop public static final int BUFFER_SIZE = 4096; private final String m_BaseTag; private final String m_StartTag; private final String m_EndTag; private String m_Extension; public XMLTagInputFormat(final String pBaseTag) { m_BaseTag = pBaseTag; m_StartTag = "<" + pBaseTag; m_EndTag = "</" + pBaseTag + ">"; } public String getExtension() { return m_Extension; } public void setExtension(final String pExtension) { m_Extension = pExtension; } public boolean isSplitReadable(InputSplit split) { if (!(split instanceof FileSplit)) return true; FileSplit fsplit = (FileSplit) split; Path path1 = fsplit.getPath(); return isPathAcceptable(path1); } protected boolean isPathAcceptable(final Path pPath1) { String path = pPath1.toString().toLowerCase(); if(path.startsWith("part-r-")) return true; String extension = getExtension(); if (extension != null && path.endsWith(extension.toLowerCase())) return true; if (extension != null && path.endsWith(extension.toLowerCase() + ".gz")) return true; if (extension == null ) return true; return false; } public String getStartTag() { return m_StartTag; } public String getBaseTag() { return m_BaseTag; } public String getEndTag() { return m_EndTag; } @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { if (isSplitReadable(split)) return new MyXMLFileReader(); else return NullRecordReader.INSTANCE; // do not read } @Override protected boolean isSplitable(JobContext context, Path file) { String fname = file.getName().toLowerCase(); if(fname.endsWith(".gz")) return false; return true; } /** * Generate the list of files and make them into FileSplits. * This needs to be copied to insert a filter on acceptable data */ @Override public List<InputSplit> getSplits(JobContext job ) throws IOException { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); for (FileStatus file : listStatus(job)) { Path path = file.getPath(); if (!isPathAcceptable(path)) // filter acceptable data continue; FileSystem fs = path.getFileSystem(job.getConfiguration()); long length = file.getLen(); BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); if ((length != 0) && isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); splits.add(new FileSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkLocations.length - 1].getHosts())); } } else if (length != 0) { splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); } else { //Create empty hosts array for zero length files splits.add(new FileSplit(path, 0, length, new String[0])); } } // LOG.debug("Total # of splits: " + splits.size()); return splits; } /** * Custom RecordReader which returns the entire file as a * single m_Value with the name as a m_Key * Value is the entire file * Key is the file name */ public class MyXMLFileReader extends RecordReader<Text, Text> { private CompressionCodecFactory compressionCodecs = null; private long m_Start; private long m_End; private long m_Current; private BufferedReader m_Input; private Text m_Key; private Text m_Value = null; private char[] m_Buffer = new char[BUFFER_SIZE]; StringBuilder m_Sb = new StringBuilder(); public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); m_Sb.setLength(0); m_Start = split.getStart(); m_End = m_Start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // open the file and seek to the m_Start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); if (codec != null) { CompressionInputStream inputStream = codec.createInputStream(fileIn); m_Input = new BufferedReader(new InputStreamReader(inputStream)); m_End = Long.MAX_VALUE; } else { m_Input = new BufferedReader(new InputStreamReader(fileIn)); } m_Current = m_Start; if (m_Key == null) { m_Key = new Text(); } m_Key.set(split.getPath().getName()); if (m_Value == null) { m_Value = new Text(); } } /** * look for a <scan tag then read until it closes * * @return true if there is data * @throws java.io.IOException */ public boolean nextKeyValue() throws IOException { if(readFromCurrentBuffer()) return true; int newSize = 0; String startTag = getStartTag() + " "; String startTag2 = getStartTag() + ">"; newSize = m_Input.read(m_Buffer); while (newSize > 0) { m_Current += newSize; m_Sb.append(m_Buffer, 0, newSize); if( readFromCurrentBuffer()) return true; newSize = m_Input.read(m_Buffer); } // exit because we are at the m_End if (newSize <= 0) { m_Key = null; m_Value = null; return false; } return true; } protected boolean readFromCurrentBuffer() { String endTag = getEndTag(); String startText = m_Sb.toString(); if(!startText.contains(endTag)) return false; // need more read String startTag = getStartTag() + " "; String startTag2 = getStartTag() + ">"; int index = startText.indexOf(startTag); if (index == -1) index = startText.indexOf(startTag2); if(index == -1) return false; startText = startText.substring(index); m_Sb.setLength(0); m_Sb.append(startText); String s = startText; index = s.indexOf(endTag); if (index == -1) return false; // need more read // throw new IllegalStateException("unmatched tag " + getBaseTag()); index += endTag.length(); String tag = s.substring(0, index).trim(); m_Value.set(tag); // keep the remaining text to add to the next tag m_Sb.setLength(0); String rest = s.substring(index); m_Sb.append(rest); return true; } @Override public Text getCurrentKey() { return m_Key; } @Override public Text getCurrentValue() { return m_Value; } /** * Get the progress within the split */ public float getProgress() { return ((float) m_Current - m_Start) / (m_Start - m_End); } public synchronized void close() throws IOException { if (m_Input != null) { m_Input.close(); } } } } ============================================= On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shi...@gmail.com> wrote: > Hello everyone, > > I'm new to Hadoop and I'm trying to figure out how to design a M/R program > to parse a file and generate a PMML file as output. > > What I would like to do is split a file by a keyword instead a given number > of lines because the location of the split could change from time to time. > > I'm looking around and was thinking maybe KeyValueTextInputFormat would be > the way to go but I'm not finding any clear examples how to use it. So I'm > not sure if this is the right choice or not. > > Here is a basic input example of what I'm working with. > > [Input file info] > more info > more info > etc. > etc. > *Keyword* > different info > different info > *Keyword* > some more info > > For the example above, each section can be generated separately from each > other. However, within each section, different lines are dependent upon each > other to generate a valid PMML file. > > Can anyone offer a suggestion what type of input format I should use? > > Thanks for your time > Erik > -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com