I understand that part but I don't see startTag or startTag2 used in the nextKeyValue method after they have been declared. Erik
On 18 July 2011 14:20, Steve Lewis <lordjoe2...@gmail.com> wrote: > The reason for the two id that it may say > <Foo> .... > or > <Foo attr1="... > - now I suppose you could just look for <Foo which would cover either case > > Also note I am cheating a bit and this will not handle properly tags which > are commented out with > the xml comment <!-- but I doubt it is possible to handle these without > parsing the entire (potentially large file) > > > On Mon, Jul 18, 2011 at 9:40 AM, Erik T <erik.shi...@gmail.com> wrote: > >> Hi Steven, >> >> Thank you for the sample. I have one question though. >> >> In MyXMLFileReader, nextKeyValue, is startTag and startTag2 needed? >> Erik >> >> >> >> On 11 July 2011 15:11, Steve Lewis <lordjoe2...@gmail.com> wrote: >> >>> Look at this sample >>> ============================================= >>> package org.systemsbiology.hadoop; >>> >>> >>> >>> import org.apache.hadoop.conf.*; >>> import org.apache.hadoop.fs.*; >>> import org.apache.hadoop.fs.FileSystem; >>> import org.apache.hadoop.io.*; >>> import org.apache.hadoop.io.compress.*; >>> import org.apache.hadoop.mapreduce.*; >>> import org.apache.hadoop.mapreduce.lib.input.*; >>> >>> import java.io.*; >>> import java.util.*; >>> >>> /** >>> * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat >>> * Splitter that reads scan tags from an XML file >>> * No assumption is made about lines but tage and end tags MUST look like >>> <MyTag </MyTag> with no embedded spaces >>> * usually you will subclass and hard code the tag you want to split on >>> */ >>> public class XMLTagInputFormat extends FileInputFormat<Text, Text> { >>> public static final XMLTagInputFormat[] EMPTY_ARRAY = {}; >>> >>> >>> private static final double SPLIT_SLOP = 1.1; // 10% slop >>> >>> >>> public static final int BUFFER_SIZE = 4096; >>> >>> private final String m_BaseTag; >>> private final String m_StartTag; >>> private final String m_EndTag; >>> private String m_Extension; >>> >>> public XMLTagInputFormat(final String pBaseTag) { >>> m_BaseTag = pBaseTag; >>> m_StartTag = "<" + pBaseTag; >>> m_EndTag = "</" + pBaseTag + ">"; >>> >>> } >>> >>> public String getExtension() { >>> return m_Extension; >>> } >>> >>> public void setExtension(final String pExtension) { >>> m_Extension = pExtension; >>> } >>> >>> public boolean isSplitReadable(InputSplit split) { >>> if (!(split instanceof FileSplit)) >>> return true; >>> FileSplit fsplit = (FileSplit) split; >>> Path path1 = fsplit.getPath(); >>> return isPathAcceptable(path1); >>> } >>> >>> protected boolean isPathAcceptable(final Path pPath1) { >>> String path = pPath1.toString().toLowerCase(); >>> if(path.startsWith("part-r-")) >>> return true; >>> String extension = getExtension(); >>> if (extension != null && path.endsWith(extension.toLowerCase())) >>> return true; >>> if (extension != null && path.endsWith(extension.toLowerCase() + >>> ".gz")) >>> return true; >>> if (extension == null ) >>> return true; >>> return false; >>> } >>> >>> public String getStartTag() { >>> return m_StartTag; >>> } >>> >>> public String getBaseTag() { >>> return m_BaseTag; >>> } >>> >>> public String getEndTag() { >>> return m_EndTag; >>> } >>> >>> @Override >>> public RecordReader<Text, Text> createRecordReader(InputSplit split, >>> TaskAttemptContext >>> context) { >>> if (isSplitReadable(split)) >>> return new MyXMLFileReader(); >>> else >>> return NullRecordReader.INSTANCE; // do not read >>> } >>> >>> @Override >>> protected boolean isSplitable(JobContext context, Path file) { >>> String fname = file.getName().toLowerCase(); >>> if(fname.endsWith(".gz")) >>> return false; >>> return true; >>> } >>> >>> /** >>> * Generate the list of files and make them into FileSplits. >>> * This needs to be copied to insert a filter on acceptable data >>> */ >>> @Override >>> public List<InputSplit> getSplits(JobContext job >>> ) throws IOException { >>> long minSize = Math.max(getFormatMinSplitSize(), >>> getMinSplitSize(job)); >>> long maxSize = getMaxSplitSize(job); >>> >>> // generate splits >>> List<InputSplit> splits = new ArrayList<InputSplit>(); >>> for (FileStatus file : listStatus(job)) { >>> Path path = file.getPath(); >>> if (!isPathAcceptable(path)) // filter acceptable data >>> continue; >>> FileSystem fs = path.getFileSystem(job.getConfiguration()); >>> long length = file.getLen(); >>> BlockLocation[] blkLocations = fs.getFileBlockLocations(file, >>> 0, length); >>> if ((length != 0) && isSplitable(job, path)) { >>> long blockSize = file.getBlockSize(); >>> long splitSize = computeSplitSize(blockSize, minSize, >>> maxSize); >>> >>> long bytesRemaining = length; >>> while (((double) bytesRemaining) / splitSize > >>> SPLIT_SLOP) { >>> int blkIndex = getBlockIndex(blkLocations, length - >>> bytesRemaining); >>> splits.add(new FileSplit(path, length - >>> bytesRemaining, splitSize, >>> blkLocations[blkIndex].getHosts())); >>> bytesRemaining -= splitSize; >>> } >>> >>> if (bytesRemaining != 0) { >>> splits.add(new FileSplit(path, length - >>> bytesRemaining, bytesRemaining, >>> blkLocations[blkLocations.length - >>> 1].getHosts())); >>> } >>> } >>> else if (length != 0) { >>> splits.add(new FileSplit(path, 0, length, >>> blkLocations[0].getHosts())); >>> } >>> else { >>> //Create empty hosts array for zero length files >>> splits.add(new FileSplit(path, 0, length, new >>> String[0])); >>> } >>> } >>> // LOG.debug("Total # of splits: " + splits.size()); >>> return splits; >>> } >>> >>> /** >>> * Custom RecordReader which returns the entire file as a >>> * single m_Value with the name as a m_Key >>> * Value is the entire file >>> * Key is the file name >>> */ >>> public class MyXMLFileReader extends RecordReader<Text, Text> { >>> >>> private CompressionCodecFactory compressionCodecs = null; >>> private long m_Start; >>> private long m_End; >>> private long m_Current; >>> private BufferedReader m_Input; >>> private Text m_Key; >>> private Text m_Value = null; >>> private char[] m_Buffer = new char[BUFFER_SIZE]; >>> StringBuilder m_Sb = new StringBuilder(); >>> >>> public void initialize(InputSplit genericSplit, >>> TaskAttemptContext context) throws >>> IOException { >>> FileSplit split = (FileSplit) genericSplit; >>> Configuration job = context.getConfiguration(); >>> m_Sb.setLength(0); >>> m_Start = split.getStart(); >>> m_End = m_Start + split.getLength(); >>> final Path file = split.getPath(); >>> compressionCodecs = new CompressionCodecFactory(job); >>> final CompressionCodec codec = >>> compressionCodecs.getCodec(file); >>> >>> // open the file and seek to the m_Start of the split >>> FileSystem fs = file.getFileSystem(job); >>> FSDataInputStream fileIn = fs.open(split.getPath()); >>> if (codec != null) { >>> CompressionInputStream inputStream = >>> codec.createInputStream(fileIn); >>> m_Input = new BufferedReader(new >>> InputStreamReader(inputStream)); >>> m_End = Long.MAX_VALUE; >>> } >>> else { >>> m_Input = new BufferedReader(new >>> InputStreamReader(fileIn)); >>> } >>> m_Current = m_Start; >>> if (m_Key == null) { >>> m_Key = new Text(); >>> } >>> m_Key.set(split.getPath().getName()); >>> if (m_Value == null) { >>> m_Value = new Text(); >>> } >>> >>> } >>> >>> /** >>> * look for a <scan tag then read until it closes >>> * >>> * @return true if there is data >>> * @throws java.io.IOException >>> */ >>> public boolean nextKeyValue() throws IOException { >>> if(readFromCurrentBuffer()) >>> return true; >>> int newSize = 0; >>> String startTag = getStartTag() + " "; >>> String startTag2 = getStartTag() + ">"; >>> newSize = m_Input.read(m_Buffer); >>> >>> while (newSize > 0) { >>> m_Current += newSize; >>> m_Sb.append(m_Buffer, 0, newSize); >>> if( readFromCurrentBuffer()) >>> return true; >>> newSize = m_Input.read(m_Buffer); >>> } >>> // exit because we are at the m_End >>> if (newSize <= 0) { >>> m_Key = null; >>> m_Value = null; >>> return false; >>> } >>> >>> return true; >>> } >>> >>> protected boolean readFromCurrentBuffer() >>> { >>> String endTag = getEndTag(); >>> String startText = m_Sb.toString(); >>> if(!startText.contains(endTag)) >>> return false; // need more read >>> String startTag = getStartTag() + " "; >>> String startTag2 = getStartTag() + ">"; >>> int index = startText.indexOf(startTag); >>> if (index == -1) >>> index = startText.indexOf(startTag2); >>> if(index == -1) >>> return false; >>> startText = startText.substring(index); >>> m_Sb.setLength(0); >>> m_Sb.append(startText); >>> >>> String s = startText; >>> index = s.indexOf(endTag); >>> if (index == -1) >>> return false; // need more read >>> // throw new IllegalStateException("unmatched tag " + >>> getBaseTag()); >>> index += endTag.length(); >>> String tag = s.substring(0, index).trim(); >>> m_Value.set(tag); >>> >>> // keep the remaining text to add to the next tag >>> m_Sb.setLength(0); >>> String rest = s.substring(index); >>> m_Sb.append(rest); >>> return true; >>> } >>> >>> @Override >>> public Text getCurrentKey() { >>> return m_Key; >>> } >>> >>> @Override >>> public Text getCurrentValue() { >>> return m_Value; >>> } >>> >>> /** >>> * Get the progress within the split >>> */ >>> public float getProgress() { >>> return ((float) m_Current - m_Start) / (m_Start - m_End); >>> } >>> >>> public synchronized void close() throws IOException { >>> if (m_Input != null) { >>> m_Input.close(); >>> } >>> } >>> } >>> } >>> >>> ============================================= >>> >>> >>> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shi...@gmail.com> wrote: >>> >>>> Hello everyone, >>>> >>>> I'm new to Hadoop and I'm trying to figure out how to design a M/R >>>> program to parse a file and generate a PMML file as output. >>>> >>>> What I would like to do is split a file by a keyword instead a given >>>> number of lines because the location of the split could change from time to >>>> time. >>>> >>>> I'm looking around and was thinking maybe KeyValueTextInputFormat would >>>> be the way to go but I'm not finding any clear examples how to use it. So >>>> I'm not sure if this is the right choice or not. >>>> >>>> Here is a basic input example of what I'm working with. >>>> >>>> [Input file info] >>>> more info >>>> more info >>>> etc. >>>> etc. >>>> *Keyword* >>>> different info >>>> different info >>>> *Keyword* >>>> some more info >>>> >>>> For the example above, each section can be generated separately from >>>> each other. However, within each section, different lines are dependent >>>> upon >>>> each other to generate a valid PMML file. >>>> >>>> Can anyone offer a suggestion what type of input format I should use? >>>> >>>> Thanks for your time >>>> Erik >>>> >>> >>> >>> >>> -- >>> Steven M. Lewis PhD >>> 4221 105th Ave NE >>> Kirkland, WA 98033 >>> 206-384-1340 (cell) >>> Skype lordjoe_com >>> >>> >>> >> > > > -- > Steven M. Lewis PhD > 4221 105th Ave NE > Kirkland, WA 98033 > 206-384-1340 (cell) > Skype lordjoe_com > > >