Author: orbiter Date: 2008-02-27 16:16:47 +0100 (Wed, 27 Feb 2008) New Revision: 4517
Added: trunk/source/de/anomic/server/serverProcessor.java Modified: trunk/htroot/Status.java trunk/htroot/xml/status_p.java trunk/source/de/anomic/index/indexRWIEntryOrder.java trunk/source/de/anomic/kelondro/kelondroRowCollection.java trunk/source/de/anomic/kelondro/kelondroSortStack.java trunk/source/de/anomic/plasma/plasmaSearchAPI.java trunk/source/de/anomic/plasma/plasmaSearchEvent.java trunk/source/de/anomic/plasma/plasmaSearchRankingProcess.java trunk/source/de/anomic/plasma/plasmaWordIndex.java trunk/source/de/anomic/server/serverDomains.java trunk/source/de/anomic/server/serverProfiling.java trunk/source/yacy.java Log: more multithreading support: - replaced some synchronized classes with classes from java.util.concurrent - used a java.util.concurrent.SynchronousQueue to implement a persistent sorting thread in the very basic kelondroRowCollection, which sorts with a second thread when more than one CPU core is available (serverProcessor.useCPU > 1) Modified: trunk/htroot/Status.java =================================================================== --- trunk/htroot/Status.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/htroot/Status.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -58,6 +58,7 @@ import de.anomic.server.serverDomains; import de.anomic.server.serverMemory; import de.anomic.server.serverObjects; +import de.anomic.server.serverProcessor; import de.anomic.server.serverSwitch; import de.anomic.tools.yFormatter; import de.anomic.yacy.yacyCore; @@ -293,7 +294,7 @@ prop.put("freeMemory", serverMemory.bytesToString(rt.freeMemory())); prop.put("totalMemory", serverMemory.bytesToString(rt.totalMemory())); prop.put("maxMemory", serverMemory.bytesToString(rt.maxMemory())); - prop.put("processors", rt.availableProcessors()); + prop.put("processors", serverProcessor.availableCPU); // proxy traffic //prop.put("trafficIn",bytesToString(httpdByteCountInputStream.getGlobalCount())); Modified: trunk/htroot/xml/status_p.java 
=================================================================== --- trunk/htroot/xml/status_p.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/htroot/xml/status_p.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -44,6 +44,7 @@ import de.anomic.http.httpdByteCountOutputStream; import de.anomic.plasma.plasmaSwitchboard; import de.anomic.server.serverObjects; +import de.anomic.server.serverProcessor; import de.anomic.server.serverSwitch; import de.anomic.yacy.yacyCore; @@ -75,7 +76,7 @@ prop.putNum("freeMemory", rt.freeMemory()); prop.putNum("totalMemory", rt.totalMemory()); prop.putNum("maxMemory", rt.maxMemory()); - prop.putNum("processors", rt.availableProcessors()); + prop.putNum("processors", serverProcessor.availableCPU); // proxy traffic prop.put("trafficIn", httpdByteCountInputStream.getGlobalCount()); Modified: trunk/source/de/anomic/index/indexRWIEntryOrder.java =================================================================== --- trunk/source/de/anomic/index/indexRWIEntryOrder.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/index/indexRWIEntryOrder.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -36,6 +36,7 @@ import de.anomic.plasma.plasmaCondenser; import de.anomic.plasma.plasmaSearchRankingProcess; import de.anomic.plasma.plasmaSearchRankingProfile; +import de.anomic.server.serverProcessor; import de.anomic.yacy.yacyURL; public class indexRWIEntryOrder { @@ -44,8 +45,6 @@ private kelondroMScoreCluster<String> doms; // collected for "authority" heuristic private int maxdomcount; - private static final int processors = Runtime.getRuntime().availableProcessors(); // for multiprocessor support, used during normalization - public indexRWIEntryOrder(plasmaSearchRankingProfile profile) { this.min = null; this.max = null; @@ -60,7 +59,7 @@ ArrayList<indexRWIVarEntry> result = null; //long s0 = System.currentTimeMillis(); - if ((processors > 1) && (container.size() > 600)) { + if ((serverProcessor.useCPU > 1) && (container.size() > 600)) { // run 
minmax with two threads int middle = container.size() / 2; minmaxfinder mmf0 = new minmaxfinder(container, 0, middle); Modified: trunk/source/de/anomic/kelondro/kelondroRowCollection.java =================================================================== --- trunk/source/de/anomic/kelondro/kelondroRowCollection.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/kelondro/kelondroRowCollection.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -31,9 +31,11 @@ import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.SynchronousQueue; import de.anomic.server.serverFileUtils; import de.anomic.server.serverMemory; +import de.anomic.server.serverProcessor; import de.anomic.server.logging.serverLog; import de.anomic.yacy.yacySeedDB; @@ -41,7 +43,17 @@ public static final double growfactor = 1.4; private static final int isortlimit = 20; + private static final Integer dummy = new Integer(0); + public static final qsortthread sortingthread; + static { + if (serverProcessor.useCPU > 1) { + sortingthread = new qsortthread(); + sortingthread.start(); + } else { + sortingthread = null; + } + } protected byte[] chunkcache; protected int chunkcount; @@ -57,8 +69,6 @@ private static final int exp_order_bound = 5; private static final int exp_collection = 6; - private static int processors = Runtime.getRuntime().availableProcessors(); - public kelondroRowCollection(kelondroRowCollection rc) { this.rowdef = rc.rowdef; this.chunkcache = rc.chunkcache; @@ -465,12 +475,11 @@ } byte[] swapspace = new byte[this.rowdef.objectsize]; int p = partition(0, this.chunkcount, this.sortBound, swapspace); - if ((processors > 1) && (this.chunkcount >= 10000)) { - // sort this using multi-threading; use one second thread - qsortthread qs = new qsortthread(0, p, 0); - qs.start(); + if ((sortingthread != null) && (p > 50) && (sortingthread.isAlive())) { + // sort this using multi-threading + sortingthread.process(this, 0, p, 0); qsort(p, 
this.chunkcount, 0, swapspace); - try {qs.join();} catch (InterruptedException e) {e.printStackTrace();} + sortingthread.waitFinish(); } else { qsort(0, p, 0, swapspace); qsort(p, this.chunkcount, 0, swapspace); @@ -479,18 +488,56 @@ //assert this.isSorted(); } - private class qsortthread extends Thread { - private int sl, sr, sb; - public qsortthread(int L, int R, int S) { - this.sl = L; - this.sr = R; - this.sb = S; + public static class qsortthread extends Thread { + private boolean terminate; + private SynchronousQueue<qsortobject> startObject; + private SynchronousQueue<Integer> finishObject; + public qsortthread() { + this.terminate = false; + this.startObject = new SynchronousQueue<qsortobject>(); + this.finishObject = new SynchronousQueue<Integer>(); + this.setName("kelondroRowCollection SORT THREAD"); } + public void process(kelondroRowCollection rc, int L, int R, int S) { + assert rc != null; + synchronized (startObject) { + try {this.startObject.put(new qsortobject(rc, L, R, S));} catch (InterruptedException e) {} + } + } + public void waitFinish() { + try {this.finishObject.take();} catch (InterruptedException e) {} + } + public void terminate() { + this.terminate = true; + this.interrupt(); + } public void run() { - qsort(sl, sr, sb, new byte[rowdef.objectsize]); + qsortobject so = null; + while (!terminate) { + try {so = this.startObject.take();} catch (InterruptedException e) { + break; + } + assert so != null; + so.rc.qsort(so.sl, so.sr, so.sb, new byte[so.rc.rowdef.objectsize]); + try {this.finishObject.put(dummy);} catch (InterruptedException e1) { + break; + } + so = null; + } } } + private static class qsortobject { + protected kelondroRowCollection rc; + protected int sl, sr, sb; + public qsortobject(kelondroRowCollection rc, int L, int R, int S) { + this.rc = rc; + this.sl = L; + this.sr = R; + this.sb = S; + } + } + private final void qsort(int L, int R, int S, byte[] swapspace) { if (R - L < isortlimit) { isort(L, R, swapspace); @@ -790,11 
+837,11 @@ } long t2 = System.currentTimeMillis(); System.out.println("copy c -> d: " + (t2 - t1) + " milliseconds, " + d(testsize, (t2 - t1)) + " entries/millisecond"); - processors = 1; + serverProcessor.useCPU = 1; c.sort(); long t3 = System.currentTimeMillis(); System.out.println("sort c (1) : " + (t3 - t2) + " milliseconds, " + d(testsize, (t3 - t2)) + " entries/millisecond"); - processors = 2; + serverProcessor.useCPU = 2; d.sort(); long t4 = System.currentTimeMillis(); System.out.println("sort d (2) : " + (t4 - t3) + " milliseconds, " + d(testsize, (t4 - t3)) + " entries/millisecond"); Modified: trunk/source/de/anomic/kelondro/kelondroSortStack.java =================================================================== --- trunk/source/de/anomic/kelondro/kelondroSortStack.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/kelondro/kelondroSortStack.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -53,7 +53,7 @@ return this.onstack.size(); } - public synchronized void push(stackElement se) { + public void push(stackElement se) { push(se.element, se.weight); } Modified: trunk/source/de/anomic/plasma/plasmaSearchAPI.java =================================================================== --- trunk/source/de/anomic/plasma/plasmaSearchAPI.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/plasma/plasmaSearchAPI.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -90,7 +90,7 @@ public static plasmaSearchRankingProcess genSearchresult(serverObjects prop, plasmaSwitchboard sb, String keyhash, kelondroBitfield filter, int sortorder) { plasmaSearchQuery query = new plasmaSearchQuery(keyhash, -1, sb.getRanking(), filter); - plasmaSearchRankingProcess ranked = new plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE); + plasmaSearchRankingProcess ranked = new plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE, 1); ranked.execQuery(); if (ranked.filteredCount() == 0) { Modified: 
trunk/source/de/anomic/plasma/plasmaSearchEvent.java =================================================================== --- trunk/source/de/anomic/plasma/plasmaSearchEvent.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/plasma/plasmaSearchEvent.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -123,7 +123,7 @@ if ((query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) || (query.domType == plasmaSearchQuery.SEARCHDOM_CLUSTERALL)) { // do a global search - this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation); + this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation, 16); int fetchpeers = 30; @@ -156,7 +156,7 @@ serverLog.logFine("SEARCH_EVENT", "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + primarySearchThreads.length + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); } else { // do a local search - this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation); + this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation, 2); this.rankedCache.execQuery(); //plasmaWordIndex.Finding finding = wordIndex.retrieveURLs(query, false, 2, ranking, process); Modified: trunk/source/de/anomic/plasma/plasmaSearchRankingProcess.java =================================================================== --- trunk/source/de/anomic/plasma/plasmaSearchRankingProcess.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/plasma/plasmaSearchRankingProcess.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import de.anomic.htmlFilter.htmlFilterContentScraper; import de.anomic.index.indexContainer; @@ -62,14 +63,14 @@ private int maxentries; private int remote_peerCount, remote_indexCount, remote_resourceSize, local_resourceSize; private indexRWIEntryOrder order; - private 
HashMap<String, Integer> urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion) + private ConcurrentHashMap<String, Integer> urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion) private kelondroMScoreCluster<String> ref; // reference score computation for the commonSense heuristic private int[] flagcount; // flag counter private TreeSet<String> misses; // contains url-hashes that could not been found in the LURL-DB private plasmaWordIndex wordIndex; - private Map<String, indexContainer>[] localSearchContainerMaps; + private HashMap<String, indexContainer>[] localSearchContainerMaps; - public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery query, int sortorder, int maxentries) { + public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery query, int sortorder, int maxentries, int concurrency) { // we collect the urlhashes and construct a list with urlEntry objects // attention: if minEntries is too high, this method will not terminate within the maxTime // sortorder: 0 = hash, 1 = url, 2 = ranking @@ -84,7 +85,7 @@ this.remote_indexCount = 0; this.remote_resourceSize = 0; this.local_resourceSize = 0; - this.urlhashes = new HashMap<String, Integer>(); + this.urlhashes = new ConcurrentHashMap<String, Integer>(0, 0.75f, concurrency); this.ref = new kelondroMScoreCluster<String>(); this.misses = new TreeSet<String>(); this.wordIndex = wordIndex; @@ -262,7 +263,7 @@ return false; } - public synchronized Map<String, indexContainer>[] searchContainerMaps() { + public Map<String, indexContainer>[] searchContainerMaps() { // direct access to the result maps is needed for abstract generation // this is only available if execQuery() was called before return localSearchContainerMaps; Modified: trunk/source/de/anomic/plasma/plasmaWordIndex.java =================================================================== --- 
trunk/source/de/anomic/plasma/plasmaWordIndex.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/plasma/plasmaWordIndex.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -385,11 +385,11 @@ return container; } - public Map<String, indexContainer> getContainers(Set<String> wordHashes, Set<String> urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) { + public HashMap<String, indexContainer> getContainers(Set<String> wordHashes, Set<String> urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) { // return map of wordhash:indexContainer // retrieve entities that belong to the hashes - HashMap<String, indexContainer> containers = new HashMap<String, indexContainer>(); + HashMap<String, indexContainer> containers = new HashMap<String, indexContainer>(wordHashes.size()); String singleHash; indexContainer singleContainer; Iterator<String> i = wordHashes.iterator(); @@ -402,7 +402,7 @@ singleContainer = getContainer(singleHash, urlselection); // check result - if (((singleContainer == null) || (singleContainer.size() == 0)) && (interruptIfEmpty)) return new HashMap<String, indexContainer>(); + if (((singleContainer == null) || (singleContainer.size() == 0)) && (interruptIfEmpty)) return new HashMap<String, indexContainer>(0); containers.put(singleHash, singleContainer); } @@ -410,22 +410,22 @@ } @SuppressWarnings("unchecked") - public Map<String, indexContainer>[] localSearchContainers(plasmaSearchQuery query, Set<String> urlselection) { + public HashMap<String, indexContainer>[] localSearchContainers(plasmaSearchQuery query, Set<String> urlselection) { // search for the set of hashes and return a map of of wordhash:indexContainer containing the seach result // retrieve entities that belong to the hashes - Map<String, indexContainer> inclusionContainers = (query.queryHashes.size() == 0) ? new HashMap<String, indexContainer>() : getContainers( + HashMap<String, indexContainer> inclusionContainers = (query.queryHashes.size() == 0) ? 
new HashMap<String, indexContainer>(0) : getContainers( query.queryHashes, urlselection, true, true); - if ((inclusionContainers.size() != 0) && (inclusionContainers.size() < query.queryHashes.size())) inclusionContainers = new HashMap<String, indexContainer>(); // prevent that only a subset is returned - Map<String, indexContainer> exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap<String, indexContainer>() : getContainers( + if ((inclusionContainers.size() != 0) && (inclusionContainers.size() < query.queryHashes.size())) inclusionContainers = new HashMap<String, indexContainer>(0); // prevent that only a subset is returned + HashMap<String, indexContainer> exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap<String, indexContainer>(0) : getContainers( query.excludeHashes, urlselection, true, true); - return new Map[]{inclusionContainers, exclusionContainers}; + return new HashMap[]{inclusionContainers, exclusionContainers}; } public int size() { Modified: trunk/source/de/anomic/server/serverDomains.java =================================================================== --- trunk/source/de/anomic/server/serverDomains.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/server/serverDomains.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -28,13 +28,13 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import de.anomic.kelondro.kelondroMScoreCluster; import de.anomic.plasma.plasmaSwitchboard; @@ -42,7 +42,7 @@ public class serverDomains { // a dns cache - private static final Map<String, InetAddress> nameCacheHit = Collections.synchronizedMap(new HashMap<String, InetAddress>()); // a not-synchronized map resulted in deadlocks + private static final 
Map<String, InetAddress> nameCacheHit = new ConcurrentHashMap<String, InetAddress>(); // a not-synchronized map resulted in deadlocks private static final Set<String> nameCacheMiss = Collections.synchronizedSet(new HashSet<String>()); private static final kelondroMScoreCluster<String> nameCacheHitAges = new kelondroMScoreCluster<String>(); private static final kelondroMScoreCluster<String> nameCacheMissAges = new kelondroMScoreCluster<String>(); Added: trunk/source/de/anomic/server/serverProcessor.java =================================================================== --- trunk/source/de/anomic/server/serverProcessor.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/server/serverProcessor.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -0,0 +1,33 @@ +// serverProcessor.java +// (C) 2008 by Michael Peter Christen; [EMAIL PROTECTED], Frankfurt a. M., Germany +// first published 27.02.2008 on http://yacy.net +// +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.server; + + +public class serverProcessor { + + public static final int availableCPU = Runtime.getRuntime().availableProcessors(); + public static int useCPU = availableCPU; + +} Modified: trunk/source/de/anomic/server/serverProfiling.java =================================================================== --- trunk/source/de/anomic/server/serverProfiling.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/de/anomic/server/serverProfiling.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -26,11 +26,10 @@ package de.anomic.server; -import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; public class serverProfiling extends Thread { @@ -41,8 +40,8 @@ static { // initialize profiling - historyMaps = Collections.synchronizedMap(new HashMap<String, TreeMap<Long, Event>>()); - eventCounter = Collections.synchronizedMap(new HashMap<String, Integer>()); + historyMaps = new ConcurrentHashMap<String, TreeMap<Long, Event>>(); + eventCounter = new ConcurrentHashMap<String, Integer>(); lastCompleteCleanup = System.currentTimeMillis(); systemProfiler = null; } Modified: trunk/source/yacy.java =================================================================== --- trunk/source/yacy.java 2008-02-26 23:00:20 UTC (rev 4516) +++ trunk/source/yacy.java 2008-02-27 15:16:47 UTC (rev 4517) @@ -75,6 +75,7 @@ import de.anomic.kelondro.kelondroDyn; import de.anomic.kelondro.kelondroMScoreCluster; import de.anomic.kelondro.kelondroMapObjects; +import de.anomic.kelondro.kelondroRowCollection; import de.anomic.plasma.plasmaCondenser; import de.anomic.plasma.plasmaCrawlLURL; import de.anomic.plasma.plasmaSwitchboard; @@ -408,6 +409,7 @@ 
serverLog.logSevere("MAIN CONTROL LOOP", "PANIC: " + e.getMessage(),e); } // shut down + if (kelondroRowCollection.sortingthread != null) kelondroRowCollection.sortingthread.terminate(); serverLog.logConfig("SHUTDOWN", "caught termination signal"); server.terminate(false); server.interrupt(); _______________________________________________ YaCy-svn mailing list YaCy-svn@lists.berlios.de https://lists.berlios.de/mailman/listinfo/yacy-svn