Diff
Modified: trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java (748 => 749)
--- trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -134,6 +134,7 @@
if (lcc.isPojo()){
lcc.getComponent().getLifeCycle().shutDown();
}
+ lcc.getDeliveryChannel().close();
}
super.shutDown();
container.getManagementContext().unregisterMBean(this);
Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java (748 => 749)
--- trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -18,6 +18,7 @@
**/
package org.servicemix.jbi.messaging;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.activemq.util.IdGenerator;
@@ -45,6 +46,10 @@
import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
/**
* DeliveryChannel implementation
*
@@ -53,6 +58,7 @@
public class DeliveryChannelImpl implements DeliveryChannel {
private static final Log log = LogFactory.getLog(DeliveryChannel.class);
+
private JBIContainer container;
private ComponentContextImpl context;
private LocalComponentConnector componentConnector;
@@ -67,6 +73,7 @@
private long lastSendTime = System.currentTimeMillis();
private long lastReceiveTime = System.currentTimeMillis();
private AtomicBoolean closed = new AtomicBoolean(false);
+ private Map waiters = new ConcurrentHashMap();
/**
* Constructor
@@ -109,6 +116,23 @@
*/
public void close() throws MessagingException {
if (this.closed.compareAndSet(false, true)) {
+ List pending = queue.closeAndFlush();
+ for (Iterator iter = pending.iterator(); iter.hasNext();) {
+ MessageExchangeImpl messageExchange = (MessageExchangeImpl) iter.next();
+ if (messageExchange.getTransactionContext() != null && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+ synchronized (messageExchange.getMirror()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Notifying: " + messageExchange.getExchangeId());
+ }
+ messageExchange.getMirror().notify();
+ }
+ }
+ }
+ // Interrupt all blocked threads
+ Object[] threads = waiters.keySet().toArray();
+ for (int i = 0; i < threads.length; i++) {
+ ((Thread) threads[i]).interrupt();
+ }
// TODO: deactivate all endpoints from this component
// TODO: Cause all accepts to return null
// TODO: Abort all pending exchanges
@@ -227,10 +251,16 @@
try {
checkNotClosed();
MessageExchangeImpl me = (MessageExchangeImpl) queue.take();
+ if (log.isDebugEnabled()) {
+ log.debug("Accepting " + me.getExchangeId() + " in " + this);
+ }
resumeTx(me);
me.handleAccept();
return me;
}
+ catch (IllegalStateException e) {
+ throw new MessagingException("DeliveryChannel has been closed.");
+ }
catch (InterruptedException e) {
throw new MessagingException("accept failed", e);
}
@@ -251,8 +281,14 @@
// If the exchange has already timed out,
// do not give it to the component
if (me.getPacket().isAborted()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Aborted " + me.getExchangeId() + " in " + this);
+ }
me = null;
} else {
+ if (log.isDebugEnabled()) {
+ log.debug("Accepting " + me.getExchangeId() + " in " + this);
+ }
resumeTx(me);
me.handleAccept();
}
@@ -265,47 +301,67 @@
}
protected void doSend(MessageExchangeImpl messageExchange, boolean sync) throws MessagingException {
- // If the message has timed out
- if (messageExchange.getPacket().isAborted()) {
- throw new ExchangeTimeoutException(messageExchange);
- }
- // Auto enlist exchange in transaction
- autoEnlistInTx(messageExchange);
- // Update persistence info
- Boolean persistent = messageExchange.getPersistent();
- if (persistent == null) {
- if (context.getActivationSpec().getPersistent() != null) {
- persistent = context.getActivationSpec().getPersistent();
- } else {
- persistent = Boolean.valueOf(context.getContainer().isPersistent());
- }
- messageExchange.setPersistent(persistent);
- }
-
- if (exchangeThrottling) {
- if (throttlingInterval > intervalCount) {
- intervalCount = 0;
- try {
- Thread.sleep(throttlingTimeout);
+ try {
+ // If the delivery channel has been closed
+ checkNotClosed();
+ // If the message has timed out
+ if (messageExchange.getPacket().isAborted()) {
+ throw new ExchangeTimeoutException(messageExchange);
+ }
+ // Auto enlist exchange in transaction
+ autoEnlistInTx(messageExchange);
+ // Update persistence info
+ Boolean persistent = messageExchange.getPersistent();
+ if (persistent == null) {
+ if (context.getActivationSpec().getPersistent() != null) {
+ persistent = context.getActivationSpec().getPersistent();
+ } else {
+ persistent = Boolean.valueOf(context.getContainer().isPersistent());
+ }
+ messageExchange.setPersistent(persistent);
+ }
+
+ if (exchangeThrottling) {
+ if (throttlingInterval > intervalCount) {
+ intervalCount = 0;
+ try {
+ Thread.sleep(throttlingTimeout);
+ }
+ catch (InterruptedException e) {
+ log.warn("throttling failed", e);
+ }
}
- catch (InterruptedException e) {
- log.warn("throttling failed", e);
+ intervalCount++;
+ }
+
+ long currentTime = System.currentTimeMillis();
+ messagingStats.getOutboundExchanges().increment();
+ messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
+ lastSendTime = currentTime;
+ if (messageExchange.getRole() == Role.CONSUMER) {
+ messageExchange.setSourceId(componentConnector.getComponentNameSpace());
+ }
+
+ messageExchange.handleSend(sync);
+ container.sendExchange(messageExchange.getMirror());
+ } catch (MessagingException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Exception processing: " + messageExchange.getExchangeId() + " in " + this);
+ }
+ if (messageExchange.getTransactionContext() != null) {
+ suspendTx(messageExchange);
+ if (messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+ if (log.isDebugEnabled()) {
+ log.debug("Notifying: " + messageExchange.getExchangeId() + " in " + this);
+ }
+ synchronized (messageExchange.getMirror()) {
+ messageExchange.getMirror().notify();
+ }
}
}
- intervalCount++;
+ throw e;
}
- long currentTime = System.currentTimeMillis();
- messagingStats.getOutboundExchanges().increment();
- messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
- lastSendTime = currentTime;
- if (messageExchange.getRole() == Role.CONSUMER) {
- messageExchange.setSourceId(componentConnector.getComponentNameSpace());
- }
-
- messageExchange.handleSend(sync);
- container.sendExchange(messageExchange.getMirror());
-
/*
if (messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
synchronized (messageExchange.getMirror()) {
@@ -324,7 +380,6 @@
* @throws MessagingException
*/
public void send(MessageExchange messageExchange) throws MessagingException {
- checkNotClosed();
messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
doSend(messageExchangeImpl, false);
@@ -350,7 +405,9 @@
* @throws MessagingException
*/
public boolean sendSync(MessageExchange messageExchange, long timeoutMS) throws MessagingException {
- checkNotClosed();
+ if (log.isDebugEnabled()) {
+ log.debug("Sending " + messageExchange.getExchangeId() + " in " + this);
+ }
// JBI 5.5.2.1.3: set the sendSync property
messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
@@ -528,8 +585,24 @@
suspendTx(me);
synchronized (me.getMirror()) {
me.getMirror().setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_SENT);
+ if (log.isDebugEnabled()) {
+ log.debug("Queuing: " + me.getExchangeId() + " in " + this);
+ }
queue.put(me);
- me.getMirror().wait(Long.MAX_VALUE);
+ if (log.isDebugEnabled()) {
+ log.debug("Waiting: " + me.getExchangeId() + " in " + this);
+ }
+ // If the channel is closed while here,
+ // we must abort
+ waiters.put(Thread.currentThread(), Boolean.TRUE);
+ try {
+ me.getMirror().wait();
+ } finally {
+ waiters.remove(Thread.currentThread());
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Notified: " + me.getExchangeId() + " in " + this);
+ }
}
resumeTx(me);
} else {
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jca/JCAFlow.java (748 => 749)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jca/JCAFlow.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jca/JCAFlow.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -43,10 +43,11 @@
import org.servicemix.jbi.framework.ComponentNameSpace;
import org.servicemix.jbi.framework.ComponentPacket;
import org.servicemix.jbi.framework.ComponentPacketEvent;
+import org.servicemix.jbi.framework.ComponentPacketEventListener;
import org.servicemix.jbi.framework.LocalComponentConnector;
import org.servicemix.jbi.messaging.MessageExchangeImpl;
import org.servicemix.jbi.nmr.Broker;
-import org.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.servicemix.jbi.nmr.flow.AbstractFlow;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
@@ -78,10 +79,11 @@
*
* @version $Revision$
*/
-public class JCAFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
+public class JCAFlow extends AbstractFlow implements ConsumerAdvisoryEventListener, MessageListener, ComponentPacketEventListener {
+
private static final Log log = LogFactory.getLog(JCAFlow.class);
private static final String INBOUND_PREFIX = "org.servicemix.inbound.";
- private String jmsURL = "peer://org.servicemix?persistent=false";
+ private String jmsURL = "tcp://localhost:61616";
private String userName;
private String password;
private ConnectionFactory connectionFactory;
@@ -201,6 +203,7 @@
*/
public void init(Broker broker, String subType) throws JBIException {
super.init(broker, subType);
+ broker.getRegistry().addComponentPacketListener(this);
try {
resourceAdapter = createResourceAdapter();
@@ -327,23 +330,6 @@
}
/**
- * Distribute an ExchangePacket
- *
- * @param packet
- * @throws JBIException
- */
- protected void doSend(MessageExchangeImpl me) throws JBIException {
- if (me.getRole() == Role.PROVIDER
- && me.getTransactionContext() == null
- && !isPersistent(me)) {
- enqueuePacket(me);
- }
- else {
- doRouting(me);
- }
- }
-
- /**
* Ability for this flow to persist exchanges.
*
* @return <code>true</code> if this flow can persist messages
@@ -358,7 +344,6 @@
* @param event
*/
public void onEvent(final ComponentPacketEvent event) {
- super.onEvent(event);
try {
String componentName = event.getPacket().getComponentNameSpace().getName();
if (event.getStatus() == ComponentPacketEvent.ACTIVATED){
@@ -419,6 +404,16 @@
* @param packet
* @throws MessagingException
*/
+ protected void doSend(MessageExchangeImpl me) throws MessagingException {
+ // Every exchange is handed straight to doRouting(); this method does no queuing of its own.
+ doRouting(me);
+ }
+
+ /**
+ * Distribute an ExchangePacket
+ *
+ * @param me the exchange to route
+ * @throws MessagingException
+ */
public void doRouting(final MessageExchangeImpl me) throws MessagingException {
ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (748 => 749)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -34,10 +34,11 @@
import org.servicemix.jbi.framework.ComponentNameSpace;
import org.servicemix.jbi.framework.ComponentPacket;
import org.servicemix.jbi.framework.ComponentPacketEvent;
+import org.servicemix.jbi.framework.ComponentPacketEventListener;
import org.servicemix.jbi.framework.LocalComponentConnector;
import org.servicemix.jbi.messaging.MessageExchangeImpl;
import org.servicemix.jbi.nmr.Broker;
-import org.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.servicemix.jbi.nmr.flow.AbstractFlow;
import javax.jbi.JBIException;
import javax.jbi.messaging.MessagingException;
@@ -62,7 +63,8 @@
*
* @version $Revision$
*/
-public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
+public class JMSFlow extends AbstractFlow implements ConsumerAdvisoryEventListener, MessageListener, ComponentPacketEventListener {
+
private static final Log log = LogFactory.getLog(JMSFlow.class);
private static final String INBOUND_PREFIX = "org.servicemix.inbound.";
private String jmsURL = "peer://org.servicemix?persistent=false";
@@ -170,6 +172,7 @@
*/
public void init(Broker broker, String subType) throws JBIException {
super.init(broker, subType);
+ broker.getRegistry().addComponentPacketListener(this);
try {
if (connectionFactory == null) {
if (jmsURL != null) {
@@ -265,26 +268,11 @@
}
/**
- * Distribute an ExchangePacket
- *
- * @param packet
- * @throws JBIException
- */
- protected void doSend(MessageExchangeImpl me) throws JBIException {
- if (me.getTransactionContext() == null) {
- enqueuePacket(me);
- } else {
- doRouting(me);
- }
- }
-
- /**
* Process state changes in Components
*
* @param event
*/
public void onEvent(ComponentPacketEvent event) {
- super.onEvent(event);
try {
String componentName = event.getPacket().getComponentNameSpace().getName();
if (event.getStatus() == ComponentPacketEvent.ACTIVATED){
@@ -336,6 +324,16 @@
* @param packet
* @throws MessagingException
*/
+ protected void doSend(MessageExchangeImpl me) throws MessagingException {
+ // Every exchange is handed straight to doRouting(); this method does no queuing of its own.
+ doRouting(me);
+ }
+
+ /**
+ * Distribute an ExchangePacket
+ *
+ * @param me the exchange to route
+ * @throws MessagingException
+ */
public void doRouting(MessageExchangeImpl me) throws MessagingException{
ComponentNameSpace id=me.getRole()==Role.PROVIDER?me.getDestinationId():me.getSourceId();
ComponentConnector cc=broker.getRegistry().getComponentConnector(id);
Modified: trunk/core/src/main/java/org/servicemix/jbi/util/BoundedLinkedQueue.java (748 => 749)
--- trunk/core/src/main/java/org/servicemix/jbi/util/BoundedLinkedQueue.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/util/BoundedLinkedQueue.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -17,6 +17,9 @@
**/
package org.servicemix.jbi.util;
+import java.util.ArrayList;
+import java.util.List;
+
public class BoundedLinkedQueue {
public static class LinkedNode {
@@ -86,6 +89,9 @@
/** Number of takes since last reconcile * */
protected int takeSidePutPermits_ = 0;
+
+ /** Close flag */
+ protected volatile boolean closed;
/**
* Create a queue with the given capacity
@@ -184,6 +190,8 @@
}
public Object peek() {
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
synchronized (head_) {
LinkedNode first = head_.next;
if (first != null)
@@ -196,6 +204,9 @@
public Object take() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
+
Object x = extract();
if (x != null)
return x;
@@ -207,6 +218,8 @@
if (x != null) {
return x;
} else {
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
takeGuard_.wait();
}
}
@@ -221,6 +234,9 @@
public Object poll(long msecs) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
+
Object x = extract();
if (x != null)
return x;
@@ -234,6 +250,8 @@
if (x != null || waitTime <= 0) {
return x;
} else {
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
takeGuard_.wait(waitTime);
waitTime = msecs
- (System.currentTimeMillis() - start);
@@ -275,6 +293,8 @@
throw new IllegalArgumentException();
if (Thread.interrupted())
throw new InterruptedException();
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
synchronized (putGuard_) {
@@ -283,6 +303,8 @@
if (reconcilePutPermits() <= 0) {
try {
for (;;) {
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
wait();
if (reconcilePutPermits() > 0) {
break;
@@ -306,6 +328,8 @@
throw new IllegalArgumentException();
if (Thread.interrupted())
throw new InterruptedException();
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
synchronized (putGuard_) {
@@ -320,6 +344,8 @@
long start = System.currentTimeMillis();
for (;;) {
+ if (closed)
+ throw new IllegalStateException("Channel is closed");
wait(waitTime);
if (reconcilePutPermits() > 0) {
break;
@@ -352,5 +378,30 @@
return head_.next == null;
}
}
+
+ /**
+  * Marks this queue as closed, removes all put capacity, wakes every thread
+  * blocked in a put or take, and drains whatever is still queued.
+  *
+  * This method is deliberately NOT synchronized on the queue itself. The put
+  * path acquires putGuard_ first and then waits on this queue's monitor, so
+  * holding this queue's monitor while asking for putGuard_ would invert that
+  * lock order and deadlock against any producer blocked on a full queue.
+  *
+  * @return the elements that were still queued at close time, in extraction order;
+  *         never null, possibly empty
+  */
+ public List closeAndFlush() {
+     // Set this queue as closed (the flag is volatile, so waiters see it once woken)
+     closed = true;
+     // Producers blocked on a full queue wait on this monitor while still holding
+     // putGuard_. Wake them first so they observe the closed flag, fail with
+     // IllegalStateException and release putGuard_; otherwise the block below
+     // could wait on putGuard_ forever.
+     synchronized (this) {
+         notifyAll();
+     }
+     // No more puts are allowed. Locks are taken in the same order as the put path:
+     // putGuard_ first, then this.
+     synchronized (putGuard_) {
+         synchronized (this) {
+             takeSidePutPermits_ -= capacity_;
+             capacity_ = 0;
+             // Force immediate reconciliation.
+             reconcilePutPermits();
+             notifyAll();
+         }
+     }
+     // Wake consumers blocked in take()/poll() so they notice the queue is closed.
+     synchronized (takeGuard_) {
+         takeGuard_.notifyAll();
+     }
+     // Drain the remaining elements for the caller to dispose of.
+     List drained = new ArrayList();
+     for (Object item = extract(); item != null; item = extract()) {
+         drained.add(item);
+     }
+     return drained;
+ }
+
}
Modified: trunk/core/src/test/java/org/servicemix/examples/AsyncReceiverPojo.java (748 => 749)
--- trunk/core/src/test/java/org/servicemix/examples/AsyncReceiverPojo.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/examples/AsyncReceiverPojo.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -86,18 +86,18 @@
// Runnable interface
//-------------------------------------------------------------------------
public void run() {
- try {
- while (running) {
+ while (running) {
+ try {
DeliveryChannel deliveryChannel = context.getDeliveryChannel();
System.out.println("about to do an accept on deliveryChannel: " + deliveryChannel);
MessageExchange messageExchange = deliveryChannel.accept();
System.out.println("received me: " + messageExchange);
onMessageExchange(messageExchange);
}
+ catch (MessagingException e) {
+ log.error("Failed to process inbound messages: " + e, e);
+ }
}
- catch (MessagingException e) {
- log.error("Failed to process inbound messages: " + e, e);
- }
}
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java (748 => 749)
--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -101,9 +101,7 @@
}
public void testClusteredAsyncSendAsyncReceive() throws Exception {
- // TODO: correct this test as it fails currently
- throw new Exception("Correct this test case");
- //runClusteredTest(false, false);
+ runClusteredTest(false, false);
}
}
Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java (748 => 749)
--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -54,8 +54,8 @@
}
protected void runSimpleTest(final boolean syncSend, final boolean syncReceive) throws Exception {
- final int numMessages = 10;
- //final int numMessages = NUM_MESSAGES;
+ //final int numMessages = 1;
+ final int numMessages = NUM_MESSAGES;
final SenderComponent sender = new SenderComponent();
sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
final Receiver receiver;
@@ -65,13 +65,13 @@
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
try {
if (delivered.get(exchange.getExchangeId()) == null) {
+ System.err.println("Message delivery rolled back: " + exchange.getExchangeId());
delivered.put(exchange.getExchangeId(), Boolean.TRUE);
tm.setRollbackOnly();
done(exchange);
- System.err.println("Message delivery rolled back: " + exchange.getExchangeId());
} else {
+ System.err.println("Message delivery accepted: " + exchange.getExchangeId());
super.onMessageExchange(exchange);
- System.err.println("Message delivery accepted: " + exchange.getExchangeId());
}
} catch (Exception e) {
throw new MessagingException(e);
@@ -83,14 +83,14 @@
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
try {
if (delivered.get(exchange.getExchangeId()) == null) {
+ System.err.println("Message delivery rolled back: " + exchange.getExchangeId());
+ delivered.put(exchange.getExchangeId(), Boolean.TRUE);
tm.setRollbackOnly();
exchange.setStatus(ExchangeStatus.DONE);
getContext().getDeliveryChannel().send(exchange);
- System.err.println("Message delivery rolled back: " + exchange.getExchangeId());
} else {
- delivered.put(exchange.getExchangeId(), Boolean.TRUE);
+ System.err.println("Message delivery accepted: " + exchange.getExchangeId());
super.onMessageExchange(exchange);
- System.err.println("Message delivery accepted: " + exchange.getExchangeId());
}
} catch (Exception e) {
throw new MessagingException(e);
Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowPersistentTest.java (748 => 749)
--- trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowPersistentTest.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowPersistentTest.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -72,9 +72,8 @@
}
}
- // TODO: this one should work
public void testAsyncSendAsyncReceive() throws Exception {
- //runSimpleTest(false, false);
+ runSimpleTest(false, false);
}
}
Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowTransactionTest.java (748 => 749)
--- trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowTransactionTest.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowTransactionTest.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -50,4 +50,22 @@
return flow;
}
+ /**
+  * Runs the simple scenario with synchronous send and synchronous receive and
+  * requires it to be rejected with an IllegalStateException.
+  */
+ public void testSyncSendSyncReceive() throws Exception {
+     boolean rejected = false;
+     try {
+         runSimpleTest(true, true);
+     } catch (IllegalStateException expected) {
+         // sendSync can not be used
+         rejected = true;
+     }
+     if (!rejected) {
+         fail("sendSync can not be used");
+     }
+ }
+
+ /**
+  * Runs the simple scenario with synchronous send and asynchronous receive and
+  * requires it to be rejected with an IllegalStateException.
+  */
+ public void testSyncSendAsyncReceive() throws Exception {
+     boolean rejected = false;
+     try {
+         runSimpleTest(true, false);
+     } catch (IllegalStateException expected) {
+         // sendSync can not be used
+         rejected = true;
+     }
+     if (!rejected) {
+         fail("sendSync can not be used");
+     }
+ }
+
}
Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java (748 => 749)
--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -183,6 +183,6 @@
assertFalse(receiver2.getMessageList().hasReceivedMessage());
receiver1.getMessageList().flushMessages();
receiver2.getMessageList().flushMessages();
-
}
+
}
Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java (748 => 749)
--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java 2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java 2005-11-04 15:24:01 UTC (rev 749)
@@ -82,4 +82,45 @@
Thread.sleep(3000);
receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
}
+
+ /**
+ * Sends three batches of NUM_MESSAGES messages while a component named "receiver"
+ * is activated in both containers, then only in receiverContainer, then only in
+ * senderContainer, and checks after each batch which receiver instance got messages.
+ *
+ * @throws Exception if activation, deactivation or sending fails
+ */
+ public void testClusteredInOnly() throws Exception {
+ final SenderComponent sender = new SenderComponent();
+ final ReceiverComponent receiver1 = new ReceiverComponent();
+ final ReceiverComponent receiver2 = new ReceiverComponent();
+ sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
+
+ // Phase 1: "receiver" is activated in both containers (receiver1 next to the sender,
+ // receiver2 in receiverContainer).
+ senderContainer.activateComponent(new ActivationSpec("sender", sender));
+ senderContainer.activateComponent(new ActivationSpec("receiver", receiver1));
+ receiverContainer.activateComponent(new ActivationSpec("receiver", receiver2));
+ // NOTE(review): the sleeps presumably give activation and delivery time to complete - confirm
+ Thread.sleep(1000);
+
+ // Both instances must have received at least one message.
+ sender.sendMessages(NUM_MESSAGES);
+ Thread.sleep(3000);
+ assertTrue(receiver1.getMessageList().hasReceivedMessage());
+ assertTrue(receiver2.getMessageList().hasReceivedMessage());
+ receiver1.getMessageList().flushMessages();
+ receiver2.getMessageList().flushMessages();
+
+ // Phase 2: "receiver" is deactivated in senderContainer, so only receiver2 may get messages.
+ senderContainer.deactivateComponent("receiver");
+ Thread.sleep(1000);
+
+ sender.sendMessages(NUM_MESSAGES);
+ Thread.sleep(3000);
+ assertFalse(receiver1.getMessageList().hasReceivedMessage());
+ assertTrue(receiver2.getMessageList().hasReceivedMessage());
+ receiver1.getMessageList().flushMessages();
+ receiver2.getMessageList().flushMessages();
+
+ // Phase 3: roles swapped - receiver1 is re-activated in senderContainer and "receiver"
+ // is deactivated in receiverContainer, so only receiver1 may get messages.
+ senderContainer.activateComponent(new ActivationSpec("receiver", receiver1));
+ receiverContainer.deactivateComponent("receiver");
+ Thread.sleep(1000);
+
+ sender.sendMessages(NUM_MESSAGES);
+ Thread.sleep(3000);
+ assertTrue(receiver1.getMessageList().hasReceivedMessage());
+ assertFalse(receiver2.getMessageList().hasReceivedMessage());
+ receiver1.getMessageList().flushMessages();
+ receiver2.getMessageList().flushMessages();
+ }
+
}