Update of /cvsroot/nutch/nutch/src/java/net/nutch/indexer In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv20100/src/java/net/nutch/indexer
Modified Files: DeleteDuplicates.java IndexMerger.java IndexSegment.java Log Message: Modify the NutchFileSystem so callers can more transparently modify files that could be local or remote. In the local case, they are modified directly in place. In the remote case, they are copied to a local path, then placed back on the remote fs. This was the effect of the previous code, but it could sometimes result in unnecessary copying. I think this API is clearer, and it has no inefficiencies for the localfs case. (It is also as fast as possible for remotefs, given that we have no NDFS API for Lucene-formatted files.) Index: IndexMerger.java =================================================================== RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/indexer/IndexMerger.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** IndexMerger.java 8 Sep 2004 16:29:12 -0000 1.6 --- IndexMerger.java 4 Oct 2004 15:42:57 -0000 1.7 *************** *** 5,8 **** --- 5,9 ---- import java.io.*; + import java.text.*; import java.util.*; import java.util.logging.*; *************** *** 29,32 **** --- 30,34 ---- private NutchFileSystem nfs; private File outputIndex; + private File localWorkingDir; private File[] segments; *************** *** 34,41 **** * Merge all of the segments given */ ! public IndexMerger(NutchFileSystem nfs, File[] segments, File outputIndex) { this.nfs = nfs; this.segments = segments; this.outputIndex = outputIndex; } --- 36,44 ---- * Merge all of the segments given */ ! 
public IndexMerger(NutchFileSystem nfs, File[] segments, File outputIndex, File localWorkingDir) throws IOException { this.nfs = nfs; this.segments = segments; this.outputIndex = outputIndex; + this.localWorkingDir = localWorkingDir; } *************** *** 47,57 **** // Open local copies of NFS indices // - File workDir = new File(new File("mergesegdir" + System.currentTimeMillis()).getCanonicalPath()); - workDir.mkdir(); Directory[] dirs = new Directory[segments.length]; File[] localSegments = new File[segments.length]; for (int i = 0; i < segments.length; i++) { ! localSegments[i] = new File(workDir, "mergeseg" + System.currentTimeMillis()); ! nfs.putToLocalFile(new File(segments[i], "index"), localSegments[i]); dirs[i] = FSDirectory.getDirectory(localSegments[i], false); } --- 50,58 ---- // Open local copies of NFS indices // Directory[] dirs = new Directory[segments.length]; File[] localSegments = new File[segments.length]; for (int i = 0; i < segments.length; i++) { ! File tmpFile = new File(localWorkingDir, "indexmerge-" + new SimpleDateFormat("yyyMMddHHmmss").format(new Date(System.currentTimeMillis()))); ! localSegments[i] = nfs.startLocalInput(new File(segments[i], "index"), tmpFile); dirs[i] = FSDirectory.getDirectory(localSegments[i], false); } *************** *** 60,65 **** // Get local output target // ! File localOutput = new File("mergeout" + System.currentTimeMillis()); ! localOutput.delete(); // --- 61,66 ---- // Get local output target // ! File tmpLocalOutput = new File(localWorkingDir, "merge-output"); ! File localOutput = nfs.startLocalOutput(outputIndex, tmpLocalOutput); // *************** *** 77,89 **** // Put target back // ! nfs.addLocalFile(localOutput, outputIndex); // ! // Delete all local inputs // for (int i = 0; i < localSegments.length; i++) { ! FileUtil.fullyDelete(localSegments[i]); } ! workDir.delete(); } --- 78,90 ---- // Put target back // ! nfs.completeLocalOutput(outputIndex, tmpLocalOutput); // ! 
// Delete all local inputs, if necessary // for (int i = 0; i < localSegments.length; i++) { ! nfs.completeLocalInput(localSegments[i]); } ! localWorkingDir.delete(); } *************** *** 92,96 **** */ public static void main(String[] args) throws Exception { ! String usage = "IndexMerger (-local | -ndfs <nameserver:port>) outputIndex segments..."; if (args.length < 2) { System.err.println("Usage: " + usage); --- 93,97 ---- */ public static void main(String[] args) throws Exception { ! String usage = "IndexMerger (-local | -ndfs <nameserver:port>) [-workingdir <workingdir>] outputIndex segments..."; if (args.length < 2) { System.err.println("Usage: " + usage); *************** *** 103,122 **** NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0); try { ! File outputIndex = new File(args[0]); Vector segments = new Vector(); ! for (int i = 1; i < args.length; i++) { if (args[i] != null) { segments.add(new File(args[i])); } } - File[] segmentFiles = (File[]) segments.toArray(new File[segments.size()]); // // Merge the indices // LOG.info("merging segment indexes to: " + outputIndex); ! IndexMerger merger = new IndexMerger(nfs, segmentFiles, outputIndex); merger.merge(); LOG.info("done merging"); } finally { nfs.close(); --- 104,137 ---- NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0); try { ! File workingDir = new File(new File("indexmerger-workingdir").getCanonicalPath()); Vector segments = new Vector(); ! ! int i = 0; ! if ("-workingdir".equals(args[i])) { ! i++; ! workingDir = new File(new File(args[i++]).getCanonicalPath()); ! } ! File outputIndex = new File(args[i++]); ! ! for (; i < args.length; i++) { if (args[i] != null) { segments.add(new File(args[i])); } } // // Merge the indices // + File[] segmentFiles = (File[]) segments.toArray(new File[segments.size()]); LOG.info("merging segment indexes to: " + outputIndex); ! ! if (workingDir.exists()) { ! FileUtil.fullyDelete(workingDir); ! } ! workingDir.mkdirs(); ! 
IndexMerger merger = new IndexMerger(nfs, segmentFiles, outputIndex, workingDir); merger.merge(); LOG.info("done merging"); + FileUtil.fullyDelete(workingDir); } finally { nfs.close(); Index: IndexSegment.java =================================================================== RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/indexer/IndexSegment.java,v retrieving revision 1.20 retrieving revision 1.21 diff -C2 -d -r1.20 -r1.21 *** IndexSegment.java 8 Sep 2004 16:29:12 -0000 1.20 --- IndexSegment.java 4 Oct 2004 15:42:57 -0000 1.21 *************** *** 66,75 **** // OK, fine. Build the writer to the local file, set params // ! File outputDir = new File(localWorkingDir, "workingIndexDir"); ! outputDir.delete(); ! outputDir.mkdirs(); IndexWriter writer ! = new IndexWriter(new File(outputDir, "index"), new NutchDocumentAnalyzer(), true); writer.mergeFactor = 50; --- 66,76 ---- // OK, fine. Build the writer to the local file, set params // ! File outputIndex = new File(srcDir, "index"); ! File tmpOutputIndex = new File(localWorkingDir, "index"); ! ! File localOutput = nfs.startLocalOutput(outputIndex, tmpOutputIndex); IndexWriter writer ! = new IndexWriter(localOutput, new NutchDocumentAnalyzer(), true); writer.mergeFactor = 50; *************** *** 141,146 **** // Put the local file in its place via NFS // ! nfs.addLocalFile(new File(outputDir, "index"), new File(srcDir, "index")); ! outputDir.delete(); // --- 142,147 ---- // Put the local file in its place via NFS // ! //nfs.completeLocalOutput(new File(outputDir, "index"), new File(srcDir, "index")); ! nfs.completeLocalOutput(outputIndex, tmpOutputIndex); // *************** *** 189,193 **** */ public static void main(String[] args) throws Exception { ! String usage = "IndexSegment (-local | -ndfs <namenode:port>) <segment_directory> [-dir <localWorkingDir>]"; if (args.length == 0) { System.err.println("Usage: " + usage); --- 190,194 ---- */ public static void main(String[] args) throws Exception { ! 
String usage = "IndexSegment (-local | -ndfs <namenode:port>) <segment_directory> [-dir <workingdir>]"; if (args.length == 0) { System.err.println("Usage: " + usage); *************** *** 199,203 **** int maxDocs = Integer.MAX_VALUE; File srcDir = null; ! File localWorkingDir = new File("/tmp"); for (int i = 0; i < args.length; i++) { if (args[i] != null) { --- 200,204 ---- int maxDocs = Integer.MAX_VALUE; File srcDir = null; ! File workingDir = new File(new File("indexsegment-workingdir").getCanonicalPath()); for (int i = 0; i < args.length; i++) { if (args[i] != null) { *************** *** 207,211 **** } else if (args[i].equals("-dir")) { i++; ! localWorkingDir = new File(args[i]); } else { srcDir = new File(args[i]); --- 208,212 ---- } else if (args[i].equals("-dir")) { i++; ! workingDir = new File(new File(args[i]).getCanonicalPath()); } else { srcDir = new File(args[i]); *************** *** 214,221 **** } ! IndexSegment indexer = new IndexSegment(nfs, maxDocs, srcDir, localWorkingDir); LOG.info("indexing segment: " + srcDir); indexer.indexPages(); LOG.info("done indexing"); } finally { nfs.close(); --- 215,226 ---- } ! if (workingDir.exists()) { ! FileUtil.fullyDelete(workingDir); ! } ! IndexSegment indexer = new IndexSegment(nfs, maxDocs, srcDir, workingDir); LOG.info("indexing segment: " + srcDir); indexer.indexPages(); LOG.info("done indexing"); + FileUtil.fullyDelete(workingDir); } finally { nfs.close(); Index: DeleteDuplicates.java =================================================================== RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/indexer/DeleteDuplicates.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** DeleteDuplicates.java 8 Sep 2004 16:29:12 -0000 1.14 --- DeleteDuplicates.java 4 Oct 2004 15:42:57 -0000 1.15 *************** *** 123,130 **** * Constructs a duplicate detector for the provided indexes. */ ! public DeleteDuplicates(IndexReader[] readers) throws IOException { this.readers = readers; ! 
this.tempFile = new File("ddup-" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()))); ! this.tempFile = new File(tempFile.getCanonicalPath()); } --- 123,129 ---- * Constructs a duplicate detector for the provided indexes. */ ! public DeleteDuplicates(IndexReader[] readers, File workingDir) throws IOException { this.readers = readers; ! this.tempFile = new File(workingDir, "ddup-" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()))); } *************** *** 136,139 **** --- 135,139 ---- readers[i].close(); } + tempFile.delete(); } *************** *** 266,270 **** // Usage, arg checking // ! String usage = "DeleteDuplicates (-local | -ndfs <namenode:port>) <segmentsDir>"; if (args.length < 2) { System.err.println("Usage: " + usage); --- 266,270 ---- // Usage, arg checking // ! String usage = "DeleteDuplicates (-local | -ndfs <namenode:port>) [-workingdir <workingdir>] <segmentsDir>"; if (args.length < 2) { System.err.println("Usage: " + usage); *************** *** 273,285 **** NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0); try { // // Build an array of IndexReaders for all the segments we want to process // ! String segmentsDir = args[0]; File[] directories = nfs.listFiles(new File(segmentsDir)); Vector vReaders = new Vector(); Vector putbackList = new Vector(); int maxDoc = 0; for (int i = 0; i < directories.length; i++) { // --- 273,292 ---- NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0); + File workingDir = new File(new File("ddup-workingdir").getCanonicalPath()); try { // // Build an array of IndexReaders for all the segments we want to process // ! int j = 0; ! if ("-workingdir".equals(args[j])) { ! j++; ! workingDir = new File(new File(args[j++]).getCanonicalPath()); ! } ! 
String segmentsDir = args[j++]; File[] directories = nfs.listFiles(new File(segmentsDir)); Vector vReaders = new Vector(); Vector putbackList = new Vector(); int maxDoc = 0; + for (int i = 0; i < directories.length; i++) { // *************** *** 289,298 **** if (nfs.exists(indexDone) && nfs.isFile(indexDone)) { // ! // Bring the specified segment to a local dir for processing // File indexDir = new File(directories[i], "index"); ! File localIndexDir = nfs.makeLocal(indexDir, "ddup"); putbackList.add(indexDir); ! putbackList.add(localIndexDir); // --- 296,307 ---- if (nfs.exists(indexDone) && nfs.isFile(indexDone)) { // ! // Make sure the specified segment can be processed locally // File indexDir = new File(directories[i], "index"); ! File tmpDir = new File(workingDir, "ddup-" + new SimpleDateFormat("yyyMMddHHmmss").format(new Date(System.currentTimeMillis()))); ! File localIndexDir = nfs.startLocalOutput(indexDir, tmpDir); ! putbackList.add(indexDir); ! putbackList.add(tmpDir); // *************** *** 316,320 **** readers[i] = (IndexReader)vReaders.remove(0); } ! DeleteDuplicates dd = new DeleteDuplicates(readers); dd.deleteUrlDuplicates(); dd.deleteContentDuplicates(); --- 325,334 ---- readers[i] = (IndexReader)vReaders.remove(0); } ! ! if (workingDir.exists()) { ! FileUtil.fullyDelete(workingDir); ! } ! workingDir.mkdirs(); ! DeleteDuplicates dd = new DeleteDuplicates(readers, workingDir); dd.deleteUrlDuplicates(); dd.deleteContentDuplicates(); *************** *** 322,334 **** // ! // Dups have been deleted locally. Now put them back to NFS // LOG.info("Duplicate deletion complete locally. Now returning to NFS..."); for (Iterator it = putbackList.iterator(); it.hasNext(); ) { File indexDir = (File) it.next(); ! File localIndexDir = (File) it.next(); ! nfs.completeLocalWork(localIndexDir, indexDir); } LOG.info("DeleteDuplicates complete"); } finally { nfs.close(); --- 336,349 ---- // ! // Dups have been deleted. 
Now make sure they are placed back to NFS // LOG.info("Duplicate deletion complete locally. Now returning to NFS..."); for (Iterator it = putbackList.iterator(); it.hasNext(); ) { File indexDir = (File) it.next(); ! File tmpDir = (File) it.next(); ! nfs.completeLocalOutput(indexDir, tmpDir); } LOG.info("DeleteDuplicates complete"); + FileUtil.fullyDelete(workingDir); } finally { nfs.close(); ------------------------------------------------------- This SF.net email is sponsored by: IT Product Guide on ITManagersJournal Use IT products in your business? Tell us what you think of them. Give us Your Opinions, Get Free ThinkGeek Gift Certificates! Click to find out more http://productguide.itmanagersjournal.com/guidepromo.tmpl _______________________________________________ Nutch-cvs mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/nutch-cvs