User: pkendall
  Date: 01/08/15 21:04:15

  Modified:    src/main/org/jboss/mq/server JMSTopic.java JMSQueue.java
                        ClientConsumer.java BasicQueue.java
  Log:
  Several subtle bug fixes.
  Transacted sessions were not rolled back when closed.
  Have to save subscriptions that have unacked messages.
  
  Revision  Changes    Path
  1.3       +1 -1      jbossmq/src/main/org/jboss/mq/server/JMSTopic.java
  
  Index: JMSTopic.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSTopic.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- JMSTopic.java     2001/08/15 02:43:33     1.2
  +++ JMSTopic.java     2001/08/16 04:04:15     1.3
  @@ -29,7 +29,7 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class JMSTopic extends JMSDestination {
   
  @@ -92,7 +92,7 @@
                                queue = (BasicQueue)durQueues.get(id); //note DON'T remove
                        }
                }
  -             queue.removeReceiver(sub);
  +             queue.removeSubscriber(sub);
        }
   
        public void addReceiver(Subscription sub){
  
  
  
  1.3       +1 -1      jbossmq/src/main/org/jboss/mq/server/JMSQueue.java
  
  Index: JMSQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSQueue.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- JMSQueue.java     2001/08/15 02:43:33     1.2
  +++ JMSQueue.java     2001/08/16 04:04:15     1.3
  @@ -29,7 +29,7 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class JMSQueue extends JMSDestination {
   
  @@ -56,7 +56,7 @@
        }
   
        public void removeSubscriber(Subscription sub){
  -             removeReceiver(sub);
  +             queue.removeSubscriber(sub);
        }
   
        public void addReceiver(Subscription sub){
  
  
  
  1.3       +23 -3     jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java
  
  Index: ClientConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ClientConsumer.java       2001/08/15 02:43:33     1.2
  +++ ClientConsumer.java       2001/08/16 04:04:15     1.3
  @@ -28,7 +28,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class ClientConsumer implements Runnable {
   
  @@ -42,8 +42,10 @@
        boolean enabled;
        //Has this connection been closed?
        boolean closed = false;
  -     //Maps a a subsction id to a Subscription
  +     //Maps a subscription id to a Subscription
        HashMap subscriptions = new HashMap();
  +     //Maps a subscription id to a Subscription for subscriptions that have finished receiving
  +     HashMap removedSubscriptions = new HashMap();
   
        LinkedList blockedSubscriptions = new LinkedList();
        //LinkedList of the the temporary destinations that this client created
  @@ -140,12 +142,15 @@
        {
                cat.debug(""+this+"->removeSubscription(subscriberId="+subscriptionId+")");
   
  +             Integer subId = new Integer(subscriptionId);
                Subscription req;
                synchronized (subscriptions ) {
  -
                        HashMap subscriptionsClone = (HashMap)subscriptions.clone();
  -                     req = (Subscription)subscriptionsClone.remove(new Integer(subscriptionId));
  +                     req = (Subscription) subscriptionsClone.remove(subId);
                        subscriptions = subscriptionsClone;
  +                     if(req != null){
  +                             removedSubscriptions.put(subId,req);
  +                     }
                }
   
                if( req == null )
  @@ -156,8 +161,16 @@
                        throw new JMSException("The subscription was registed with a destination that does not exist !");
   
                queue.removeSubscriber(req);
  +
  +     }
  +
  +     void removeRemovedSubscription(int subId){
  +             synchronized (subscriptions ) {
  +                     removedSubscriptions.remove(new Integer(subId));
  +             }
        }
   
  +
        // Iterate over the consumers asking them to take messages until they stop
        // consuming.
        public void run() {
  @@ -222,6 +235,13 @@
   
        public void acknowledge(AcknowledgementRequest request, org.jboss.mq.pm.Tx txId) throws JMSException{
                Subscription sub = (Subscription)subscriptions.get(new Integer(request.subscriberId));
  +
  +             if(sub == null){ //might be in removed subscriptions
  +                     synchronized(subscriptions){
  +                             sub = (Subscription)removedSubscriptions.get(new Integer(request.subscriberId));
  +                     }
  +             }
  +
                if( sub == null )
                        throw new JMSException("The provided subscription does not exist");
   
  
  
  
  1.3       +44 -10    jbossmq/src/main/org/jboss/mq/server/BasicQueue.java
  
  Index: BasicQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- BasicQueue.java   2001/08/15 02:43:33     1.2
  +++ BasicQueue.java   2001/08/16 04:04:15     1.3
  @@ -37,7 +37,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   //abstract public class BasicQueue implements Runnable {
   public class BasicQueue {
  @@ -51,6 +51,8 @@
        //owning exclusive queues.
        HashMap unacknowledgedMessages = new HashMap();
   
  +     HashMap removedSubscribers = new HashMap();
  +
        static org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance(BasicQueue.class);
   
        // Constructor ---------------------------------------------------
  @@ -102,11 +104,10 @@
        //Used to put a message that was added previously to the queue, back in the queue
        public void restoreMessage(SpyMessage mes)
        {
  -     internalAddMessage(mes);
  +             internalAddMessage(mes);
        }
   
   
  -
        public SpyMessage[] browse(String selector) throws JMSException {
   
                if( selector == null ) {
  @@ -137,15 +138,15 @@
        }
   
        protected void setupMessageAcknowledgement(Subscription sub,SpyMessage message){
  -     AcknowledgementRequest ack = new AcknowledgementRequest();
  -     ack.destination = message.getJMSDestination();
  -     ack.messageID = message.getJMSMessageID();
  -     ack.subscriberId = sub.subscriptionId;
  -     ack.isAck = false;
  +             AcknowledgementRequest ack = new AcknowledgementRequest();
  +             ack.destination = message.getJMSDestination();
  +             ack.messageID = message.getJMSMessageID();
  +             ack.subscriberId = sub.subscriptionId;
  +             ack.isAck = false;
   
  -     synchronized (unacknowledgedMessages) {
  -             unacknowledgedMessages.put(ack, message);
  -     }
  +             synchronized (unacknowledgedMessages) {
  +                     unacknowledgedMessages.put(ack, message);
  +             }
        }
   
        protected void queueMessageForSending(Subscription sub, SpyMessage message) throws JMSException{
  @@ -190,6 +191,37 @@
                }
        }
   
  +     public void removeSubscriber(Subscription sub){
  +             removeReceiver(sub);
  +             if(hasUnackedMessages(sub.subscriptionId)){
  +                     synchronized(removedSubscribers){
  +                             removedSubscribers.put(new Integer(sub.subscriptionId),sub);
  +                     }
  +             }else{
  +                     ((ClientConsumer)sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId);
  +             }
  +     }
  +
  +     protected boolean hasUnackedMessages(int sub){
  +             synchronized(unacknowledgedMessages){
  +                     for(Iterator it = unacknowledgedMessages.keySet().iterator();it.hasNext();){
  +                             if(((AcknowledgementRequest)it.next()).subscriberId == sub)
  +                                     return true;
  +                     }
  +                     return false;
  +             }
  +     }
  +
  +     protected void checkRemovedSubscribers(int subId){
  +             Integer id = new Integer(subId);
  +             synchronized(removedSubscribers){
  +                     if(removedSubscribers.containsKey(id) && !hasUnackedMessages(subId)){
  +                             Subscription sub = (Subscription)removedSubscribers.remove(id);
  +                             ((ClientConsumer)sub.clientConsumer).removeRemovedSubscription(subId);
  +                     }
  +             }
  +     }
  +
        public SpyMessage receive(Subscription sub, boolean wait) throws  JMSException
        {
                SpyMessage message = null;
  @@ -254,6 +286,8 @@
                synchronized (unacknowledgedMessages) {
                        m = (SpyMessage)unacknowledgedMessages.remove(item);
                }
  +
  +             checkRemovedSubscribers(item.subscriberId);
   
                if (m == null)
                        return;
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to