Title: [846] trunk/core/src/test/java/org/servicemix/examples: Fix SedaQueue to use the work manager instead of spawning its own thread
Revision
846
Author
gnt
Date
2005-11-15 14:31:08 -0500 (Tue, 15 Nov 2005)

Log Message

Fix SedaQueue to use the work manager instead of spawning its own thread

Modified Paths

Diff

Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/seda/SedaQueue.java (845 => 846)

--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/seda/SedaQueue.java	2005-11-15 17:22:45 UTC (rev 845)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/seda/SedaQueue.java	2005-11-15 19:31:08 UTC (rev 846)
@@ -49,8 +49,8 @@
     protected AtomicBoolean started = new AtomicBoolean(false);
     protected AtomicBoolean running = new AtomicBoolean(false);
     protected ObjectName objectName;
-    protected Thread thread;
     protected String subType;
+    protected Thread thread;
 
     /**
      * SedaQueue name
@@ -138,17 +138,16 @@
      * @throws JBIException
      */
     public void start() throws JBIException {
-        started.set(true);
-        running.set(true);
-        synchronized (started) {
-            started.notify();
+        synchronized (running) {
+            try {
+                started.set(true);
+                flow.getBroker().getWorkManager().startWork(this);
+                running.wait();
+                super.start();
+            } catch (Exception e) {
+                throw new JBIException("Unable to start queue work", e);
+            }
         }
-        if (thread == null) {
-            thread = new Thread(this);
-            thread.setDaemon(true);
-            thread.start();
-        }
-        super.start();
     }
 
     /**
@@ -158,6 +157,18 @@
      */
     public void stop() throws JBIException {
         started.set(false);
+        if (thread != null && running.get()) {
+            try {
+                synchronized (running) {
+                    thread.interrupt();
+                    running.wait();
+                }
+            } catch (Exception e) {
+                log.warn("Error stopping thread", e);
+            } finally {
+                thread = null;
+            }
+        }
         super.stop();
     }
 
@@ -167,15 +178,6 @@
      * @throws JBIException
      */
     public void shutDown() throws JBIException {
-        running.set(false);
-        if (thread != null) {
-            try {
-                thread.interrupt();
-                thread.join();
-            } catch (Exception e) {
-                log.warn("Error stopping thread", e);
-            }
-        }
         super.shutDown();
     }
 
@@ -194,24 +196,17 @@
     }
     
     /**
-     * @return true if running
-     */
-    public boolean isRunning(){
-        return running.get();
-    }
-
-    /**
      * do processing
      */
     public void run() {
-        while (running.get()) {
+        thread = Thread.currentThread();
+        synchronized (running) { 
+            running.set(true);
+            running.notify();
+        }
+        while (started.get()) {
             final MessageExchangeImpl me;
             try {
-                synchronized (started) {
-                    while (running.get() && !started.get()) {
-                        started.wait(500);
-                    }
-                }
                 me = (MessageExchangeImpl) queue.poll(1000);
                 if (me != null) {
                     flow.getBroker().getWorkManager().scheduleWork(new Work() {
@@ -233,14 +228,18 @@
                 }
             }
             catch (InterruptedException e) {
-                if (!running.get()) {
-                    return;
+                if (!started.get()) {
+                    break;
                 }
                 log.warn(this + " interrupted", e);
             } catch (WorkException e) {
                 log.error(this + " got error processing exchange", e);
             }
         }
+        synchronized (running) { 
+            running.set(false);
+            running.notify();
+        }
     }
 
     /**

Modified: trunk/core/src/test/java/org/servicemix/examples/ExamplePojoTest.java (845 => 846)

--- trunk/core/src/test/java/org/servicemix/examples/ExamplePojoTest.java	2005-11-15 17:22:45 UTC (rev 845)
+++ trunk/core/src/test/java/org/servicemix/examples/ExamplePojoTest.java	2005-11-15 19:31:08 UTC (rev 846)
@@ -49,4 +49,8 @@
         container.activateComponent(new ActivationSpec("sender", sender));
         container.activateComponent(new ActivationSpec("receiver", receiver));
     }
+    
+    protected void tearDown() throws Exception {
+        container.shutDown();
+    }
 }

Reply via email to