Author: kubes Date: Tue Dec 4 11:13:28 2007 New Revision: 601043 URL: http://svn.apache.org/viewvc?rev=601043&view=rev Log: NUTCH-581 - DistributedSearch does not update search servers added to search-servers.txt on the fly. This allows search servers to be added and removed on the fly. Thanks Rohan.
Added: lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java - copied, changed from r600001, lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java Removed: lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java Modified: lucene/nutch/trunk/CHANGES.txt lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Modified: lucene/nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=601043&r1=601042&r2=601043&view=diff ============================================================================== --- lucene/nutch/trunk/CHANGES.txt (original) +++ lucene/nutch/trunk/CHANGES.txt Tue Dec 4 11:13:28 2007 @@ -173,6 +173,9 @@ index-basic plugin. For backwards compatibility, add index-anchor plugin to nutch-site.xml plugin.includes. (kubes) +60. NUTCH-581 - DistributedSearch does not update search servers added to + search-servers.txt on the fly. (Rohan Mehta via kubes) + Release 0.9 - 2007-04-02 1. 
Changed log4j confiquration to log to stdout on commandline Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java?rev=601043&r1=601042&r2=601043&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Tue Dec 4 11:13:28 2007 @@ -17,24 +17,27 @@ package org.apache.nutch.searcher; -import java.net.InetSocketAddress; -import java.io.*; -import java.util.*; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.StringTokenizer; +import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import org.apache.nutch.parse.ParseData; -import org.apache.nutch.parse.ParseText; -import org.apache.nutch.crawl.Inlinks; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; - +import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseText; import org.apache.nutch.util.NutchConfiguration; /** Implements the search API over IPC connnections. */ @@ -94,12 +97,19 @@ private boolean running = true; private Configuration conf; + private Path file; + private long timestamp; + private FileSystem fs; + /** Construct a client talking to servers listed in the named file. 
* Each line in the file lists a server hostname and port, separated by * whitespace. */ - public Client(Path file, Configuration conf) throws IOException { + public Client(Path file, Configuration conf) + throws IOException { this(readConfig(file, conf), conf); + this.file = file; + this.timestamp = fs.getFileStatus(file).getModificationTime(); } private static InetSocketAddress[] readConfig(Path path, Configuration conf) @@ -135,6 +145,7 @@ this.conf = conf; this.defaultAddresses = addresses; this.liveServer = new boolean[addresses.length]; + this.fs = FileSystem.get(conf); updateSegments(); setDaemon(true); start(); @@ -160,6 +171,24 @@ } } + /** + * Check to see if search-servers file has been modified + * + * @throws IOException + */ + public boolean isFileModified() + throws IOException { + + if (file != null) { + long modTime = fs.getFileStatus(file).getModificationTime(); + if (timestamp < modTime) { + this.timestamp = fs.getFileStatus(file).getModificationTime(); + return true; + } + } + + return false; + } /** Updates segment names. * @@ -167,8 +196,12 @@ */ public void updateSegments() throws IOException { - int liveServers=0; - int liveSegments=0; + int liveServers = 0; + int liveSegments = 0; + + if (isFileModified()) { + defaultAddresses = readConfig(file, conf); + } // Create new array of flags so they can all be updated at once. boolean[] updatedLiveServer = new boolean[defaultAddresses.length]; @@ -188,15 +221,17 @@ } continue; } + for (int j = 0; j < segments.length; j++) { if (LOG.isTraceEnabled()) { LOG.trace("Client: segment "+segments[j]+" at "+addr); } segmentToAddress.put(segments[j], addr); } + updatedLiveServer[i] = true; liveServers++; - liveSegments+=segments.length; + liveSegments += segments.length; } // Now update live server flags. 
@@ -413,5 +448,9 @@ running = false; interrupt(); } + + public boolean[] getLiveServer() { + return liveServer; + } } -} +} \ No newline at end of file Copied: lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java (from r600001, lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java) URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java?p2=lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java&p1=lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java&r1=600001&r2=601043&rev=601043&view=diff ============================================================================== --- lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java (original) +++ lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java Tue Dec 4 11:13:28 2007 @@ -16,53 +16,129 @@ */ package org.apache.nutch.searcher; +import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.net.InetSocketAddress; +import junit.framework.TestCase; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.Server; import org.apache.nutch.searcher.DistributedSearch.Client; import org.apache.nutch.util.NutchConfiguration; -import junit.framework.TestCase; - -public class DistributedSearchTest extends TestCase { +public class TestDistributedSearch + extends TestCase { private static final int DEFAULT_PORT = 60000; private static final String DISTRIBUTED_SEARCH_TEST_PORT = "distributed.search.test.port"; - Configuration conf; - Path searchdir=new Path("build/test/data/testcrawl/"); - Server server; - - protected void setUp() throws Exception { - super.setUp(); - conf=NutchConfiguration.create(); + + private static final int DEFAULT_PORT1 = 60001; + private static final String 
DISTRIBUTED_SEARCH_TEST_PORT1 = "distributed.search.test.port1"; + + private static final int DEFAULT_PORT2 = 60002; + private static final String DISTRIBUTED_SEARCH_TEST_PORT2 = "distributed.search.test.port2"; + + Path searchdir = new Path("build/test/data/testcrawl/"); + + public void testDistibutedSearch() + throws IOException { + + Configuration conf = NutchConfiguration.create(); + //set up server & start it - server=DistributedSearch.Server.getServer(conf, searchdir, conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT)); + Server server = DistributedSearch.Server.getServer(conf, searchdir, + conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT)); server.start(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - if(server!=null){ - //stop server - //server.stop(); + + int port = conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT); + + InetSocketAddress[] addresses = new InetSocketAddress[1]; + addresses[0] = new InetSocketAddress("localhost", port); + + Client c = new DistributedSearch.Client(addresses, conf); + + Query query = Query.parse("apache", conf); + Hits hits = c.search(query, 5, null, null, false); + c.getDetails(hits.getHit(0)); + assertTrue(hits.getTotal() > 0); + + if(server != null){ + server.stop(); } } - - public void testDistibutedSearch() throws IOException{ - - int port=conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT); + + public void testUpdateSegments() + throws IOException { + + // Startup 2 search servers. 
Neither is started by a setUp method, so both are started + // here, each on its own port + + Configuration conf = NutchConfiguration.create(); + + Server server1 = DistributedSearch.Server.getServer(conf, searchdir, + conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT1, DEFAULT_PORT1)); + + Server server2 = DistributedSearch.Server.getServer(conf, searchdir, + conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT2, DEFAULT_PORT2)); + + server1.start(); + server2.start(); + + /* create a new file search-server.txt + * listing 1 server, at DEFAULT_PORT1 + */ + FileSystem fs = FileSystem.get(conf); + Path testServersPath = new Path(searchdir, "search-server.txt"); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create( + testServersPath, true))); + bw.write("localhost " + DEFAULT_PORT1 + "\n"); + bw.flush(); + bw.close(); + + /* + * Check if it found the server + */ + Client c = new DistributedSearch.Client(testServersPath, conf); + boolean[] liveServers = c.getLiveServer(); + assertEquals(liveServers.length, 1); + + /* Add both servers (DEFAULT_PORT1 and DEFAULT_PORT2) + * to the search-server.txt file + */ + + // pause so the rewritten file gets a later modification time than the one the client recorded + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } - InetSocketAddress[] addresses=new InetSocketAddress[1]; - addresses[0]=new InetSocketAddress("localhost", port); + bw = new BufferedWriter(new OutputStreamWriter(fs.create(testServersPath, + true))); + bw.write("localhost " + DEFAULT_PORT1 + "\n"); + bw.write("localhost " + DEFAULT_PORT2 + "\n"); + bw.flush(); + bw.close(); + + // Check if it found both the servers + c.updateSegments(); - Client c=new DistributedSearch.Client(addresses, conf); - Query query=Query.parse("apache", conf); - Hits hits=c.search(query, 5, null, null, false); - c.getDetails(hits.getHit(0)); - assertTrue(hits.getTotal()>0); + liveServers = c.getLiveServer(); + assertEquals(liveServers.length, 2); + + if (server1 != 
null) { + server1.stop(); + } + if (server2 != null) { + server2.stop(); + } + + fs.delete(testServersPath); } }