Author: orbiter
Date: 2008-02-27 16:16:47 +0100 (Wed, 27 Feb 2008)
New Revision: 4517
Added:
trunk/source/de/anomic/server/serverProcessor.java
Modified:
trunk/htroot/Status.java
trunk/htroot/xml/status_p.java
trunk/source/de/anomic/index/indexRWIEntryOrder.java
trunk/source/de/anomic/kelondro/kelondroRowCollection.java
trunk/source/de/anomic/kelondro/kelondroSortStack.java
trunk/source/de/anomic/plasma/plasmaSearchAPI.java
trunk/source/de/anomic/plasma/plasmaSearchEvent.java
trunk/source/de/anomic/plasma/plasmaSearchRankingProcess.java
trunk/source/de/anomic/plasma/plasmaWordIndex.java
trunk/source/de/anomic/server/serverDomains.java
trunk/source/de/anomic/server/serverProfiling.java
trunk/source/yacy.java
Log:
more multithreading support:
- replaced some synchronized classes with classes from java.util.concurrent
- used a java.util.concurrent.SynchronousQueue to implement a persistent
  sorting thread in the very basic kelondroRowCollection, which now sorts
  with a second thread when more than one CPU (e.g. a dual-core processor)
  is available
Modified: trunk/htroot/Status.java
===================================================================
--- trunk/htroot/Status.java 2008-02-26 23:00:20 UTC (rev 4516)
+++ trunk/htroot/Status.java 2008-02-27 15:16:47 UTC (rev 4517)
@@ -58,6 +58,7 @@
import de.anomic.server.serverDomains;
import de.anomic.server.serverMemory;
import de.anomic.server.serverObjects;
+import de.anomic.server.serverProcessor;
import de.anomic.server.serverSwitch;
import de.anomic.tools.yFormatter;
import de.anomic.yacy.yacyCore;
@@ -293,7 +294,7 @@
prop.put("freeMemory", serverMemory.bytesToString(rt.freeMemory()));
prop.put("totalMemory", serverMemory.bytesToString(rt.totalMemory()));
prop.put("maxMemory", serverMemory.bytesToString(rt.maxMemory()));
- prop.put("processors", rt.availableProcessors());
+ prop.put("processors", serverProcessor.availableCPU);
// proxy traffic
//prop.put("trafficIn",bytesToString(httpdByteCountInputStream.getGlobalCount()));
Modified: trunk/htroot/xml/status_p.java
===================================================================
--- trunk/htroot/xml/status_p.java 2008-02-26 23:00:20 UTC (rev 4516)
+++ trunk/htroot/xml/status_p.java 2008-02-27 15:16:47 UTC (rev 4517)
@@ -44,6 +44,7 @@
import de.anomic.http.httpdByteCountOutputStream;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverObjects;
+import de.anomic.server.serverProcessor;
import de.anomic.server.serverSwitch;
import de.anomic.yacy.yacyCore;
@@ -75,7 +76,7 @@
prop.putNum("freeMemory", rt.freeMemory());
prop.putNum("totalMemory", rt.totalMemory());
prop.putNum("maxMemory", rt.maxMemory());
- prop.putNum("processors", rt.availableProcessors());
+ prop.putNum("processors", serverProcessor.availableCPU);
// proxy traffic
prop.put("trafficIn",
httpdByteCountInputStream.getGlobalCount());
Modified: trunk/source/de/anomic/index/indexRWIEntryOrder.java
===================================================================
--- trunk/source/de/anomic/index/indexRWIEntryOrder.java 2008-02-26
23:00:20 UTC (rev 4516)
+++ trunk/source/de/anomic/index/indexRWIEntryOrder.java 2008-02-27
15:16:47 UTC (rev 4517)
@@ -36,6 +36,7 @@
import de.anomic.plasma.plasmaCondenser;
import de.anomic.plasma.plasmaSearchRankingProcess;
import de.anomic.plasma.plasmaSearchRankingProfile;
+import de.anomic.server.serverProcessor;
import de.anomic.yacy.yacyURL;
public class indexRWIEntryOrder {
@@ -44,8 +45,6 @@
private kelondroMScoreCluster<String> doms; // collected for "authority"
heuristic
private int maxdomcount;
- private static final int processors =
Runtime.getRuntime().availableProcessors(); // for multiprocessor support, used
during normalization
-
public indexRWIEntryOrder(plasmaSearchRankingProfile profile) {
this.min = null;
this.max = null;
@@ -60,7 +59,7 @@
ArrayList<indexRWIVarEntry> result = null;
//long s0 = System.currentTimeMillis();
- if ((processors > 1) && (container.size() > 600)) {
+ if ((serverProcessor.useCPU > 1) && (container.size() > 600)) {
// run minmax with two threads
int middle = container.size() / 2;
minmaxfinder mmf0 = new minmaxfinder(container, 0, middle);
Modified: trunk/source/de/anomic/kelondro/kelondroRowCollection.java
===================================================================
--- trunk/source/de/anomic/kelondro/kelondroRowCollection.java 2008-02-26
23:00:20 UTC (rev 4516)
+++ trunk/source/de/anomic/kelondro/kelondroRowCollection.java 2008-02-27
15:16:47 UTC (rev 4517)
@@ -31,9 +31,11 @@
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.SynchronousQueue;
import de.anomic.server.serverFileUtils;
import de.anomic.server.serverMemory;
+import de.anomic.server.serverProcessor;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeedDB;
@@ -41,7 +43,17 @@
public static final double growfactor = 1.4;
private static final int isortlimit = 20;
+ private static final Integer dummy = new Integer(0);
+ public static final qsortthread sortingthread;
+ static {
+ if (serverProcessor.useCPU > 1) {
+ sortingthread = new qsortthread();
+ sortingthread.start();
+ } else {
+ sortingthread = null;
+ }
+ }
protected byte[] chunkcache;
protected int chunkcount;
@@ -57,8 +69,6 @@
private static final int exp_order_bound = 5;
private static final int exp_collection = 6;
- private static int processors = Runtime.getRuntime().availableProcessors();
-
public kelondroRowCollection(kelondroRowCollection rc) {
this.rowdef = rc.rowdef;
this.chunkcache = rc.chunkcache;
@@ -465,12 +475,11 @@
}
byte[] swapspace = new byte[this.rowdef.objectsize];
int p = partition(0, this.chunkcount, this.sortBound, swapspace);
- if ((processors > 1) && (this.chunkcount >= 10000)) {
- // sort this using multi-threading; use one second thread
- qsortthread qs = new qsortthread(0, p, 0);
- qs.start();
+ if ((sortingthread != null) && (p > 50) && (sortingthread.isAlive())) {
+ // sort this using multi-threading
+ sortingthread.process(this, 0, p, 0);
qsort(p, this.chunkcount, 0, swapspace);
- try {qs.join();} catch (InterruptedException e)
{e.printStackTrace();}
+ sortingthread.waitFinish();
} else {
qsort(0, p, 0, swapspace);
qsort(p, this.chunkcount, 0, swapspace);
@@ -479,18 +488,56 @@
//assert this.isSorted();
}
- private class qsortthread extends Thread {
- private int sl, sr, sb;
- public qsortthread(int L, int R, int S) {
- this.sl = L;
- this.sr = R;
- this.sb = S;
+ public static class qsortthread extends Thread {
+ private boolean terminate;
+ private SynchronousQueue<qsortobject> startObject;
+ private SynchronousQueue<Integer> finishObject;
+ public qsortthread() {
+ this.terminate = false;
+ this.startObject = new SynchronousQueue<qsortobject>();
+ this.finishObject = new SynchronousQueue<Integer>();
+ this.setName("kelondroRowCollection SORT THREAD");
}
+ public void process(kelondroRowCollection rc, int L, int R, int S) {
+ assert rc != null;
+ synchronized (startObject) {
+ try {this.startObject.put(new qsortobject(rc, L, R, S));} catch
(InterruptedException e) {}
+ }
+ }
+ public void waitFinish() {
+ try {this.finishObject.take();} catch (InterruptedException e) {}
+ }
+ public void terminate() {
+ this.terminate = true;
+ this.interrupt();
+ }
public void run() {
- qsort(sl, sr, sb, new byte[rowdef.objectsize]);
+ qsortobject so = null;
+ while (!terminate) {
+ try {so = this.startObject.take();} catch (InterruptedException
e) {
+ break;
+ }
+ assert so != null;
+ so.rc.qsort(so.sl, so.sr, so.sb, new
byte[so.rc.rowdef.objectsize]);
+ try {this.finishObject.put(dummy);} catch (InterruptedException
e1) {
+ break;
+ }
+ so = null;
+ }
}
}
+ private static class qsortobject {
+ protected kelondroRowCollection rc;
+ protected int sl, sr, sb;
+ public qsortobject(kelondroRowCollection rc, int L, int R, int S) {
+ this.rc = rc;
+ this.sl = L;
+ this.sr = R;
+ this.sb = S;
+ }
+ }
+
private final void qsort(int L, int R, int S, byte[] swapspace) {
if (R - L < isortlimit) {
isort(L, R, swapspace);
@@ -790,11 +837,11 @@
}
long t2 = System.currentTimeMillis();
System.out.println("copy c -> d: " + (t2 - t1) + " milliseconds, " +
d(testsize, (t2 - t1)) + " entries/millisecond");
- processors = 1;
+ serverProcessor.useCPU = 1;
c.sort();
long t3 = System.currentTimeMillis();
System.out.println("sort c (1) : " + (t3 - t2) + " milliseconds, " +
d(testsize, (t3 - t2)) + " entries/millisecond");
- processors = 2;
+ serverProcessor.useCPU = 2;
d.sort();
long t4 = System.currentTimeMillis();
System.out.println("sort d (2) : " + (t4 - t3) + " milliseconds, " +
d(testsize, (t4 - t3)) + " entries/millisecond");
Modified: trunk/source/de/anomic/kelondro/kelondroSortStack.java
===================================================================
--- trunk/source/de/anomic/kelondro/kelondroSortStack.java 2008-02-26
23:00:20 UTC (rev 4516)
+++ trunk/source/de/anomic/kelondro/kelondroSortStack.java 2008-02-27
15:16:47 UTC (rev 4517)
@@ -53,7 +53,7 @@
return this.onstack.size();
}
- public synchronized void push(stackElement se) {
+ public void push(stackElement se) {
push(se.element, se.weight);
}
Modified: trunk/source/de/anomic/plasma/plasmaSearchAPI.java
===================================================================
--- trunk/source/de/anomic/plasma/plasmaSearchAPI.java 2008-02-26 23:00:20 UTC
(rev 4516)
+++ trunk/source/de/anomic/plasma/plasmaSearchAPI.java 2008-02-27 15:16:47 UTC
(rev 4517)
@@ -90,7 +90,7 @@
public static plasmaSearchRankingProcess genSearchresult(serverObjects
prop, plasmaSwitchboard sb, String keyhash, kelondroBitfield filter, int
sortorder) {
plasmaSearchQuery query = new plasmaSearchQuery(keyhash, -1,
sb.getRanking(), filter);
- plasmaSearchRankingProcess ranked = new
plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE);
+ plasmaSearchRankingProcess ranked = new
plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE,
1);
ranked.execQuery();
if (ranked.filteredCount() == 0) {
Modified: trunk/source/de/anomic/plasma/plasmaSearchEvent.java
===================================================================
--- trunk/source/de/anomic/plasma/plasmaSearchEvent.java 2008-02-26
23:00:20 UTC (rev 4516)
+++ trunk/source/de/anomic/plasma/plasmaSearchEvent.java 2008-02-27
15:16:47 UTC (rev 4517)
@@ -123,7 +123,7 @@
if ((query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) ||
(query.domType == plasmaSearchQuery.SEARCHDOM_CLUSTERALL)) {
// do a global search
- this.rankedCache = new plasmaSearchRankingProcess(wordIndex,
query, 2, max_results_preparation);
+ this.rankedCache = new plasmaSearchRankingProcess(wordIndex,
query, 2, max_results_preparation, 16);
int fetchpeers = 30;
@@ -156,7 +156,7 @@
serverLog.logFine("SEARCH_EVENT", "SEARCH TIME AFTER
GLOBAL-TRIGGER TO " + primarySearchThreads.length + " PEERS: " +
((System.currentTimeMillis() - start) / 1000) + " seconds");
} else {
// do a local search
- this.rankedCache = new plasmaSearchRankingProcess(wordIndex,
query, 2, max_results_preparation);
+ this.rankedCache = new plasmaSearchRankingProcess(wordIndex,
query, 2, max_results_preparation, 2);
this.rankedCache.execQuery();
//plasmaWordIndex.Finding finding = wordIndex.retrieveURLs(query,
false, 2, ranking, process);
Modified: trunk/source/de/anomic/plasma/plasmaSearchRankingProcess.java
===================================================================
--- trunk/source/de/anomic/plasma/plasmaSearchRankingProcess.java
2008-02-26 23:00:20 UTC (rev 4516)
+++ trunk/source/de/anomic/plasma/plasmaSearchRankingProcess.java
2008-02-27 15:16:47 UTC (rev 4517)
@@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import de.anomic.htmlFilter.htmlFilterContentScraper;
import de.anomic.index.indexContainer;
@@ -62,14 +63,14 @@
private int maxentries;
private int remote_peerCount, remote_indexCount, remote_resourceSize,
local_resourceSize;
private indexRWIEntryOrder order;
- private HashMap<String, Integer> urlhashes; // map for double-check;
String/Long relation, addresses ranking number (backreference for deletion)
+ private ConcurrentHashMap<String, Integer> urlhashes; // map for
double-check; String/Long relation, addresses ranking number (backreference for
deletion)
private kelondroMScoreCluster<String> ref; // reference score computation
for the commonSense heuristic
private int[] flagcount; // flag counter
private TreeSet<String> misses; // contains url-hashes that could not been
found in the LURL-DB
private plasmaWordIndex wordIndex;
- private Map<String, indexContainer>[] localSearchContainerMaps;
+ private HashMap<String, indexContainer>[] localSearchContainerMaps;
- public plasmaSearchRankingProcess(plasmaWordIndex wordIndex,
plasmaSearchQuery query, int sortorder, int maxentries) {
+ public plasmaSearchRankingProcess(plasmaWordIndex wordIndex,
plasmaSearchQuery query, int sortorder, int maxentries, int concurrency) {
// we collect the urlhashes and construct a list with urlEntry objects
// attention: if minEntries is too high, this method will not
terminate within the maxTime
// sortorder: 0 = hash, 1 = url, 2 = ranking
@@ -84,7 +85,7 @@
this.remote_indexCount = 0;
this.remote_resourceSize = 0;
this.local_resourceSize = 0;
- this.urlhashes = new HashMap<String, Integer>();
+ this.urlhashes = new ConcurrentHashMap<String, Integer>(0, 0.75f,
concurrency);
this.ref = new kelondroMScoreCluster<String>();
this.misses = new TreeSet<String>();
this.wordIndex = wordIndex;
@@ -262,7 +263,7 @@
return false;
}
- public synchronized Map<String, indexContainer>[] searchContainerMaps() {
+ public Map<String, indexContainer>[] searchContainerMaps() {
// direct access to the result maps is needed for abstract generation
// this is only available if execQuery() was called before
return localSearchContainerMaps;
Modified: trunk/source/de/anomic/plasma/plasmaWordIndex.java
===================================================================
--- trunk/source/de/anomic/plasma/plasmaWordIndex.java 2008-02-26 23:00:20 UTC
(rev 4516)
+++ trunk/source/de/anomic/plasma/plasmaWordIndex.java 2008-02-27 15:16:47 UTC
(rev 4517)
@@ -385,11 +385,11 @@
return container;
}
- public Map<String, indexContainer> getContainers(Set<String> wordHashes,
Set<String> urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) {
+ public HashMap<String, indexContainer> getContainers(Set<String>
wordHashes, Set<String> urlselection, boolean deleteIfEmpty, boolean
interruptIfEmpty) {
// return map of wordhash:indexContainer
// retrieve entities that belong to the hashes
- HashMap<String, indexContainer> containers = new HashMap<String,
indexContainer>();
+ HashMap<String, indexContainer> containers = new HashMap<String,
indexContainer>(wordHashes.size());
String singleHash;
indexContainer singleContainer;
Iterator<String> i = wordHashes.iterator();
@@ -402,7 +402,7 @@
singleContainer = getContainer(singleHash, urlselection);
// check result
- if (((singleContainer == null) || (singleContainer.size() ==
0)) && (interruptIfEmpty)) return new HashMap<String, indexContainer>();
+ if (((singleContainer == null) || (singleContainer.size() ==
0)) && (interruptIfEmpty)) return new HashMap<String, indexContainer>(0);
containers.put(singleHash, singleContainer);
}
@@ -410,22 +410,22 @@
}
@SuppressWarnings("unchecked")
- public Map<String, indexContainer>[]
localSearchContainers(plasmaSearchQuery query, Set<String> urlselection) {
+ public HashMap<String, indexContainer>[]
localSearchContainers(plasmaSearchQuery query, Set<String> urlselection) {
// search for the set of hashes and return a map of of
wordhash:indexContainer containing the seach result
// retrieve entities that belong to the hashes
- Map<String, indexContainer> inclusionContainers =
(query.queryHashes.size() == 0) ? new HashMap<String, indexContainer>() :
getContainers(
+ HashMap<String, indexContainer> inclusionContainers =
(query.queryHashes.size() == 0) ? new HashMap<String, indexContainer>(0) :
getContainers(
query.queryHashes,
urlselection,
true,
true);
- if ((inclusionContainers.size() != 0) && (inclusionContainers.size() <
query.queryHashes.size())) inclusionContainers = new HashMap<String,
indexContainer>(); // prevent that only a subset is returned
- Map<String, indexContainer> exclusionContainers =
(inclusionContainers.size() == 0) ? new HashMap<String, indexContainer>() :
getContainers(
+ if ((inclusionContainers.size() != 0) && (inclusionContainers.size() <
query.queryHashes.size())) inclusionContainers = new HashMap<String,
indexContainer>(0); // prevent that only a subset is returned
+ HashMap<String, indexContainer> exclusionContainers =
(inclusionContainers.size() == 0) ? new HashMap<String, indexContainer>(0) :
getContainers(
query.excludeHashes,
urlselection,
true,
true);
- return new Map[]{inclusionContainers, exclusionContainers};
+ return new HashMap[]{inclusionContainers, exclusionContainers};
}
public int size() {
Modified: trunk/source/de/anomic/server/serverDomains.java
===================================================================
--- trunk/source/de/anomic/server/serverDomains.java 2008-02-26 23:00:20 UTC
(rev 4516)
+++ trunk/source/de/anomic/server/serverDomains.java 2008-02-27 15:16:47 UTC
(rev 4517)
@@ -28,13 +28,13 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.plasma.plasmaSwitchboard;
@@ -42,7 +42,7 @@
public class serverDomains {
// a dns cache
- private static final Map<String, InetAddress> nameCacheHit =
Collections.synchronizedMap(new HashMap<String, InetAddress>()); // a
not-synchronized map resulted in deadlocks
+ private static final Map<String, InetAddress> nameCacheHit = new
ConcurrentHashMap<String, InetAddress>(); // a not-synchronized map resulted in
deadlocks
private static final Set<String> nameCacheMiss =
Collections.synchronizedSet(new HashSet<String>());
private static final kelondroMScoreCluster<String> nameCacheHitAges = new
kelondroMScoreCluster<String>();
private static final kelondroMScoreCluster<String> nameCacheMissAges = new
kelondroMScoreCluster<String>();
Added: trunk/source/de/anomic/server/serverProcessor.java
===================================================================
--- trunk/source/de/anomic/server/serverProcessor.java 2008-02-26 23:00:20 UTC
(rev 4516)
+++ trunk/source/de/anomic/server/serverProcessor.java 2008-02-27 15:16:47 UTC
(rev 4517)
@@ -0,0 +1,33 @@
+// serverProcessor.java
+// (C) 2008 by Michael Peter Christen; [EMAIL PROTECTED], Frankfurt a. M.,
Germany
+// first published 27.02.2008 on http://yacy.net
+//
+// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
+// $LastChangedRevision: 1986 $
+// $LastChangedBy: orbiter $
+//
+// LICENSE
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+package de.anomic.server;
+
+
+public class serverProcessor {
+
+ public static final int availableCPU =
Runtime.getRuntime().availableProcessors();
+ public static int useCPU = availableCPU;
+
+}
Modified: trunk/source/de/anomic/server/serverProfiling.java
===================================================================
--- trunk/source/de/anomic/server/serverProfiling.java 2008-02-26 23:00:20 UTC
(rev 4516)
+++ trunk/source/de/anomic/server/serverProfiling.java 2008-02-27 15:16:47 UTC
(rev 4517)
@@ -26,11 +26,10 @@
package de.anomic.server;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
public class serverProfiling extends Thread {
@@ -41,8 +40,8 @@
static {
// initialize profiling
- historyMaps = Collections.synchronizedMap(new HashMap<String,
TreeMap<Long, Event>>());
- eventCounter = Collections.synchronizedMap(new HashMap<String,
Integer>());
+ historyMaps = new ConcurrentHashMap<String, TreeMap<Long, Event>>();
+ eventCounter = new ConcurrentHashMap<String, Integer>();
lastCompleteCleanup = System.currentTimeMillis();
systemProfiler = null;
}
Modified: trunk/source/yacy.java
===================================================================
--- trunk/source/yacy.java 2008-02-26 23:00:20 UTC (rev 4516)
+++ trunk/source/yacy.java 2008-02-27 15:16:47 UTC (rev 4517)
@@ -75,6 +75,7 @@
import de.anomic.kelondro.kelondroDyn;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.kelondro.kelondroMapObjects;
+import de.anomic.kelondro.kelondroRowCollection;
import de.anomic.plasma.plasmaCondenser;
import de.anomic.plasma.plasmaCrawlLURL;
import de.anomic.plasma.plasmaSwitchboard;
@@ -408,6 +409,7 @@
serverLog.logSevere("MAIN CONTROL LOOP", "PANIC: " +
e.getMessage(),e);
}
// shut down
+ if (kelondroRowCollection.sortingthread != null)
kelondroRowCollection.sortingthread.terminate();
serverLog.logConfig("SHUTDOWN", "caught termination
signal");
server.terminate(false);
server.interrupt();
_______________________________________________
YaCy-svn mailing list
[email protected]
https://lists.berlios.de/mailman/listinfo/yacy-svn