From: Scott Crosby <[email protected]>
---
src/uk/me/parabola/splitter/SplitProcessor.java | 132 ++++++++++-------------
1 files changed, 56 insertions(+), 76 deletions(-)
diff --git a/src/uk/me/parabola/splitter/SplitProcessor.java b/src/uk/me/parabola/splitter/SplitProcessor.java
index 4f7220e..71957d9 100644
--- a/src/uk/me/parabola/splitter/SplitProcessor.java
+++ b/src/uk/me/parabola/splitter/SplitProcessor.java
@@ -18,9 +18,7 @@ import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.TreeMap;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -30,15 +28,18 @@ import uk.me.parabola.splitter.Relation.Member;
* Splits a map into multiple areas.
*/
class SplitProcessor implements MapProcessor {
-
+ public static final int NO_ELEMENTS = 10;
+ public static final int BUNDLE_SIZE = 2000;
+
+
private final SplitIntMap coords = new SplitIntMap();
private final SplitIntMap ways = new SplitIntMap();
private final IntObjMap<long[]> bigWays = new IntObjMap<long[]>();
private final OSMWriter[] writers;
- private final BlockingQueue<Element>[] writerInputQueues;
- private final BlockingQueue<InputQueueInfo> writerInputQueue;
- private final ArrayList<Thread> workerThreads;
+ private final BlockingQueue<List<Element>>[] writerInputQueues;
+ private final List<Element>[] bundlingQueues;
+ private final Thread[] threads; // one writer thread per area, created in the constructor
private int currentNodeAreaSet;
private ArrayList<Integer> currentWayAreaSet, tmpWayAreaSet;
@@ -86,24 +87,18 @@ class SplitProcessor implements MapProcessor {
this.writers = writers;
makeWriterMap();
this.maxThreads = maxThreads;
- this.writerInputQueue = new
ArrayBlockingQueue<InputQueueInfo>(writers.length);
this.writerInputQueues = new BlockingQueue[writers.length];
+ this.bundlingQueues = new ArrayList[writers.length];
+ this.threads = new Thread[writers.length];
for (int i = 0; i < writerInputQueues.length;i++) {
- writerInputQueues[i] = new
ArrayBlockingQueue<Element>(NO_ELEMENTS);
- writerInputQueue.add(new
InputQueueInfo(this.writers[i], writerInputQueues[i]));
+ writerInputQueues[i] = new
ArrayBlockingQueue<List<Element>>(NO_ELEMENTS);
+ bundlingQueues[i] = new ArrayList<Element>(BUNDLE_SIZE);
+ threads[i] = new Thread(new
OSMWriterWorker(writers[i],writerInputQueues[i]));
+ threads[i].start();
}
tmpWayAreaSet = new ArrayList<Integer>(10);
currentWayAreaSet = new ArrayList<Integer>(10);
currentRelAreaSet = new BitSet(writers.length);
-
- int noOfWorkerThreads = this.maxThreads - 1;
- workerThreads = new ArrayList<Thread>(noOfWorkerThreads);
- for (int i = 0; i < noOfWorkerThreads; i++) {
- Thread worker = new Thread(new OSMWriterWorker());
- worker.setName("worker-" + i);
- workerThreads.add(worker);
- worker.start();
- }
}
@Override
@@ -220,20 +215,18 @@ class SplitProcessor implements MapProcessor {
@Override
public void endMap() {
- for (int i = 0; i < writerInputQueues.length; i++) {
- try {
- writerInputQueues[i].put(STOP_ELEMENT);
- } catch (InterruptedException e) {
- throw new RuntimeException("Failed to add the
stop element for worker thread " + i, e);
- }
- }
- for (Thread workerThread : workerThreads) {
- try {
- workerThread.join();
+ try {
+ // Push the stop element into every queue.
+ for (int i = 0 ; i < threads.length ; i++ )
+ addToWorkingQueue(i,STOP_ELEMENT);
+ // Wait for them to all exit.
+ for (int i = 0 ; i < threads.length ; i++ )
+ threads[i].join();
} catch (InterruptedException e) {
- throw new RuntimeException("Failed to join for thread " + workerThread.getName(), e);
+ Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+ throw new RuntimeException("Interrupted while waiting for the writer threads to finish", e);
}
- }
+
for (OSMWriter writer : writers) {
writer.finishWrite();
}
@@ -350,29 +343,34 @@ class SplitProcessor implements MapProcessor {
}
private void addToWorkingQueue(int writerNumber, Element element) {
+ List<Element> bundle=bundlingQueues[writerNumber];
+ bundle.add(element);
+ if (bundle.size() < BUNDLE_SIZE && element != STOP_ELEMENT)
+ return;
try {
- writerInputQueues[writerNumber].put(element);
+ writerInputQueues[writerNumber].put(bundle);
+ bundlingQueues[writerNumber] = new ArrayList<Element>(BUNDLE_SIZE);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag cleared when put() threw
throw new RuntimeException("Failed to write node " + element.getId() + " to worker thread " + writerNumber, e);
}
}
- private static class InputQueueInfo {
- private final OSMWriter writer;
- private final BlockingQueue<Element> inputQueue;
-
- public InputQueueInfo(OSMWriter writer, BlockingQueue<Element>
inputQueue) {
- this.writer = writer;
- this.inputQueue = inputQueue;
- }
- }
private static final Element STOP_ELEMENT = new Element();
- public static final int NO_ELEMENTS = 1000;
private class OSMWriterWorker implements Runnable {
+
+ private OSMWriter writer;
+ private BlockingQueue<List<Element>> queue;
+
+ public OSMWriterWorker(OSMWriter writer,
BlockingQueue<List<Element>> queue) {
+ this.writer = writer;
+ this.queue = queue;
+ }
+
public void processElement(Element element, OSMWriter writer)
throws IOException {
if (element instanceof Node) {
writer.write((Node) element);
@@ -385,41 +383,23 @@ class SplitProcessor implements MapProcessor {
@Override
public void run() {
- boolean finished = false;
- while (!finished) {
- InputQueueInfo workPackage =
writerInputQueue.poll();
- if (workPackage==null) {
- finished=true;
- } else {
- while
(!workPackage.inputQueue.isEmpty()) {
- Element element =null;
- try {
- element =
workPackage.inputQueue.poll();
- if (element == null) {
-
writerInputQueue.put(workPackage);
-
workPackage=null;
- break;
- } else if (element ==
STOP_ELEMENT) {
-
workPackage=null;
- break;
- } else {
-
processElement(element, workPackage.writer);
- }
-
- } catch (InterruptedException
e) {
- throw new
RuntimeException("Thread " + Thread.currentThread().getName() + " failed to get
next element", e);
- } catch (IOException e) {
- throw new
RuntimeException("Thread " + Thread.currentThread().getName() + " failed to
write element " + element.getId() + '(' + element.getClass().getSimpleName() +
')', e);
- }
- }
- if (workPackage != null) {
- try {
-
writerInputQueue.put(workPackage);
- } catch (InterruptedException
e) {
- throw new
RuntimeException("Thread " + Thread.currentThread().getName() + " failed to
return work package", e);
- }
- }
+ running: while (true) {
+ // Labelled loop: leaving via break (not return) keeps the "has finished" message below reachable.
+ try {
+
+ List<Element> elements = queue.take();
+ for (Element element : elements)
+ if (element == STOP_ELEMENT)
+ break running;
+ else
+ processElement(element, writer);
+
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to get next element", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to write element ",e);
}
+ Thread.yield();
}
System.out.println("Thread " +
Thread.currentThread().getName() + " has finished");
}
--
1.7.2.3
_______________________________________________
mkgmap-dev mailing list
[email protected]
http://www.mkgmap.org.uk/mailman/listinfo/mkgmap-dev