Hi Pedro,

It sounds like you're on the right track, but I don't really have time to help much beyond pointing you in the right direction. Time to put on your debugging hat :)

Maybe do some testing with a small job like "sleep -mt 1 -rt 1 -m 1 -r 1" - a sleep job with 1 mapper and 1 reducer. If I recall correctly it generates a single map output record... otherwise you could do a "sort" of 1 line of text. Then you can easily add debug output to diagnose what your issue is.
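If you go the "sort of 1 line of text" route, something along these lines should do it (just a sketch using the old mapred API, untested, with made-up paths): one input line, identity map and reduce, 1 reducer, compressed map output, so there is exactly one map output record to reason about.

[code]
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

// Tiny "sort" of a one-line text file: identity map + identity reduce,
// 1 reducer, compressed map output - a single map output record to debug.
public class OneLineSort {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(OneLineSort.class);
    conf.setJobName("one-line-sort");

    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);
    conf.setNumReduceTasks(1);
    conf.setCompressMapOutput(true);          // exercise the compressed path

    // TextInputFormat (the default) gives <LongWritable offset, Text line>.
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    // Hypothetical paths - point these at a file containing a single line.
    FileInputFormat.setInputPaths(conf, new Path("/tmp/one-line.txt"));
    FileOutputFormat.setOutputPath(conf, new Path("/tmp/one-line-sort-out"));

    JobClient.runJob(conf);
  }
}
[/code]

Run it once with setCompressMapOutput(true) and once with false and compare what your two digest paths see.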
-Todd

On Fri, Nov 4, 2011 at 11:10 AM, Pedro Costa <[email protected]> wrote:
> I've looked at the MapOutputServlet class. But the problem is the following:
>
> Map output can be compressed or not. For uncompressed map output, using
> the index mechanism of the MapOutputServlet works for me: the map task
> generates a digest for each partition, and it matches the digest produced
> by the reduce.
>
> Let me explain what I've changed in my own version of the MR code. A map
> task (MT) produces a digest for each partition of data it generates. So,
> if MT1 produces 2 partitions of uncompressed data, it produces Hash1 and
> Hash2.
>
> Now, when a reduce task (RT) fetches the map output, it generates another
> digest using the index mechanism of the MapOutputServlet and compares it
> with the corresponding digest generated by the map task.
>
> As you can see from my explanation, for uncompressed map output the index
> mechanism is really useful.
>
> But I've also tried to do the same with compressed map output, and it
> doesn't work. That's the reason I'm now trying the IFile.Reader class.
>
> As you can see, I'm in a big dilemma and I don't know what to do.
>
> I will show you my code. These 2 methods try to generate digests on the
> map and the reduce side. In the end, they give different results, and I
> don't know why. These 2 methods are my first attempt to generate digests
> from compressed map output.
>
>
> [code]
> // This method tries to generate a digest from the compressed map
> // output on the map side.
> public synchronized String generateHash(FileSystem fs, Path filename,
>     Decompressor decompressor, int offset, int mapOutputLength) {
>   LOG.debug("Opening file2: " + filename);
>
>   MessageDigest md = null;
>   String digest = null;
>   DecompressorStream decodec = null;
>   FSDataInputStream input = null;
>
>   try {
>     input = fs.open(filename);
>     decodec = new DecompressorStream(input, decompressor);
>     md = MessageDigest.getInstance("SHA-1");
>     System.out.println("ABC");
>     byte[] buffer;
>     int size;
>     while (mapOutputLength > 0) {
>       // In case the number of bytes to read is smaller than the default size.
>       // We don't want the message digest to contain trash.
>       size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60 * 1024);
>       System.out.println("mapOutputLength: " + mapOutputLength + " Size: " + size);
>
>       if (size == 0)
>         break;
>
>       buffer = new byte[size];
>       size = decodec.read(buffer, offset, size);
>       System.out.println("read: " + size + "\ndata: " + new String(buffer));
>       mapOutputLength -= size;
>
>       if (size > 0)
>         md.update(buffer);
>       else if (size == -1)
>         break;
>     }
>     System.out.println("DFG");
>     digest = hashIt(md);
>   } catch (NoSuchAlgorithmException e) {
>     // TODO Auto-generated catch block
>     e.printStackTrace();
>   } catch (IOException e) {
>     // TODO Auto-generated catch block
>     e.printStackTrace();
>   } finally {
>     if (input != null)
>       try {
>         input.close();
>       } catch (IOException e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>       }
>   }
>
>   return digest;
> }
> [/code]
>
>
> [code]
> // This method tries to generate the digest from the compressed map
> // output fetched by the reduce.
> public synchronized String generateHash(byte[] data, Decompressor decompressor,
>     int offset, int mapOutputLength) {
>   MessageDigest md = null;
>   String digest = null;
>   DecompressorStream decodec = null;
>   ByteArrayInputStream bis = null;
>   try {
>     bis = new ByteArrayInputStream(data);
>     decodec = new DecompressorStream(bis, decompressor);
>     md = MessageDigest.getInstance("SHA-1");
>
>     int size;
>     byte[] buffer;
>     while (mapOutputLength > 0) {
>       // In case the number of bytes to read is smaller than the default size.
>       // We don't want the message digest to contain trash.
>       size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60 * 1024);
>
>       if (size == 0)
>         break;
>
>       buffer = new byte[size];
>       decodec.read(buffer, offset, size);
>       md.update(buffer);
>
>       mapOutputLength -= size;
>     }
>
>     digest = hashIt(md);
>   } catch (NoSuchAlgorithmException e) {
>     // TODO Auto-generated catch block
>     e.printStackTrace();
>   } catch (IOException e) {
>     // TODO Auto-generated catch block
>     e.printStackTrace();
>   } finally {
>     if (bis != null)
>       try {
>         bis.close();
>       } catch (IOException e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>       }
>   }
>
>   return digest;
> }
> [/code]
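One thing that jumps out of both methods, just from reading them: read() on a DecompressorStream is not guaranteed to fill the buffer, and the map-side call passes 'offset' as an offset into the buffer it just allocated, yet md.update(buffer) always hashes the entire allocated array, including bytes that read() never wrote; the reduce-side loop also decrements mapOutputLength by the requested size rather than by the bytes actually read. A loop that feeds the digest only what read() actually returned would look roughly like this (a sketch, untested, assuming both sides hash the same count of decompressed bytes):

[code]
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch: hash exactly `length` bytes from `in` (e.g. a DecompressorStream),
// feeding the digest only the bytes each read() call actually returned.
public class StreamDigest {
  public static String hashStream(InputStream in, long length)
      throws IOException, NoSuchAlgorithmException {
    MessageDigest md = MessageDigest.getInstance("SHA-1");
    byte[] buf = new byte[60 * 1024];
    long remaining = length;
    while (remaining > 0) {
      int want = (int) Math.min(buf.length, remaining);
      int n = in.read(buf, 0, want);   // buffer offset is always 0
      if (n < 0)
        break;                         // EOF before `length` bytes
      md.update(buf, 0, n);            // hash only what was actually read
      remaining -= n;                  // count bytes read, not bytes requested
    }
    byte[] raw = md.digest();
    StringBuilder hex = new StringBuilder();
    for (byte b : raw)
      hex.append(String.format("%02x", b));
    return hex.toString();
  }
}
[/code]

If the digests still differ after that, the two sides probably aren't being handed the same byte range to begin with.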
>
>
> 2011/11/4 Todd Lipcon <[email protected]>
>
>> On Fri, Nov 4, 2011 at 10:04 AM, Pedro Costa <[email protected]> wrote:
>> > 1 - I think that IFile.Reader can only read the whole map output file. I
>> > want to read a partition of the map output. How can I do that? How do I
>> > set the size of a partition in the I
>>
>> Look at the code for MapOutputServlet - it uses the index mechanism to
>> find a particular partition.
>>
>> > 2 - I know that map output is composed of blocks. What is the size of a
>> > block? Is it 64MB by default?
>>
>> Nope, it doesn't use blocks. That's HDFS you're thinking of.
>>
>> -Todd
>>
>> > 2011/11/4 Todd Lipcon <[email protected]>
>> >
>> >> Hi Pedro,
>> >>
>> >> The format is called IFile. Check out the source for more info on the
>> >> format - it's fairly simple. The partition starts are recorded in a
>> >> separate index file next to the output file.
>> >>
>> >> I don't think you'll find significant docs on this format since it's
>> >> MR-internal - the code is your best resource.
>> >>
>> >> -Todd
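To make the index mechanism concrete: if memory serves, the index file sitting next to the map output (file.out.index next to file.out) stores three 8-byte longs per reduce partition - the partition's start offset in the output file, its raw (decompressed) length, and its on-disk (possibly compressed) length - and newer versions append a checksum to the index file as well (check the IndexRecord / spill index code in your version to confirm). So a standalone program could locate and digest partition i roughly like this (a sketch, untested, with hypothetical local paths; it hashes the raw on-disk bytes of the partition, which sidesteps decompression entirely):

[code]
import java.security.MessageDigest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: use the index file to find partition i, then SHA-1 the raw
// bytes of that partition as they sit in file.out (compressed or not).
public class PartitionDigest {
  public static String digestPartition(Configuration conf, Path mapOut,
      Path mapOutIndex, int partition) throws Exception {
    FileSystem fs = FileSystem.getLocal(conf);

    // Each index record is 3 longs: start offset, raw length, part length.
    long start, rawLength, partLength;
    FSDataInputStream index = fs.open(mapOutIndex);
    try {
      index.seek(partition * 24L);
      start = index.readLong();
      rawLength = index.readLong();     // decompressed size (unused here)
      partLength = index.readLong();    // size of the segment on disk
    } finally {
      index.close();
    }

    MessageDigest md = MessageDigest.getInstance("SHA-1");
    FSDataInputStream data = fs.open(mapOut);
    try {
      data.seek(start);
      byte[] buf = new byte[60 * 1024];
      long remaining = partLength;
      while (remaining > 0) {
        int n = data.read(buf, 0, (int) Math.min(buf.length, remaining));
        if (n < 0) break;
        md.update(buf, 0, n);
        remaining -= n;
      }
    } finally {
      data.close();
    }

    StringBuilder hex = new StringBuilder();
    for (byte b : md.digest()) hex.append(String.format("%02x", b));
    return hex.toString();
  }
}
[/code]

Computing the same digest over the exact byte range MapOutputServlet serves for that partition would at least tell you whether the bytes themselves differ or whether the difference comes from how each side decompresses them.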
>> >>
>> >> On Fri, Nov 4, 2011 at 8:37 AM, Pedro Costa <[email protected]> wrote:
>> >> > Hi,
>> >> >
>> >> > I'm trying to understand the structure of the map output file. Here's
>> >> > an example of a map output file that contains 2 partitions:
>> >> >
>> >> > [code]
>> >> > <FF><FF><FF><FF>^@^@716banana banana apple banana carrot carrot apple
>> >> > banana 0apple carrot carrot carrot banana carrot carrot 5^N4carrot apple
>> >> > carrot apple apple carrot banana apple ^Mbanana apple <FF><FF><DF>|<8E><B7>
>> >> > [/code]
>> >> >
>> >> > 1 - I would like to understand what the ASCII character parts are. What
>> >> > do they mean?
>> >> >
>> >> > 2 - What type of file is a map output? Is it a SequenceFileOutputFormat,
>> >> > or a TextOutputFormat?
>> >> >
>> >> > 3 - I have a small program that runs independently of MR whose goal is
>> >> > to digest each partition and give the corresponding hash. How do I
>> >> > know where each partition starts?
>> >> >
>> >> > --
>> >> > Thanks,
>> >> > PSC
>> >>
>> >> --
>> >> Todd Lipcon
>> >> Software Engineer, Cloudera
>> >
>> > --
>> > Thanks,
>>
>> --
>> Todd Lipcon
>> Software Engineer, Cloudera
>
> --
> Thanks,

--
Todd Lipcon
Software Engineer, Cloudera
