User: pkendall Date: 01/08/14 19:43:33 Modified: src/main/org/jboss/mq/server JMSTopic.java JMSServer.java JMSQueue.java JMSDestination.java ClientConsumer.java BasicQueue.java Log: move subscription from receivers list to blocked list when connection is stopped. Should stop messages from being delivered after connection is stopped. Revision Changes Path 1.2 +92 -80 jbossmq/src/main/org/jboss/mq/server/JMSTopic.java Index: JMSTopic.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSTopic.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- JMSTopic.java 2001/08/11 20:59:15 1.1 +++ JMSTopic.java 2001/08/15 02:43:33 1.2 @@ -29,13 +29,13 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.1 $ + * @version $Revision: 1.2 $ */ public class JMSTopic extends JMSDestination { //Hashmap of ExclusiveQueues HashMap durQueues = new HashMap(); - HashMap tempQueues = new HashMap(); + HashMap tempQueues = new HashMap(); // Constructor --------------------------------------------------- @@ -44,87 +44,100 @@ super( dest, temporary, server ); } - + public void clientConsumerStopped(ClientConsumer clientConsumer){ + synchronized (durQueues) { + Iterator iter = durQueues.values().iterator(); + while( iter.hasNext() ) { + ((BasicQueue)iter.next()).clientConsumerStopped(clientConsumer); + } + } + synchronized (tempQueues) { + Iterator iter = tempQueues.values().iterator(); + while( iter.hasNext() ) { + ((BasicQueue)iter.next()).clientConsumerStopped(clientConsumer); + } + } + } public void addSubscriber(Subscription sub) throws JMSException { SpyTopic topic = (SpyTopic)sub.destination; DurableSubcriptionID id = topic.getDurableSubscriptionID(); if( id==null ) { - BasicQueue q = new BasicQueue(server); - synchronized (tempQueues) { + BasicQueue q = new BasicQueue(server); + synchronized (tempQueues) { tempQueues.put(sub,q); - } + } }else{ - PersistentQueue q = null; - synchronized(durQueues){ + PersistentQueue q = null; + synchronized(durQueues){ q = (PersistentQueue) durQueues.get(id); - } - if(q == null || //Brand new durable subscriber - !q.destination.equals(topic)) //subscription changed to new topic + } + if(q == null || //Brand new durable subscriber + !q.destination.equals(topic)) //subscription changed to new topic server.getStateManager().setDurableSubscription(server, id, topic); } } - public void removeSubscriber(Subscription sub) throws JMSException - { + public void removeSubscriber(Subscription sub) throws JMSException + { BasicQueue queue = null; - SpyTopic topic = (SpyTopic)sub.destination; + SpyTopic topic = (SpyTopic)sub.destination; DurableSubcriptionID id = topic.getDurableSubscriptionID(); if( id==null ) { - synchronized (tempQueues) { - queue = (BasicQueue)tempQueues.remove(sub); - } + synchronized (tempQueues) { + queue = (BasicQueue)tempQueues.remove(sub); + } }else{ - synchronized (durQueues){ - queue = (BasicQueue)durQueues.get(id); //note DON'T remove - } + synchronized (durQueues){ + queue = (BasicQueue)durQueues.get(id); //note DON'T remove + } + } + queue.removeReceiver(sub); } - queue.removeReceiver(sub); - } - - public void addReceiver(Subscription sub){ - getQueue(sub).addReceiver(sub); - } - public void removeReceiver(Subscription sub){ - getQueue(sub).removeReceiver(sub); - } + public void addReceiver(Subscription sub){ + getQueue(sub).addReceiver(sub); + } - public void restoreMessage(SpyMessage message){ - synchronized(this){ - messageIdCounter = Math.max(messageIdCounter,message.messageId+1); + public void removeReceiver(Subscription sub){ + getQueue(sub).removeReceiver(sub); } - if(message.durableSubscriberID == null){ - cat.debug("Trying to restore message with null durableSubscriberID"); - }else{ - ((BasicQueue)durQueues.get(message.durableSubscriberID)).restoreMessage(message); + + public void restoreMessage(SpyMessage message){ + synchronized(this){ + messageIdCounter = Math.max(messageIdCounter,message.messageId+1); + } + if(message.durableSubscriberID == null){ + cat.debug("Trying to restore message with null durableSubscriberID"); + }else{ + ((BasicQueue)durQueues.get(message.durableSubscriberID)).restoreMessage(message); + } } - } - //called by state manager when a durable sub is created + //called by state manager when a durable sub is created public void createDurableSubscription(DurableSubcriptionID id) throws JMSException - { + { if( temporaryDestination != null ) throw new JMSException("Not a valid operation on a temporary topic"); SpyTopic dstopic = new SpyTopic( (SpyTopic)destination, id); - BasicQueue queue = new PersistentQueue(server,dstopic); + BasicQueue queue = new PersistentQueue(server,dstopic); synchronized (durQueues) { durQueues.put(id, queue); } } - //called by state manager when a durable sub is deleted + //called by state manager when a durable sub is deleted public void destoryDurableSubscription(DurableSubcriptionID id) throws JMSException { BasicQueue queue; - synchronized (durQueues) { + synchronized (durQueues) { queue = (BasicQueue)durQueues.remove(id); } - queue.destroy(); + queue.destroy(); } // Package protected --------------------------------------------- @@ -135,25 +148,25 @@ } public SpyMessage receive(Subscription sub,boolean wait) throws javax.jms.JMSException { - return getQueue(sub).receive(sub,wait); + return getQueue(sub).receive(sub,wait); } - private BasicQueue getQueue(Subscription sub){ - SpyTopic topic = (SpyTopic)sub.destination; + private BasicQueue getQueue(Subscription sub){ + SpyTopic topic = (SpyTopic)sub.destination; DurableSubcriptionID id = topic.getDurableSubscriptionID(); if( id!=null ) { - return getDurableSubscription(id); + return getDurableSubscription(id); }else{ - synchronized(tempQueues){ - return (BasicQueue)tempQueues.get(sub); - } - } - } - - public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId) throws JMSException{ - getQueue(sub).acknowledge(req,txId); - } + synchronized(tempQueues){ + return (BasicQueue)tempQueues.get(sub); + } + } + } + + public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId) throws JMSException{ + getQueue(sub).acknowledge(req,txId); + } public void addMessage(SpyMessage message, org.jboss.mq.pm.Tx txId) throws JMSException { @@ -164,30 +177,29 @@ // } //Number the message so that we can preserve order of delivery. - long messageId = 0; + long messageId = 0; synchronized(this) { - messageId = messageIdCounter++; - synchronized (durQueues) { - Iterator iter = durQueues.keySet().iterator(); - while( iter.hasNext() ) { - DurableSubcriptionID id = (DurableSubcriptionID) iter.next(); - PersistentQueue q = (PersistentQueue)durQueues.get(id); - SpyMessage clone = message.myClone(); - clone.durableSubscriberID = id; - clone.messageId = messageId; - q.addMessage(clone, txId); - } - } - synchronized (tempQueues) { - Iterator iter = tempQueues.values().iterator(); - while( iter.hasNext() ) { - BasicQueue q = (BasicQueue)iter.next(); - SpyMessage clone = message.myClone(); - clone.messageId = messageId; - q.addMessage(clone, txId); + messageId = messageIdCounter++; + synchronized (durQueues) { + Iterator iter = durQueues.keySet().iterator(); + while( iter.hasNext() ) { + DurableSubcriptionID id = (DurableSubcriptionID) iter.next(); + PersistentQueue q = (PersistentQueue)durQueues.get(id); + SpyMessage clone = message.myClone(); + clone.durableSubscriberID = id; + clone.messageId = messageId; + q.addMessage(clone, txId); + } + } + synchronized (tempQueues) { + Iterator iter = tempQueues.values().iterator(); + while( iter.hasNext() ) { + BasicQueue q = (BasicQueue)iter.next(); + SpyMessage clone = message.myClone(); + clone.messageId = messageId; + q.addMessage(clone, txId); + } + } } - } - } - - } + } } 1.2 +35 -30 jbossmq/src/main/org/jboss/mq/server/JMSServer.java Index: JMSServer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSServer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- JMSServer.java 2001/08/11 20:59:15 1.1 +++ JMSServer.java 2001/08/15 02:43:33 1.2 @@ -41,16 +41,16 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.1 $ + * @version $Revision: 1.2 $ */ public class JMSServer { - //Implement Singleton design pattern for JVMIL to work correctly - //It would be nice if there was another way of doing this. - protected static JMSServer theInstance = new JMSServer(); - public static JMSServer getInstance(){ + //Implement Singleton design pattern for JVMIL to work correctly + //It would be nice if there was another way of doing this. + protected static JMSServer theInstance = new JMSServer(); + public static JMSServer getInstance(){ return theInstance; - } + } ///////////////////////////////////////////////////////////////////// // Attributes @@ -70,7 +70,7 @@ //The persistence manager private PersistenceManager persistenceManager; - private Object stateLock = new Object(); + private Object stateLock = new Object(); /** * <code>true</code> when the server is running. <code>false</code> when the @@ -89,7 +89,7 @@ HashMap clientConsumers = new HashMap(); static org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance(JMSServer.class); - //Thread group for server side threads. + //Thread group for server side threads. public ThreadGroup threadGroup = new ThreadGroup("JBossMQ Server Threads"); ///////////////////////////////////////////////////////////////////// @@ -113,29 +113,29 @@ */ public boolean isStopped() { synchronized(stateLock){ - return this.stopped; + return this.stopped; } } - public void startServer() { + public void startServer() { synchronized(stateLock){ - this.stopped = false; + this.stopped = false; + } } - } public void stopServer() { synchronized(stateLock){ - this.stopped = true; - //Any work that needs doing should be done here + this.stopped = true; + //Any work that needs doing should be done here - //At the moment there is nothing to do due to the fact that the individual - //parts of the JBossMQ (pm, ils etc) each have their own mbean service - //which starts and stops them separately - - //We could wait in here for the client consumers to finish delivering any - //messages they have, but it is not neccessary as the client acks will be - //almost certainly be lost anyway. - this.alive = false; + //At the moment there is nothing to do due to the fact that the individual + //parts of the JBossMQ (pm, ils etc) each have their own mbean service + //which starts and stops them separately + + //We could wait in here for the client consumers to finish delivering any + //messages they have, but it is not neccessary as the client acks will be + //almost certainly be lost anyway. + this.alive = false; } } @@ -404,6 +404,11 @@ public void setEnabled(ConnectionToken dc, boolean enabled) throws JMSException { ClientConsumer ClientConsumer = getClientConsumer(dc); ClientConsumer.setEnabled(enabled); + if(!enabled){ + for(Iterator it = destinations.values().iterator();it.hasNext();){ + ((JMSDestination)it.next()).clientConsumerStopped(ClientConsumer); + } + } } public synchronized Queue createQueue(ConnectionToken dc, String name) throws JMSException @@ -436,9 +441,9 @@ } - public StateManager getStateManager() { + public StateManager getStateManager() { return stateManager; - } + } public String checkUser(String userName, String password) throws JMSException { return stateManager.checkUser(userName, password); @@ -462,9 +467,9 @@ - public void setStateManager(StateManager newStateManager) { + public void setStateManager(StateManager newStateManager) { stateManager = newStateManager; - } + } public static final String JBOSS_VESION = "JBossMQ ver. 0.9b"; @@ -480,11 +485,11 @@ return JBOSS_VESION; } - public org.jboss.mq.pm.PersistenceManager getPersistenceManager() { + public org.jboss.mq.pm.PersistenceManager getPersistenceManager() { return persistenceManager; - } + } - public void setPersistenceManager(org.jboss.mq.pm.PersistenceManager newPersistenceManager) { + public void setPersistenceManager(org.jboss.mq.pm.PersistenceManager newPersistenceManager) { persistenceManager = newPersistenceManager; - } + } } 1.2 +32 -28 jbossmq/src/main/org/jboss/mq/server/JMSQueue.java Index: JMSQueue.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSQueue.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- JMSQueue.java 2001/08/11 20:59:15 1.1 +++ JMSQueue.java 2001/08/15 02:43:33 1.2 @@ -29,11 +29,11 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.1 $ + * @version $Revision: 1.2 $ */ public class JMSQueue extends JMSDestination { - BasicQueue queue; + BasicQueue queue; // Constructor --------------------------------------------------- JMSQueue(SpyDestination dest,ClientConsumer temporary,JMSServer server) throws JMSException @@ -42,38 +42,42 @@ // If this is a non-temp queue, then we should persist data if( temporaryDestination == null ) { - queue = new PersistentQueue(server,dest); + queue = new PersistentQueue(server,dest); }else{ - queue = new BasicQueue(server); + queue = new BasicQueue(server); } } - public void addSubscriber(Subscription sub){ - } + public void clientConsumerStopped(ClientConsumer clientConsumer){ + queue.clientConsumerStopped(clientConsumer); + } - public void removeSubscriber(Subscription sub){ - removeReceiver(sub); - } - - public void addReceiver(Subscription sub){ - queue.addReceiver(sub); - } - - public void removeReceiver(Subscription sub){ - queue.removeReceiver(sub); - } - - public void restoreMessage(SpyMessage message){ - synchronized(this){ - messageIdCounter = Math.max(messageIdCounter,message.messageId+1); + public void addSubscriber(Subscription sub){ } - queue.restoreMessage(message); - } + public void removeSubscriber(Subscription sub){ + removeReceiver(sub); + } + public void addReceiver(Subscription sub){ + queue.addReceiver(sub); + } + public void removeReceiver(Subscription sub){ + queue.removeReceiver(sub); + } + public void restoreMessage(SpyMessage message){ + synchronized(this){ + messageIdCounter = Math.max(messageIdCounter,message.messageId+1); + } + queue.restoreMessage(message); + } + + + + public SpyMessage[] browse(String selector) throws JMSException { return queue.browse( selector ); } @@ -84,9 +88,9 @@ - public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId) throws JMSException{ - queue.acknowledge(req,txId); - } + public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId) throws JMSException{ + queue.acknowledge(req,txId); + } public void addMessage(SpyMessage mes, org.jboss.mq.pm.Tx txId) throws JMSException { @@ -97,8 +101,8 @@ //Number the message so that we can preserve order of delivery. synchronized(this) { - mes.messageId = messageIdCounter++; - queue.addMessage(mes, txId); + mes.messageId = messageIdCounter++; + queue.addMessage(mes, txId); } } 1.2 +7 -7 jbossmq/src/main/org/jboss/mq/server/JMSDestination.java Index: JMSDestination.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSDestination.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- JMSDestination.java 2001/08/11 20:59:15 1.1 +++ JMSDestination.java 2001/08/15 02:43:33 1.2 @@ -28,7 +28,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.1 $ + * @version $Revision: 1.2 $ */ abstract public class JMSDestination { @@ -61,16 +61,16 @@ public abstract SpyMessage receive(Subscription sub,boolean wait) throws JMSException; - public abstract void addReceiver(Subscription sub); + public abstract void addReceiver(Subscription sub); - public abstract void removeReceiver(Subscription sub); + public abstract void removeReceiver(Subscription sub); - public abstract void restoreMessage(SpyMessage message); + public abstract void restoreMessage(SpyMessage message); + public abstract void clientConsumerStopped(ClientConsumer clientConsumer); - /** - * + * * @param req org.jboss.mq.AcknowledgementRequest * @param sub org.jboss.mq.Subscription * @param txId org.jboss.mq.pm.Tx @@ -79,7 +79,7 @@ public abstract void acknowledge(org.jboss.mq.AcknowledgementRequest req, org.jboss.mq.Subscription sub, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException; /** - * + * * @param mes org.jboss.mq.SpyMessage * @param txId org.jboss.mq.pm.Tx * @exception javax.jms.JMSException The exception description. 1.2 +7 -3 jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java Index: ClientConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- ClientConsumer.java 2001/08/11 20:59:15 1.1 +++ ClientConsumer.java 2001/08/15 02:43:33 1.2 @@ -28,7 +28,7 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * - * @version $Revision: 1.1 $ + * @version $Revision: 1.2 $ */ public class ClientConsumer implements Runnable { @@ -123,13 +123,17 @@ return queue.receive(req,(wait != -1)); } else if(wait != -1) { - blockedSubscriptions.add(req); + addBlockedSubscription(req); } return null; } - + void addBlockedSubscription(Subscription sub){ + synchronized(blockedSubscriptions){ + blockedSubscriptions.add(sub); + } + } public void removeSubscription(int subscriptionId) throws JMSException 1.2 +81 -71 jbossmq/src/main/org/jboss/mq/server/BasicQueue.java Index: BasicQueue.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- BasicQueue.java 2001/08/11 20:59:15 1.1 +++ BasicQueue.java 2001/08/15 02:43:33 1.2 @@ -37,7 +37,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.1 $ + * @version $Revision: 1.2 $ */ //abstract public class BasicQueue implements Runnable { public class BasicQueue { @@ -59,34 +59,45 @@ this.server=server; } - private void internalAddMessage(SpyMessage message) - { - //try waiting receivers - synchronized(receivers){ - if(!receivers.isEmpty()){ - for(Iterator it = receivers.iterator();it.hasNext();){ - Subscription sub = (Subscription)it.next(); - try{ - if(sub.accepts(message)){ - //queue message for sending to this sub - queueMessageForSending(sub,message); - it.remove(); - return; - } - }catch(JMSException ignore){ - cat.debug("Caught unusual exception in internalAddMessage.",ignore); - } + private void internalAddMessage(SpyMessage message) + { + //try waiting receivers + synchronized(receivers){ + if(!receivers.isEmpty()){ + for(Iterator it = receivers.iterator();it.hasNext();){ + Subscription sub = (Subscription)it.next(); + try{ + if(sub.accepts(message)){ + //queue message for sending to this sub + queueMessageForSending(sub,message); + it.remove(); + return; + } + }catch(JMSException ignore){ + cat.debug("Caught unusual exception in internalAddMessage.",ignore); + } + } + } } - } - } //else add to message list synchronized (messages) { messages.add(message); } - } - + } + public void clientConsumerStopped(ClientConsumer clientConsumer){ + //remove all waiting subs for this clientConsumer and send to its blocked list. + synchronized (receivers) { + for(Iterator it = receivers.iterator();it.hasNext();){ + Subscription sub = (Subscription) it.next(); + if(sub.clientConsumer.equals(clientConsumer)){ + clientConsumer.addBlockedSubscription(sub); + it.remove(); + } + } + } + } //Used to put a message that was added previously to the queue, back in the queue public void restoreMessage(SpyMessage mes) @@ -125,7 +136,7 @@ } } - protected void setupMessageAcknowledgement(Subscription sub,SpyMessage message){ + protected void setupMessageAcknowledgement(Subscription sub,SpyMessage message){ AcknowledgementRequest ack = new AcknowledgementRequest(); ack.destination = message.getJMSDestination(); ack.messageID = message.getJMSMessageID(); @@ -133,61 +144,61 @@ ack.isAck = false; synchronized (unacknowledgedMessages) { - unacknowledgedMessages.put(ack, message); + unacknowledgedMessages.put(ack, message); + } } - } - protected void queueMessageForSending(Subscription sub, SpyMessage message) throws JMSException{ + protected void queueMessageForSending(Subscription sub, SpyMessage message) throws JMSException{ setupMessageAcknowledgement(sub,message); ReceiveRequest r = new ReceiveRequest(); r.message = message; r.subscriptionId = new Integer(sub.subscriptionId); ((ClientConsumer)sub.clientConsumer).queueMessageForSending(r); - } + } - public void addReceiver(Subscription sub){ + public void addReceiver(Subscription sub){ synchronized(messages){ - if(messages.size() != 0){ + if(messages.size() != 0){ for(Iterator it = messages.iterator();it.hasNext();){ - SpyMessage message = (SpyMessage)it.next(); - try{ + SpyMessage message = (SpyMessage)it.next(); + try{ if(sub.accepts(message)){ - //queue message for sending to this sub - queueMessageForSending(sub,message); - it.remove(); - return; + //queue message for sending to this sub + queueMessageForSending(sub,message); + it.remove(); + return; } - }catch(JMSException ignore){ + }catch(JMSException ignore){ cat.debug("Caught unusual exception in addToReceivers.",ignore); - } + } } - } + } } addToReceivers(sub); - } + } - protected void addToReceivers(Subscription sub){ - synchronized(receivers){ - receivers.add(sub); + protected void addToReceivers(Subscription sub){ + synchronized(receivers){ + receivers.add(sub); + } } - } - public void removeReceiver(Subscription sub){ - synchronized(receivers){ - receivers.remove(sub); + public void removeReceiver(Subscription sub){ + synchronized(receivers){ + receivers.remove(sub); + } } - } public SpyMessage receive(Subscription sub, boolean wait) throws JMSException { - SpyMessage message = null; + SpyMessage message = null; Selector selector = sub.getSelector(); if( selector == null ) { synchronized (messages) { if (messages.size()!=0){ - message = (SpyMessage)messages.first(); - messages.remove(message); + message = (SpyMessage)messages.first(); + messages.remove(message); } } } else { @@ -196,36 +207,35 @@ while( i.hasNext() ) { SpyMessage m = (SpyMessage)i.next(); if( selector.test(m) ) { - message = m; - messages.remove(message); + message = m; + messages.remove(message); break; } } } } - if(message == null){ - if(wait) - addToReceivers(sub); - }else{ - setupMessageAcknowledgement(sub,message); - } - return message; + if(message == null){ + if(wait) + addToReceivers(sub); + }else{ + setupMessageAcknowledgement(sub,message); + } + return message; } - - public void destroy() throws JMSException{ - synchronized (unacknowledgedMessages) { - Iterator i = ((HashMap)unacknowledgedMessages.clone()).keySet().iterator(); - while( i.hasNext() ) { - AcknowledgementRequest item = (AcknowledgementRequest)i.next(); - try { - acknowledge(item, null); - } catch ( JMSException ignore ) { + public void destroy() throws JMSException{ + synchronized (unacknowledgedMessages) { + Iterator i = ((HashMap)unacknowledgedMessages.clone()).keySet().iterator(); + while( i.hasNext() ) { + AcknowledgementRequest item = (AcknowledgementRequest)i.next(); + try { + acknowledge(item, null); + } catch ( JMSException ignore ) { + } + } } - } } - } public void acknowledge(AcknowledgementRequest item, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException { @@ -279,7 +289,7 @@ public void run() { //restore a message to the message list... - internalAddMessage(message); + internalAddMessage(message); } } _______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] http://lists.sourceforge.net/lists/listinfo/jboss-development