Title: [846] trunk/core/src/test/java/org/servicemix/examples: Fix SedaQueue to use the work manager instead of spawning its own thread
- Revision
- 846
- Author
- gnt
- Date
- 2005-11-15 14:31:08 -0500 (Tue, 15 Nov 2005)
Log Message
Fix SedaQueue to use the work manager instead of spawning its own thread
Modified Paths
Diff
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/seda/SedaQueue.java (845 => 846)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/seda/SedaQueue.java 2005-11-15 17:22:45 UTC (rev 845)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/seda/SedaQueue.java 2005-11-15 19:31:08 UTC (rev 846)
@@ -49,8 +49,8 @@
protected AtomicBoolean started = new AtomicBoolean(false);
protected AtomicBoolean running = new AtomicBoolean(false);
protected ObjectName objectName;
- protected Thread thread;
protected String subType;
+ protected Thread thread;
/**
* SedaQueue name
@@ -138,17 +138,16 @@
* @throws JBIException
*/
public void start() throws JBIException {
- started.set(true);
- running.set(true);
- synchronized (started) {
- started.notify();
+ synchronized (running) {
+ try {
+ started.set(true);
+ flow.getBroker().getWorkManager().startWork(this);
+ running.wait();
+ super.start();
+ } catch (Exception e) {
+ throw new JBIException("Unable to start queue work", e);
+ }
}
- if (thread == null) {
- thread = new Thread(this);
- thread.setDaemon(true);
- thread.start();
- }
- super.start();
}
/**
@@ -158,6 +157,18 @@
*/
public void stop() throws JBIException {
started.set(false);
+ if (thread != null && running.get()) {
+ try {
+ synchronized (running) {
+ thread.interrupt();
+ running.wait();
+ }
+ } catch (Exception e) {
+ log.warn("Error stopping thread", e);
+ } finally {
+ thread = null;
+ }
+ }
super.stop();
}
@@ -167,15 +178,6 @@
* @throws JBIException
*/
public void shutDown() throws JBIException {
- running.set(false);
- if (thread != null) {
- try {
- thread.interrupt();
- thread.join();
- } catch (Exception e) {
- log.warn("Error stopping thread", e);
- }
- }
super.shutDown();
}
@@ -194,24 +196,17 @@
}
/**
- * @return true if running
- */
- public boolean isRunning(){
- return running.get();
- }
-
- /**
* do processing
*/
public void run() {
- while (running.get()) {
+ thread = Thread.currentThread();
+ synchronized (running) {
+ running.set(true);
+ running.notify();
+ }
+ while (started.get()) {
final MessageExchangeImpl me;
try {
- synchronized (started) {
- while (running.get() && !started.get()) {
- started.wait(500);
- }
- }
me = (MessageExchangeImpl) queue.poll(1000);
if (me != null) {
flow.getBroker().getWorkManager().scheduleWork(new Work() {
@@ -233,14 +228,18 @@
}
}
catch (InterruptedException e) {
- if (!running.get()) {
- return;
+ if (!started.get()) {
+ break;
}
log.warn(this + " interrupted", e);
} catch (WorkException e) {
log.error(this + " got error processing exchange", e);
}
}
+ synchronized (running) {
+ running.set(false);
+ running.notify();
+ }
}
/**
Modified: trunk/core/src/test/java/org/servicemix/examples/ExamplePojoTest.java (845 => 846)
--- trunk/core/src/test/java/org/servicemix/examples/ExamplePojoTest.java 2005-11-15 17:22:45 UTC (rev 845)
+++ trunk/core/src/test/java/org/servicemix/examples/ExamplePojoTest.java 2005-11-15 19:31:08 UTC (rev 846)
@@ -49,4 +49,8 @@
container.activateComponent(new ActivationSpec("sender", sender));
container.activateComponent(new ActivationSpec("receiver", receiver));
}
+
+ protected void tearDown() throws Exception {
+ container.shutDown();
+ }
}