The reason for the two is that the tag may appear as <Foo> or as <Foo attr1="..." - now I suppose you could just look for <Foo, which would cover either case.
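For illustration only, here is a minimal standalone sketch of that two-prefix check (the helper class and method names are made up; in the sample below the same matching lives in readFromCurrentBuffer). Matching the two exact forms also avoids a false hit on a longer tag such as <Foobar>:

=============================================
// Hypothetical helper, for illustration only - not part of XMLTagInputFormat below.
// Finds the first start tag for baseTag, whether written <Foo attr1="..."> or <Foo>.
public class StartTagMatcher {

    /** Returns the index of the first start tag for baseTag in text, or -1 if none is present. */
    public static int indexOfStartTag(String text, String baseTag) {
        String withAttributes = "<" + baseTag + " ";   // opening of <Foo attr1="...">
        String bare = "<" + baseTag + ">";             // <Foo>
        int index = text.indexOf(withAttributes);
        if (index == -1)
            index = text.indexOf(bare);
        return index;
    }

    public static void main(String[] args) {
        System.out.println(indexOfStartTag("<Foo attr1=\"x\">body</Foo>", "Foo"));  // 0
        System.out.println(indexOfStartTag("junk<Foo>body</Foo>", "Foo"));          // 4
        System.out.println(indexOfStartTag("<Foobar>other</Foobar>", "Foo"));       // -1 (no false match)
    }
}
=============================================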
Also note that I am cheating a bit: this will not properly handle tags which are commented out with an XML comment (<!-- ... -->), but I doubt it is possible to handle those without parsing the entire (potentially large) file.

On Mon, Jul 18, 2011 at 9:40 AM, Erik T <erik.shi...@gmail.com> wrote:

> Hi Steven,
>
> Thank you for the sample. I have one question though.
>
> In MyXMLFileReader, nextKeyValue, are startTag and startTag2 needed?
> Erik
>
>
> On 11 July 2011 15:11, Steve Lewis <lordjoe2...@gmail.com> wrote:
>
>> Look at this sample
>> =============================================
>> package org.systemsbiology.hadoop;
>>
>>
>> import org.apache.hadoop.conf.*;
>> import org.apache.hadoop.fs.*;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.io.*;
>> import org.apache.hadoop.io.compress.*;
>> import org.apache.hadoop.mapreduce.*;
>> import org.apache.hadoop.mapreduce.lib.input.*;
>>
>> import java.io.*;
>> import java.util.*;
>>
>> /**
>>  * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
>>  * Splitter that reads scan tags from an XML file
>>  * No assumption is made about lines, but start and end tags MUST look like
>>  * <MyTag </MyTag> with no embedded spaces
>>  * Usually you will subclass and hard code the tag you want to split on
>>  */
>> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
>>     public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
>>
>>     private static final double SPLIT_SLOP = 1.1;   // 10% slop
>>
>>     public static final int BUFFER_SIZE = 4096;
>>
>>     private final String m_BaseTag;
>>     private final String m_StartTag;
>>     private final String m_EndTag;
>>     private String m_Extension;
>>
>>     public XMLTagInputFormat(final String pBaseTag) {
>>         m_BaseTag = pBaseTag;
>>         m_StartTag = "<" + pBaseTag;
>>         m_EndTag = "</" + pBaseTag + ">";
>>     }
>>
>>     public String getExtension() {
>>         return m_Extension;
>>     }
>>
>>     public void setExtension(final String pExtension) {
>>         m_Extension = pExtension;
>>     }
>>
>>     public boolean isSplitReadable(InputSplit split) {
>>         if (!(split instanceof FileSplit))
>>             return true;
>>         FileSplit fsplit = (FileSplit) split;
>>         Path path1 = fsplit.getPath();
>>         return isPathAcceptable(path1);
>>     }
>>
>>     protected boolean isPathAcceptable(final Path pPath1) {
>>         String path = pPath1.toString().toLowerCase();
>>         if (path.startsWith("part-r-"))
>>             return true;
>>         String extension = getExtension();
>>         if (extension != null && path.endsWith(extension.toLowerCase()))
>>             return true;
>>         if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
>>             return true;
>>         if (extension == null)
>>             return true;
>>         return false;
>>     }
>>
>>     public String getStartTag() {
>>         return m_StartTag;
>>     }
>>
>>     public String getBaseTag() {
>>         return m_BaseTag;
>>     }
>>
>>     public String getEndTag() {
>>         return m_EndTag;
>>     }
>>
>>     @Override
>>     public RecordReader<Text, Text> createRecordReader(InputSplit split,
>>                                                        TaskAttemptContext context) {
>>         if (isSplitReadable(split))
>>             return new MyXMLFileReader();
>>         else
>>             return NullRecordReader.INSTANCE; // do not read
>>     }
>>
>>     @Override
>>     protected boolean isSplitable(JobContext context, Path file) {
>>         String fname = file.getName().toLowerCase();
>>         if (fname.endsWith(".gz"))
>>             return false;
>>         return true;
>>     }
>>
>>     /**
>>      * Generate the list of files and make them into FileSplits.
>>      * This needs to be copied to insert a filter on acceptable data
>>      */
>>     @Override
>>     public List<InputSplit> getSplits(JobContext job) throws IOException {
>>         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
>>         long maxSize = getMaxSplitSize(job);
>>
>>         // generate splits
>>         List<InputSplit> splits = new ArrayList<InputSplit>();
>>         for (FileStatus file : listStatus(job)) {
>>             Path path = file.getPath();
>>             if (!isPathAcceptable(path))   // filter acceptable data
>>                 continue;
>>             FileSystem fs = path.getFileSystem(job.getConfiguration());
>>             long length = file.getLen();
>>             BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
>>             if ((length != 0) && isSplitable(job, path)) {
>>                 long blockSize = file.getBlockSize();
>>                 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
>>
>>                 long bytesRemaining = length;
>>                 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
>>                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
>>                     splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
>>                             blkLocations[blkIndex].getHosts()));
>>                     bytesRemaining -= splitSize;
>>                 }
>>
>>                 if (bytesRemaining != 0) {
>>                     splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
>>                             blkLocations[blkLocations.length - 1].getHosts()));
>>                 }
>>             }
>>             else if (length != 0) {
>>                 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
>>             }
>>             else {
>>                 // Create empty hosts array for zero length files
>>                 splits.add(new FileSplit(path, 0, length, new String[0]));
>>             }
>>         }
>>         // LOG.debug("Total # of splits: " + splits.size());
>>         return splits;
>>     }
>>
>>     /**
>>      * Custom RecordReader which returns each <BaseTag> ... </BaseTag> block
>>      * as a single m_Value, with the file name as the m_Key
>>      */
>>     public class MyXMLFileReader extends RecordReader<Text, Text> {
>>
>>         private CompressionCodecFactory compressionCodecs = null;
>>         private long m_Start;
>>         private long m_End;
>>         private long m_Current;
>>         private BufferedReader m_Input;
>>         private Text m_Key;
>>         private Text m_Value = null;
>>         private char[] m_Buffer = new char[BUFFER_SIZE];
>>         StringBuilder m_Sb = new StringBuilder();
>>
>>         public void initialize(InputSplit genericSplit,
>>                                TaskAttemptContext context) throws IOException {
>>             FileSplit split = (FileSplit) genericSplit;
>>             Configuration job = context.getConfiguration();
>>             m_Sb.setLength(0);
>>             m_Start = split.getStart();
>>             m_End = m_Start + split.getLength();
>>             final Path file = split.getPath();
>>             compressionCodecs = new CompressionCodecFactory(job);
>>             final CompressionCodec codec = compressionCodecs.getCodec(file);
>>
>>             // open the file and seek to the m_Start of the split
>>             FileSystem fs = file.getFileSystem(job);
>>             FSDataInputStream fileIn = fs.open(split.getPath());
>>             if (codec != null) {
>>                 CompressionInputStream inputStream = codec.createInputStream(fileIn);
>>                 m_Input = new BufferedReader(new InputStreamReader(inputStream));
>>                 m_End = Long.MAX_VALUE;
>>             }
>>             else {
>>                 m_Input = new BufferedReader(new InputStreamReader(fileIn));
>>             }
>>             m_Current = m_Start;
>>             if (m_Key == null) {
>>                 m_Key = new Text();
>>             }
>>             m_Key.set(split.getPath().getName());
>>             if (m_Value == null) {
>>                 m_Value = new Text();
>>             }
>>         }
>>
>>         /**
>>          * look for a <scan tag then read until it closes
>>          *
>>          * @return true if there is data
>>          * @throws java.io.IOException
>>          */
>>         public boolean nextKeyValue() throws IOException {
>>             if (readFromCurrentBuffer())
>>                 return true;
>>             // the two start-tag forms are built again in readFromCurrentBuffer(),
>>             // where the actual matching is done; these locals are not used here
>>             String startTag = getStartTag() + " ";
>>             String startTag2 = getStartTag() + ">";
>>             int newSize = m_Input.read(m_Buffer);
>>
>>             while (newSize > 0) {
>>                 m_Current += newSize;
>>                 m_Sb.append(m_Buffer, 0, newSize);
>>                 if (readFromCurrentBuffer())
>>                     return true;
>>                 newSize = m_Input.read(m_Buffer);
>>             }
>>             // exit because we are at the m_End
>>             m_Key = null;
>>             m_Value = null;
>>             return false;
>>         }
>>
>>         protected boolean readFromCurrentBuffer() {
>>             String endTag = getEndTag();
>>             String startText = m_Sb.toString();
>>             if (!startText.contains(endTag))
>>                 return false;   // need more read
>>             String startTag = getStartTag() + " ";    // <Foo attr1="...
>>             String startTag2 = getStartTag() + ">";   // <Foo>
>>             int index = startText.indexOf(startTag);
>>             if (index == -1)
>>                 index = startText.indexOf(startTag2);
>>             if (index == -1)
>>                 return false;
>>             startText = startText.substring(index);
>>             m_Sb.setLength(0);
>>             m_Sb.append(startText);
>>
>>             String s = startText;
>>             index = s.indexOf(endTag);
>>             if (index == -1)
>>                 return false;   // need more read
>>                 // throw new IllegalStateException("unmatched tag " + getBaseTag());
>>             index += endTag.length();
>>             String tag = s.substring(0, index).trim();
>>             m_Value.set(tag);
>>
>>             // keep the remaining text to add to the next tag
>>             m_Sb.setLength(0);
>>             String rest = s.substring(index);
>>             m_Sb.append(rest);
>>             return true;
>>         }
>>
>>         @Override
>>         public Text getCurrentKey() {
>>             return m_Key;
>>         }
>>
>>         @Override
>>         public Text getCurrentValue() {
>>             return m_Value;
>>         }
>>
>>         /**
>>          * Get the progress within the split
>>          */
>>         public float getProgress() {
>>             if (m_End == m_Start)
>>                 return 0.0f;
>>             return ((float) (m_Current - m_Start)) / (float) (m_End - m_Start);
>>         }
>>
>>         public synchronized void close() throws IOException {
>>             if (m_Input != null) {
>>                 m_Input.close();
>>             }
>>         }
>>     }
>> }
>>
>> =============================================
>>
>>
>> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shi...@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> I'm new to Hadoop and I'm trying to figure out how to design a M/R
>>> program to parse a file and generate a PMML file as output.
>>>
>>> What I would like to do is split a file by a keyword instead of a given
>>> number of lines, because the location of the split could change from time
>>> to time.
>>>
>>> I'm looking around and was thinking maybe KeyValueTextInputFormat would
>>> be the way to go, but I'm not finding any clear examples of how to use it,
>>> so I'm not sure if this is the right choice or not.
>>>
>>> Here is a basic input example of what I'm working with.
>>>
>>> [Input file info]
>>> more info
>>> more info
>>> etc.
>>> etc.
>>> *Keyword*
>>> different info
>>> different info
>>> *Keyword*
>>> some more info
>>>
>>> For the example above, each section can be generated separately from the
>>> others. However, within each section, different lines are dependent upon
>>> each other to generate a valid PMML file.
>>>
>>> Can anyone offer a suggestion on what type of input format I should use?
>>>
>>> Thanks for your time
>>> Erik
>>
>>
>>
>> --
>> Steven M. Lewis PhD
>> 4221 105th Ave NE
>> Kirkland, WA 98033
>> 206-384-1340 (cell)
>> Skype lordjoe_com
>>
>>
>

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
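For completeness, here is a minimal sketch of how the XMLTagInputFormat sample above might be wired into a map-only job for the keyword-split use case. The subclass name, the "Section" tag, and the mapper are invented for illustration, and the sketch assumes the posted class (including its NullRecordReader helper) compiles in your project:

=============================================
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Hypothetical driver - only XMLTagInputFormat comes from the sample above.
public class SectionSplitDriver {

    // Subclass that hard codes the tag to split on, as the class javadoc suggests;
    // the no-arg constructor is what Hadoop needs to instantiate the format by reflection.
    public static class PmmlSectionInputFormat extends XMLTagInputFormat {
        public PmmlSectionInputFormat() {
            super("Section");   // records become <Section ...> ... </Section> blocks
        }
    }

    // Each map() call sees one complete tag block as its value
    public static class SectionMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // key = file name, value = one <Section> ... </Section> block
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "split by tag");   // constructor style of the 2011-era API
        job.setJarByClass(SectionSplitDriver.class);
        job.setInputFormatClass(PmmlSectionInputFormat.class);
        job.setMapperClass(SectionMapper.class);
        job.setNumReduceTasks(0);                  // pass-through, no reduce step
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
=============================================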