You are correct - they got refactored into readFromCurrentBuffer.
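Stripped down to just the start-tag matching, what readFromCurrentBuffer now does is roughly this (a condensed paraphrase of the listing further down in this thread, not a verbatim excerpt):

String startTag  = getStartTag() + " ";    // matches <Foo attr1="...
String startTag2 = getStartTag() + ">";    // matches a bare <Foo>
String text = m_Sb.toString();
int index = text.indexOf(startTag);
if (index == -1)
    index = text.indexOf(startTag2);
if (index == -1)
    return false;    // no start tag buffered yet, keep reading
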
On Mon, Jul 18, 2011 at 11:47 AM, Erik T <erik.shi...@gmail.com> wrote:

> I understand that part, but I don't see startTag or startTag2 used in the
> nextKeyValue method after they have been declared.
> Erik
>
> On 18 July 2011 14:20, Steve Lewis <lordjoe2...@gmail.com> wrote:
>
>> The reason for the two is that it may say
>> <Foo> ....
>> or
>> <Foo attr1="...
>> - now I suppose you could just look for <Foo, which would cover either case.
>>
>> Also note I am cheating a bit: this will not properly handle tags which are
>> commented out with the xml comment <!--, but I doubt it is possible to handle
>> these without parsing the entire (potentially large) file.
>>
>> On Mon, Jul 18, 2011 at 9:40 AM, Erik T <erik.shi...@gmail.com> wrote:
>>
>>> Hi Steven,
>>>
>>> Thank you for the sample. I have one question though.
>>>
>>> In MyXMLFileReader's nextKeyValue, are startTag and startTag2 needed?
>>> Erik
>>>
>>> On 11 July 2011 15:11, Steve Lewis <lordjoe2...@gmail.com> wrote:
>>>
>>>> Look at this sample
>>>> =============================================

package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;

import java.io.*;
import java.util.*;

/**
 * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
 * Splitter that reads scan tags from an XML file
 * No assumption is made about lines, but tags and end tags MUST look
 * like <MyTag </MyTag> with no embedded spaces
 * usually you will subclass and hard code the tag you want to split on
 */
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
    public static final XMLTagInputFormat[] EMPTY_ARRAY = {};

    private static final double SPLIT_SLOP = 1.1;   // 10% slop

    public static final int BUFFER_SIZE = 4096;

    private final String m_BaseTag;
    private final String m_StartTag;
    private final String m_EndTag;
    private String m_Extension;

    public XMLTagInputFormat(final String pBaseTag) {
        m_BaseTag = pBaseTag;
        m_StartTag = "<" + pBaseTag;
        m_EndTag = "</" + pBaseTag + ">";
    }

    public String getExtension() {
        return m_Extension;
    }

    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }

    public boolean isSplitReadable(InputSplit split) {
        if (!(split instanceof FileSplit))
            return true;
        FileSplit fsplit = (FileSplit) split;
        Path path1 = fsplit.getPath();
        return isPathAcceptable(path1);
    }

    protected boolean isPathAcceptable(final Path pPath1) {
        String path = pPath1.toString().toLowerCase();
        if (path.startsWith("part-r-"))
            return true;
        String extension = getExtension();
        if (extension != null && path.endsWith(extension.toLowerCase()))
            return true;
        if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
            return true;
        if (extension == null)
            return true;
        return false;
    }

    public String getStartTag() {
        return m_StartTag;
    }

    public String getBaseTag() {
        return m_BaseTag;
    }

    public String getEndTag() {
        return m_EndTag;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        if (isSplitReadable(split))
            return new MyXMLFileReader();
        else
            return NullRecordReader.INSTANCE; // do not read (NullRecordReader not shown in this listing)
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        String fname = file.getName().toLowerCase();
        if (fname.endsWith(".gz"))
            return false;
        return true;
    }

    /**
     * Generate the list of files and make them into FileSplits.
     * This needs to be copied to insert a filter on acceptable data
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (FileStatus file : listStatus(job)) {
            Path path = file.getPath();
            if (!isPathAcceptable(path))   // filter acceptable data
                continue;
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            long length = file.getLen();
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkLocations.length - 1].getHosts()));
                }
            }
            else if (length != 0) {
                splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
            }
            else {
                // Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }
        // LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    /**
     * Custom RecordReader which returns each complete tag
     * (start tag through matching end tag) as the m_Value,
     * with the file name as the m_Key
     */
    public class MyXMLFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;
        private long m_End;
        private long m_Current;
        private BufferedReader m_Input;
        private Text m_Key;
        private Text m_Value = null;
        private char[] m_Buffer = new char[BUFFER_SIZE];
        StringBuilder m_Sb = new StringBuilder();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_Sb.setLength(0);
            m_Start = split.getStart();
            m_End = m_Start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the m_Start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                CompressionInputStream inputStream = codec.createInputStream(fileIn);
                m_Input = new BufferedReader(new InputStreamReader(inputStream));
                m_End = Long.MAX_VALUE;
            }
            else {
                m_Input = new BufferedReader(new InputStreamReader(fileIn));
            }
            m_Current = m_Start;
            if (m_Key == null) {
                m_Key = new Text();
            }
            m_Key.set(split.getPath().getName());
            if (m_Value == null) {
                m_Value = new Text();
            }
        }

        /**
         * look for a <scan tag then read until it closes
         *
         * @return true if there is data
         * @throws java.io.IOException
         */
        public boolean nextKeyValue() throws IOException {
            if (readFromCurrentBuffer())
                return true;
            int newSize = 0;
            // note: startTag and startTag2 are leftovers here - the start tag
            // matching was refactored into readFromCurrentBuffer, so they are unused
            String startTag = getStartTag() + " ";
            String startTag2 = getStartTag() + ">";
            newSize = m_Input.read(m_Buffer);

            while (newSize > 0) {
                m_Current += newSize;
                m_Sb.append(m_Buffer, 0, newSize);
                if (readFromCurrentBuffer())
                    return true;
                newSize = m_Input.read(m_Buffer);
            }
            // exit because we are at the m_End
            if (newSize <= 0) {
                m_Key = null;
                m_Value = null;
                return false;
            }

            return true;
        }

        protected boolean readFromCurrentBuffer() {
            String endTag = getEndTag();
            String startText = m_Sb.toString();
            if (!startText.contains(endTag))
                return false;   // need more read
            String startTag = getStartTag() + " ";    // matches <Foo attr1="...
            String startTag2 = getStartTag() + ">";   // matches a bare <Foo>
            int index = startText.indexOf(startTag);
            if (index == -1)
                index = startText.indexOf(startTag2);
            if (index == -1)
                return false;
            startText = startText.substring(index);
            m_Sb.setLength(0);
            m_Sb.append(startText);

            String s = startText;
            index = s.indexOf(endTag);
            if (index == -1)
                return false;   // need more read
            // throw new IllegalStateException("unmatched tag " + getBaseTag());
            index += endTag.length();
            String tag = s.substring(0, index).trim();
            m_Value.set(tag);

            // keep the remaining text to add to the next tag
            m_Sb.setLength(0);
            String rest = s.substring(index);
            m_Sb.append(rest);
            return true;
        }

        @Override
        public Text getCurrentKey() {
            return m_Key;
        }

        @Override
        public Text getCurrentValue() {
            return m_Value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            if (m_End == m_Start)
                return 0.0f;
            return ((float) (m_Current - m_Start)) / (float) (m_End - m_Start);
        }

        public synchronized void close() throws IOException {
            if (m_Input != null) {
                m_Input.close();
            }
        }
    }
}

>>>> =============================================
>>>>
>>>> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shi...@gmail.com> wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> I'm new to Hadoop and I'm trying to figure out how to design a M/R
>>>>> program to parse a file and generate a PMML file as output.
>>>>>
>>>>> What I would like to do is split a file by a keyword instead of a given
>>>>> number of lines, because the location of the split could change from
>>>>> time to time.
>>>>>
>>>>> I'm looking around and was thinking maybe KeyValueTextInputFormat would
>>>>> be the way to go, but I'm not finding any clear examples of how to use
>>>>> it, so I'm not sure if this is the right choice or not.
>>>>>
>>>>> Here is a basic input example of what I'm working with.
>>>>>
>>>>> [Input file info]
>>>>> more info
>>>>> more info
>>>>> etc.
>>>>> etc.
>>>>> *Keyword*
>>>>> different info
>>>>> different info
>>>>> *Keyword*
>>>>> some more info
>>>>>
>>>>> For the example above, each section can be generated separately from
>>>>> the others. However, within each section, different lines are dependent
>>>>> upon each other to generate a valid PMML file.
>>>>>
>>>>> Can anyone offer a suggestion as to what type of input format I should use?
>>>>>
>>>>> Thanks for your time
>>>>> Erik

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
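For completeness, here is roughly how the splitter above might be wired into a job. This is only an untested sketch: ScanJobRunner, ScanInputFormat, ScanMapper, and the <scan> tag are hypothetical names used for illustration, and the input/output paths come from the command line.

=============================================
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ScanJobRunner {

    /** hypothetical subclass hard coded to split on <scan ...> ... </scan> elements */
    public static class ScanInputFormat extends XMLTagInputFormat {
        public ScanInputFormat() {
            super("scan");   // the base tag to split on
        }
    }

    /** hypothetical mapper - the key is the file name, the value is one complete <scan> element */
    public static class ScanMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // parse the element here; this sketch just passes it through
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "xml tag splitter");   // Job.getInstance(conf, ...) on newer releases
        job.setJarByClass(ScanJobRunner.class);
        job.setInputFormatClass(ScanInputFormat.class);
        job.setMapperClass(ScanMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
=============================================

The only customization the subclass needs is a no-argument constructor that passes the hard-coded tag to XMLTagInputFormat, which is what the "usually you will subclass and hard code the tag" note in the class comment is getting at.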