From: Scott Crosby <[email protected]>

---
 src/uk/me/parabola/splitter/SplitProcessor.java |  132 ++++++++++-------------
 1 files changed, 56 insertions(+), 76 deletions(-)

diff --git a/src/uk/me/parabola/splitter/SplitProcessor.java b/src/uk/me/parabola/splitter/SplitProcessor.java
index 4f7220e..71957d9 100644
--- a/src/uk/me/parabola/splitter/SplitProcessor.java
+++ b/src/uk/me/parabola/splitter/SplitProcessor.java
@@ -18,9 +18,7 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.TreeMap;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
@@ -30,15 +28,18 @@ import uk.me.parabola.splitter.Relation.Member;
  * Splits a map into multiple areas.
  */
 class SplitProcessor implements MapProcessor {
-
+       public static final int NO_ELEMENTS = 10;
+       public static final int BUNDLE_SIZE = 2000;
+       
+       
        private final SplitIntMap coords = new SplitIntMap();
        private final SplitIntMap ways = new SplitIntMap();
        private final IntObjMap<long[]> bigWays = new IntObjMap<long[]>();
 
        private final OSMWriter[] writers;
-       private final BlockingQueue<Element>[] writerInputQueues;
-       private final BlockingQueue<InputQueueInfo> writerInputQueue;
-       private final ArrayList<Thread> workerThreads;
+       private final BlockingQueue<List<Element>>[] writerInputQueues;
+       private final List<Element>[] bundlingQueues;
+       Thread threads[];
 
        private int currentNodeAreaSet;
        private ArrayList<Integer> currentWayAreaSet, tmpWayAreaSet;
@@ -86,24 +87,18 @@ class SplitProcessor implements MapProcessor {
                this.writers = writers;
                makeWriterMap();
                this.maxThreads = maxThreads;
-               this.writerInputQueue = new 
ArrayBlockingQueue<InputQueueInfo>(writers.length); 
                this.writerInputQueues = new BlockingQueue[writers.length];
+               this.bundlingQueues = new ArrayList[writers.length];
+               this.threads = new Thread[writers.length];
                for (int i = 0; i < writerInputQueues.length;i++) {
-                       writerInputQueues[i] = new 
ArrayBlockingQueue<Element>(NO_ELEMENTS);
-                       writerInputQueue.add(new 
InputQueueInfo(this.writers[i], writerInputQueues[i]));
+                       writerInputQueues[i] = new 
ArrayBlockingQueue<List<Element>>(NO_ELEMENTS);
+                       bundlingQueues[i] = new ArrayList<Element>(BUNDLE_SIZE);
+                       threads[i] = new Thread(new 
OSMWriterWorker(writers[i],writerInputQueues[i]));
+                       threads[i].start();
                }
                tmpWayAreaSet = new ArrayList<Integer>(10);
                currentWayAreaSet = new ArrayList<Integer>(10);
                currentRelAreaSet = new BitSet(writers.length);
-               
-               int noOfWorkerThreads = this.maxThreads - 1;
-               workerThreads = new ArrayList<Thread>(noOfWorkerThreads);
-               for (int i = 0; i < noOfWorkerThreads; i++) {
-                       Thread worker = new Thread(new OSMWriterWorker());
-                       worker.setName("worker-" + i);
-                       workerThreads.add(worker);
-                       worker.start();
-               }
        }
 
        @Override
@@ -220,20 +215,18 @@ class SplitProcessor implements MapProcessor {
 
        @Override
        public void endMap() {
-               for (int i = 0; i < writerInputQueues.length; i++) {
-                       try {
-                               writerInputQueues[i].put(STOP_ELEMENT);
-                       } catch (InterruptedException e) {
-                               throw new RuntimeException("Failed to add the 
stop element for worker thread " + i, e);
-                       }
-               }
-               for (Thread workerThread : workerThreads) {
-                       try {
-                               workerThread.join();
+               try {
+                       // Push the stop element into every queue.
+                       for (int i = 0 ; i < threads.length ; i++ )
+                               addToWorkingQueue(i,STOP_ELEMENT);
+                       // Wait for them to all exit.
+                       for (int i = 0 ; i < threads.length ; i++ )
+                               threads[i].join();
                        } catch (InterruptedException e) {
-                               throw new RuntimeException("Failed to join for 
thread " + workerThread.getName(), e);
+                               // TODO Auto-generated catch block
+                               e.printStackTrace();
                        }
-               }
+
                for (OSMWriter writer : writers) {
                        writer.finishWrite();
                }
@@ -350,29 +343,34 @@ class SplitProcessor implements MapProcessor {
        }
 
        private void addToWorkingQueue(int writerNumber, Element element) {
+               List<Element> bundle=bundlingQueues[writerNumber];
+               bundle.add(element);
+               if (bundle.size() < BUNDLE_SIZE && element != STOP_ELEMENT)
+                       return;
                try {
-                       writerInputQueues[writerNumber].put(element);
+                       BlockingQueue<List<Element>> queue = 
writerInputQueues[writerNumber];
+                       queue.put(bundle);
+                       bundlingQueues[writerNumber] = new 
ArrayList<Element>(BUNDLE_SIZE);
                } catch (InterruptedException e) {
                        throw new RuntimeException("Failed to write node " + 
element.getId() + " to worker thread " + writerNumber, e);
                }
        }
 
-       private static class InputQueueInfo {
-               private final OSMWriter writer;
-               private final BlockingQueue<Element> inputQueue;
-
-               public InputQueueInfo(OSMWriter writer, BlockingQueue<Element> 
inputQueue) {
-      this.writer = writer;
-                       this.inputQueue = inputQueue;
-               }
-       }
 
        private static final Element STOP_ELEMENT = new Element();
 
-       public static final int NO_ELEMENTS = 1000;
 
        private class OSMWriterWorker implements Runnable {
 
+               
+               private OSMWriter writer;
+               private BlockingQueue<List<Element>> queue;
+
+               public OSMWriterWorker(OSMWriter writer, 
BlockingQueue<List<Element>> queue) {
+                       this.writer = writer;
+                       this.queue = queue;
+               }
+
                public void processElement(Element element, OSMWriter writer) 
throws IOException {
                        if (element instanceof Node) {
                                writer.write((Node) element);
@@ -385,41 +383,23 @@ class SplitProcessor implements MapProcessor {
 
                @Override
                public void run() {
-                       boolean finished = false;
-                       while (!finished) {
-                               InputQueueInfo workPackage = 
writerInputQueue.poll();
-                               if (workPackage==null) {
-                                       finished=true;
-                               } else {
-                                       while 
(!workPackage.inputQueue.isEmpty()) {
-                                               Element element =null;
-                                               try {
-                                                       element = 
workPackage.inputQueue.poll();
-                                                       if (element == null) {
-                                                               
writerInputQueue.put(workPackage);
-                                                               
workPackage=null;
-                                                               break;
-                                                       } else if (element == 
STOP_ELEMENT) {
-                                                               
workPackage=null;
-                                                               break;
-                                                       } else {
-                                                               
processElement(element, workPackage.writer);
-                                                       }
-                                                       
-                                               } catch (InterruptedException 
e) {
-                                                       throw new 
RuntimeException("Thread " + Thread.currentThread().getName() + " failed to get 
next element", e);
-                                               } catch (IOException e) {
-                                                       throw new 
RuntimeException("Thread " + Thread.currentThread().getName() + " failed to 
write element " + element.getId() + '(' + element.getClass().getSimpleName() + 
')', e);
-                                               }
-                                       }
-                                       if (workPackage != null) {
-                                               try {
-                                                       
writerInputQueue.put(workPackage);
-                                               } catch (InterruptedException 
e) {
-                                                       throw new 
RuntimeException("Thread " + Thread.currentThread().getName() + " failed to 
return work package", e);
-                                               }
-                                       }
+                       while (true) {
+                               //System.out.println("Doing loop");
+                               try {
+
+                                       List<Element> elements = queue.take();
+                                       for (Element element : elements)
+                                               if (element == STOP_ELEMENT)
+                                                       return;
+                                               else
+                                                       processElement(element, 
writer);
+
+                               } catch (InterruptedException e) {
+                                       throw new RuntimeException("Thread " + 
Thread.currentThread().getName() + " failed to get next element", e);
+                               } catch (IOException e) {
+                                       throw new RuntimeException("Thread " + 
Thread.currentThread().getName() + " failed to write element ",e);
                                }
+                               Thread.yield();
                        }
                        System.out.println("Thread " + 
Thread.currentThread().getName() + " has finished");
                }
-- 
1.7.2.3

_______________________________________________
mkgmap-dev mailing list
[email protected]
http://www.mkgmap.org.uk/mailman/listinfo/mkgmap-dev

Reply via email to