Author: siren
Date: Tue Jun 27 12:34:20 2006
New Revision: 417567
URL: http://svn.apache.org/viewvc?rev=417567&view=rev
Log:
NUTCH-306 fix for concurrency problem contributed by Grant Glouser
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java?rev=417567&r1=417566&r2=417567&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Tue Jun 27 12:34:20 2006 @@ -82,7 +82,7 @@ Runnable { private InetSocketAddress[] defaultAddresses; - private InetSocketAddress[] liveAddresses; + private boolean[] liveServer; private HashMap segmentToAddress = new HashMap(); private boolean running = true; @@ -128,6 +128,7 @@ public Client(InetSocketAddress[] addresses, Configuration conf) throws IOException { this.conf = conf; this.defaultAddresses = addresses; + this.liveServer = new boolean[addresses.length]; updateSegments(); setDaemon(true); start(); @@ -162,7 +163,9 @@ int liveServers=0; int liveSegments=0; - Vector liveAddresses=new Vector(); + + // Create new array of flags so they can all be updated at once. + boolean[] updatedLiveServer = new boolean[defaultAddresses.length]; // build segmentToAddress map Object[][] params = new Object[defaultAddresses.length][0]; @@ -173,6 +176,7 @@ InetSocketAddress addr = defaultAddresses[i]; String[] segments = results[i]; if (segments == null) { + updatedLiveServer[i] = false; if (LOG.isWarnEnabled()) { LOG.warn("Client: no segments from: " + addr); } @@ -184,13 +188,13 @@ } segmentToAddress.put(segments[j], addr); } - liveAddresses.add(addr); + updatedLiveServer[i] = true; liveServers++; liveSegments+=segments.length; } - this.liveAddresses = (InetSocketAddress[]) // update liveAddresses - liveAddresses.toArray(new InetSocketAddress[liveAddresses.size()]); + // Now update live server flags. 
+ this.liveServer = updatedLiveServer; if (LOG.isInfoEnabled()) { LOG.info("STATS: "+liveServers+" servers, "+liveSegments+" segments."); @@ -206,7 +210,26 @@ public Hits search(final Query query, final int numHits, final String dedupField, final String sortField, final boolean reverse) throws IOException { - long totalHits = 0; + // Get the list of live servers. It would be nice to build this + // list in updateSegments(), but that would create concurrency issues. + // We grab a local reference to the live server flags in case it + // is updated while we are building our list of liveAddresses. + boolean[] savedLiveServer = this.liveServer; + int numLive = 0; + for (int i = 0; i < savedLiveServer.length; i++) { + if (savedLiveServer[i]) + numLive++; + } + InetSocketAddress[] liveAddresses = new InetSocketAddress[numLive]; + int[] liveIndexNos = new int[numLive]; + int k = 0; + for (int i = 0; i < savedLiveServer.length; i++) { + if (savedLiveServer[i]) { + liveAddresses[k] = defaultAddresses[i]; + liveIndexNos[k] = i; + k++; + } + } Object[][] params = new Object[liveAddresses.length][5]; for (int i = 0; i < params.length; i++) { @@ -230,6 +253,7 @@ queue = new TreeSet(); } + long totalHits = 0; Comparable maxValue = null; for (int i = 0; i < results.length; i++) { Hits hits = results[i]; @@ -241,7 +265,7 @@ ((reverse || sortField == null) ? 
h.getSortValue().compareTo(maxValue) >= 0 : h.getSortValue().compareTo(maxValue) <= 0)) { - queue.add(new Hit(i, h.getIndexDocNo(), + queue.add(new Hit(liveIndexNos[i], h.getIndexDocNo(), h.getSortValue(), h.getDedupValue())); if (queue.size() > numHits) { // if hit queue overfull queue.remove(queue.last()); // remove lowest in hit queue @@ -255,7 +279,7 @@ private Protocol getRemote(Hit hit) { return (Protocol) - RPC.getProxy(Protocol.class, liveAddresses[hit.getIndexNo()], conf); + RPC.getProxy(Protocol.class, defaultAddresses[hit.getIndexNo()], conf); } private Protocol getRemote(HitDetails hit) { @@ -276,7 +300,7 @@ InetSocketAddress[] addrs = new InetSocketAddress[hits.length]; Object[][] params = new Object[hits.length][1]; for (int i = 0; i < hits.length; i++) { - addrs[i] = liveAddresses[hits[i].getIndexNo()]; + addrs[i] = defaultAddresses[hits[i].getIndexNo()]; params[i][0] = hits[i]; } return (HitDetails[])RPC.call(DETAILS, params, addrs, conf); @@ -368,7 +392,7 @@ updateSegments(); } catch (IOException ioe) { if (LOG.isWarnEnabled()) { LOG.warn("No search servers available!"); } - liveAddresses=new InetSocketAddress[0]; + liveServer = new boolean[defaultAddresses.length]; } } } Using Tomcat but need to do more? Need to support web services, security? Get stuff done quickly with pre-integrated technology to make your job easier Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642 _______________________________________________ Nutch-cvs mailing list Nutch-cvs@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/nutch-cvs