Author: cutting Date: Wed Jul 6 14:48:39 2005 New Revision: 209522 URL: http://svn.apache.org/viewcvs?rev=209522&view=rev Log: Fix sorting to work with the new sync method. Sorting prefixes each temporary data segment with its length, which is hard to compute in advance when sync entries are interleaved with the records; therefore, syncs are no longer written to temporary sort files.
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=209522&r1=209521&r2=209522&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Wed Jul 6 14:48:39 2005 @@ -59,7 +59,7 @@ // can seek into the middle of a file and then synchronize with record // starts and ends by scanning for this value. private long lastSyncPos; // position of last sync - private final byte[] sync; // 16 random bytes + private byte[] sync; // 16 random bytes { try { // use hash of uid + host MessageDigest digester = MessageDigest.getInstance("MD5"); @@ -147,7 +147,8 @@ if (keyLength == 0) throw new IOException("zero length keys not allowed"); - if (out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync + if (sync != null && + out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync lastSyncPos = out.getPos(); // update lastSyncPos //LOG.info("sync@"+lastSyncPos); out.writeInt(SYNC_ESCAPE); // escape it @@ -300,7 +301,8 @@ int length = in.readInt(); - if (version[3] > 1 && length == SYNC_ESCAPE) { // process a sync entry + if (version[3] > 1 && sync != null && + length == SYNC_ESCAPE) { // process a sync entry //LOG.info("sync@"+in.getPos()); in.readFully(syncCheck); // read syncCheck if (!Arrays.equals(sync, syncCheck)) // check it @@ -529,13 +531,15 @@ long length = buffer.getLength(); // compute its size length += count*8; // allow for length/keyLength - length += (count/SYNC_INTERVAL)*SYNC_SIZE; // allow for syncs out.writeLong(length); // write size out.writeLong(count); // write count } Writer writer = new Writer(out, keyClass, 
valClass); + if (!done) { + writer.sync = null; // disable sync on temp files + } for (int i = 0; i < count; i++) { // write in sorted order int p = pointers[i]; @@ -640,12 +644,13 @@ long count = in.readLong(); totalLength += length; - totalLength -= (count/SYNC_INTERVAL)*SYNC_SIZE; // remove syncs totalCount+= count; Reader reader = new Reader(nfs, inName, memory/(factor+1), in.getPos(), length); + reader.sync = null; // disable sync on temp files + MergeStream ms = new MergeStream(reader); // add segment to queue if (ms.next()) { queue.put(ms); @@ -654,7 +659,6 @@ } if (!last) { // intermediate file - totalLength += (totalCount/SYNC_INTERVAL)*SYNC_SIZE; // add syncs queue.out.writeLong(totalLength); // write size queue.out.writeLong(totalCount); // write count }