User: pkendall Date: 01/08/15 21:04:15 Modified: src/main/org/jboss/mq/server JMSTopic.java JMSQueue.java ClientConsumer.java BasicQueue.java Log: Several subtle bug fixes. Transacted sessions were not rolled back when closed. Have to save subscriptions that have unacked messages. Revision Changes Path 1.3 +1 -1 jbossmq/src/main/org/jboss/mq/server/JMSTopic.java Index: JMSTopic.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSTopic.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- JMSTopic.java 2001/08/15 02:43:33 1.2 +++ JMSTopic.java 2001/08/16 04:04:15 1.3 @@ -29,7 +29,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class JMSTopic extends JMSDestination { @@ -92,7 +92,7 @@ queue = (BasicQueue)durQueues.get(id); //note DON'T remove } } - queue.removeReceiver(sub); + queue.removeSubscriber(sub); } public void addReceiver(Subscription sub){ 1.3 +1 -1 jbossmq/src/main/org/jboss/mq/server/JMSQueue.java Index: JMSQueue.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSQueue.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- JMSQueue.java 2001/08/15 02:43:33 1.2 +++ JMSQueue.java 2001/08/16 04:04:15 1.3 @@ -29,7 +29,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class JMSQueue extends JMSDestination { @@ -56,7 +56,7 @@ } public void removeSubscriber(Subscription sub){ - removeReceiver(sub); + queue.removeSubscriber(sub); } public void addReceiver(Subscription sub){ 1.3 +23 -3 jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java Index: ClientConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- ClientConsumer.java 2001/08/15 02:43:33 1.2 +++ ClientConsumer.java 2001/08/16 04:04:15 1.3 @@ -28,7 +28,7 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class ClientConsumer implements Runnable { @@ -42,8 +42,10 @@ boolean enabled; //Has this connection been closed? boolean closed = false; - //Maps a a subsction id to a Subscription + //Maps a subscription id to a Subscription HashMap subscriptions = new HashMap(); + //Maps a subscription id to a Subscription for subscriptions that have finished receiving + HashMap removedSubscriptions = new HashMap(); LinkedList blockedSubscriptions = new LinkedList(); //LinkedList of the the temporary destinations that this client created @@ -140,12 +142,15 @@ { cat.debug(""+this+"->removeSubscription(subscriberId="+subscriptionId+")"); + Integer subId = new Integer(subscriptionId); Subscription req; synchronized (subscriptions ) { - HashMap subscriptionsClone = (HashMap)subscriptions.clone(); - req = (Subscription)subscriptionsClone.remove(new Integer(subscriptionId)); + req = (Subscription) subscriptionsClone.remove(subId); subscriptions = subscriptionsClone; + if(req != null){ + removedSubscriptions.put(subId,req); + } } if( req == null ) @@ -156,8 +161,16 @@ throw new JMSException("The subscription was registed with a destination that does not exist !"); queue.removeSubscriber(req); + + } + + void removeRemovedSubscription(int subId){ + synchronized (subscriptions ) { + removedSubscriptions.remove(new Integer(subId)); + } } + // Iterate over the consumers asking them to take messages until they stop // consuming. public void run() { @@ -222,6 +235,13 @@ public void acknowledge(AcknowledgementRequest request, org.jboss.mq.pm.Tx txId) throws JMSException{ Subscription sub = (Subscription)subscriptions.get(new Integer(request.subscriberId)); + + if(sub == null){ //might be in removed subscriptions + synchronized(subscriptions){ + sub = (Subscription)removedSubscriptions.get(new Integer(request.subscriberId)); + } + } + if( sub == null ) throw new JMSException("The provided subscription does not exist"); 1.3 +44 -10 jbossmq/src/main/org/jboss/mq/server/BasicQueue.java Index: BasicQueue.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- BasicQueue.java 2001/08/15 02:43:33 1.2 +++ BasicQueue.java 2001/08/16 04:04:15 1.3 @@ -37,7 +37,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ //abstract public class BasicQueue implements Runnable { public class BasicQueue { @@ -51,6 +51,8 @@ //owning exclusive queues. HashMap unacknowledgedMessages = new HashMap(); + HashMap removedSubscribers = new HashMap(); + static org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance(BasicQueue.class); // Constructor --------------------------------------------------- @@ -102,11 +104,10 @@ //Used to put a message that was added previously to the queue, back in the queue public void restoreMessage(SpyMessage mes) { - internalAddMessage(mes); + internalAddMessage(mes); } - public SpyMessage[] browse(String selector) throws JMSException { if( selector == null ) { @@ -137,15 +138,15 @@ } protected void setupMessageAcknowledgement(Subscription sub,SpyMessage message){ - AcknowledgementRequest ack = new AcknowledgementRequest(); - ack.destination = message.getJMSDestination(); - ack.messageID = message.getJMSMessageID(); - ack.subscriberId = sub.subscriptionId; - ack.isAck = false; + AcknowledgementRequest ack = new AcknowledgementRequest(); + ack.destination = message.getJMSDestination(); + ack.messageID = message.getJMSMessageID(); + ack.subscriberId = sub.subscriptionId; + ack.isAck = false; - synchronized (unacknowledgedMessages) { - unacknowledgedMessages.put(ack, message); - } + synchronized (unacknowledgedMessages) { + unacknowledgedMessages.put(ack, message); + } } protected void queueMessageForSending(Subscription sub, SpyMessage message) throws JMSException{ @@ -190,6 +191,37 @@ } } + public void removeSubscriber(Subscription sub){ + removeReceiver(sub); + if(hasUnackedMessages(sub.subscriptionId)){ + synchronized(removedSubscribers){ + removedSubscribers.put(new Integer(sub.subscriptionId),sub); + } + }else{ + ((ClientConsumer)sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId); + } + } + + protected boolean hasUnackedMessages(int sub){ + synchronized(unacknowledgedMessages){ + for(Iterator it = unacknowledgedMessages.keySet().iterator();it.hasNext();){ + if(((AcknowledgementRequest)it.next()).subscriberId == sub) + return true; + } + return false; + } + } + + protected void checkRemovedSubscribers(int subId){ + Integer id = new Integer(subId); + synchronized(removedSubscribers){ + if(removedSubscribers.containsKey(id) && !hasUnackedMessages(subId)){ + Subscription sub = (Subscription)removedSubscribers.remove(id); + ((ClientConsumer)sub.clientConsumer).removeRemovedSubscription(subId); + } + } + } + public SpyMessage receive(Subscription sub, boolean wait) throws JMSException { SpyMessage message = null; @@ -254,6 +286,8 @@ synchronized (unacknowledgedMessages) { m = (SpyMessage)unacknowledgedMessages.remove(item); } + + checkRemovedSubscribers(item.subscriberId); if (m == null) return; _______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] http://lists.sourceforge.net/lists/listinfo/jboss-development