jstrachan    2003/08/27 09:26:38

  Modified:    messenger/src/java/org/apache/commons/messenger
                        MessengerSession.java DefaultMessenger.java
                        MessengerSupport.java
  Log:
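  Removed the per-destination caching of MessageProducer instances; each
  MessengerSession now keeps a single producer and the destination is passed
  explicitly on every send()/publish() call.

  Also refactored some of the code (the call() methods and the producer and
  reply-to helpers) into the DefaultMessenger implementation to make other
  Messenger implementations easier to write.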
  
  Revision  Changes    Path
  1.7       +7 -15     
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java
  
  Index: MessengerSession.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- MessengerSession.java     4 Mar 2003 10:20:59 -0000       1.6
  +++ MessengerSession.java     27 Aug 2003 16:26:38 -0000      1.7
  @@ -46,15 +46,15 @@
       /** An optional cache of requestors */
       private Map requestorsMap;    
   
  -    /** the cache of producers */
  -    private Map producers;
  -    
       /** The inbox which is used for the call() methods */
       private Destination replyToDestination;
   
       /** The current messenger to which I'm connected */
       private MessengerSupport messenger;
   
  +     /** The producer used to send messages using this session */
  +    private MessageProducer producer;
  +
       public MessengerSession(MessengerSupport messenger, SessionFactory 
sessionFactory) {
           this.messenger = messenger;
           this.sessionFactory = sessionFactory;
  @@ -101,16 +101,8 @@
        * @return the MessageProducer for the given destination.
        */
       public MessageProducer getMessageProducer(Destination destination) throws 
JMSException {
  -        MessageProducer producer = null;
  -        if ( producers == null ) {
  -            producers = new HashMap();
  -        }
  -        else {
  -            producer = (MessageProducer) producers.get( destination );
  -        }
  -        if ( producer == null ) {
  -            producer = messenger.createMessageProducer( session, destination );
  -            producers.put( destination, producer );
  +     if (producer == null) {
  +             producer = messenger.createMessageProducer( session, null );
           }
           return producer;
       }
  
  
  
  1.18      +155 -1    
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
  
  Index: DefaultMessenger.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- DefaultMessenger.java     20 Mar 2003 08:32:54 -0000      1.17
  +++ DefaultMessenger.java     27 Aug 2003 16:26:38 -0000      1.18
  @@ -58,14 +58,19 @@
   
   import javax.jms.Connection;
   import javax.jms.ConnectionFactory;
  +import javax.jms.Destination;
   import javax.jms.JMSException;
  +import javax.jms.Message;
  +import javax.jms.MessageConsumer;
   import javax.jms.MessageListener;
   import javax.jms.MessageProducer;
   import javax.jms.Queue;
  +import javax.jms.QueueSender;
   import javax.jms.QueueSession;
   import javax.jms.ServerSessionPool;
   import javax.jms.Session;
   import javax.jms.Topic;
  +import javax.jms.TopicPublisher;
   import javax.jms.TopicSession;
   import javax.naming.Context;
   
  @@ -133,6 +138,94 @@
       public Session getAsyncSession() throws JMSException {
           return getMessengerSession().getListenerSession();
       }
  +
  +     public Message call(Destination destination, Message message)
  +             throws JMSException {
  +             Session session = borrowSession();
  +             MessageProducer producer = null;
  +             try {
  +                     Destination replyTo = getReplyToDestination();
  +                     message.setJMSReplyTo(replyTo);
  +
  +                     // NOTE - we could consider adding a correlation ID per 
request so that we can ignore
  +                     // any cruft or old messages that are sent onto our inbound 
queue.
  +                     //
  +                     // Though that does mean that we must then rely on the inbound 
message having
  +                     // the right correlationID. Though at least this strategy 
would mean
  +                     // that we could have a single consumer on a temporary queue 
for all threads
  +                     // and use correlation IDs to dispatch to the corrent thread
  +                     //
  +                     // Maybe this should be a configurable strategy
  +
  +                     producer = borrowMessageProducer(session, destination);
  +                     MessageConsumer consumer = getReplyToConsumer();
  +
  +                     if (isTopic(producer)) {
  +                             ((TopicPublisher) producer).publish((Topic) 
destination, message);
  +                     }
  +                     else {
  +                             ((QueueSender) producer).send((Queue) destination, 
message);
  +                     }
  +                     Message response = consumer.receive();
  +                     if (response == null) {
  +                             // we could have timed out so lets trash the temporary 
destination
  +                             // so that the next call() method will use a new 
destination to avoid
  +                             // the response for this call() coming back on later 
call() invokcations
  +                             clearReplyToDestination();
  +                     }
  +                     return response;
  +             }
  +             finally {
  +                     returnMessageProducer(producer);
  +                     returnSession(session);
  +             }
  +     }
  +
  +     public Message call(
  +             Destination destination,
  +             Message message,
  +             long timeoutMillis)
  +             throws JMSException {
  +             Session session = borrowSession();
  +             MessageProducer producer = null;
  +             try {
  +                     Destination replyTo = getReplyToDestination();
  +                     message.setJMSReplyTo(replyTo);
  +
  +                     // NOTE - we could consider adding a correlation ID per 
request so that we can ignore
  +                     // any cruft or old messages that are sent onto our inbound 
queue.
  +                     //
  +                     // Though that does mean that we must then rely on the inbound 
message having
  +                     // the right correlationID. Though at least this strategy 
would mean
  +                     // that we could have a single consumer on a temporary queue 
for all threads
  +                     // and use correlation IDs to dispatch to the corrent thread
  +                     //
  +                     // Maybe this should be a configurable strategy
  +
  +                     producer = borrowMessageProducer(session, destination);
  +
  +                     MessageConsumer consumer = getReplyToConsumer();
  +                     if (isTopic(producer)) {
  +                             ((TopicPublisher) producer).publish((Topic) 
destination, message);
  +                     }
  +                     else {
  +                             ((QueueSender) producer).send((Queue) destination, 
message);
  +                     }
  +                     Message response = consumer.receive(timeoutMillis);
  +                     if (response == null) {
  +                             // we could have timed out so lets trash the temporary 
destination
  +                             // so that the next call() method will use a new 
destination to avoid
  +                             // the response for this call() coming back on later 
call() invokcations
  +                             clearReplyToDestination();
  +                     }
  +                     return response;
  +             }
  +             finally {
  +                     returnMessageProducer(producer);
  +                     returnSession(session);
  +             }
  +     }
  +
       
       // Implementation methods
       //-------------------------------------------------------------------------
  @@ -163,8 +256,69 @@
           return getMessengerSession().getListenerSession();
       }
       
  +     /** @return a message producer for the given session and destination */
  +     protected MessageProducer borrowMessageProducer(
  +             Session session,
  +             Destination destination)
  +             throws JMSException {
  +
  +             if (isCacheProducers()) {
  +                     return getMessengerSession().getMessageProducer(destination);
  +             }
  +             else {
  +                     return createMessageProducer(session, destination);
  +             }
  +     }
  +
  +     protected void returnMessageProducer(MessageProducer producer)
  +             throws JMSException {
  +             if (!isCacheProducers()) {
  +                     producer.close();
  +             }
  +     }
  +    
       protected void returnListenerSession(Session session) throws JMSException {
       }
  +
  +     /**
  +      * @return the MessageConsumer for this threads temporary destination
  +      * which is cached for the duration of this process.
  +      */
  +     protected MessageConsumer getReplyToConsumer() throws JMSException {
  +             MessengerSession messengerSession = getMessengerSession();
  +             MessageConsumer consumer = messengerSession.getReplyToConsumer();
  +             if (consumer == null) {
  +                     consumer =
  +                             createMessageConsumer(
  +                                     messengerSession.getSession(),
  +                                     messengerSession.getReplyToDestination());
  +                     messengerSession.setReplyToConsumer(consumer);
  +             }
  +             return consumer;
  +     }
  +
  +     /**
  +      * Clears the temporary destination used to receive reply-to messages
  +      * which will lazily force a new destination and consumer to be created next
  +      * time a call() method is invoked.
  +      */
  +     protected void clearReplyToDestination() throws JMSException {
  +             MessengerSession messengerSession = getMessengerSession();
  +
  +             messengerSession.setReplyToDestination(null);
  +             MessageConsumer consumer = messengerSession.getReplyToConsumer();
  +             if (consumer != null) {
  +                     messengerSession.setReplyToConsumer(null);
  +
  +                     // ensure that everything is nullified first before we close
  +                     // just in case an exception occurs
  +                     consumer.close();
  +             }
  +     }
  +
  +     protected Destination getReplyToDestination() throws JMSException {
  +             return getMessengerSession().getReplyToDestination();
  +     }
   
       /**
        * @return the current thread's MessengerSession
  
  
  
  1.36      +9 -153    
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
  
  Index: MessengerSupport.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
  retrieving revision 1.35
  retrieving revision 1.36
  diff -u -r1.35 -r1.36
  --- MessengerSupport.java     6 Jun 2003 10:51:08 -0000       1.35
  +++ MessengerSupport.java     27 Aug 2003 16:26:38 -0000      1.36
  @@ -154,10 +154,10 @@
           try {
               producer = borrowMessageProducer(session, destination);
               if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(message);
  +                ((TopicPublisher) producer).publish((Topic) destination, message);
               }
               else {
  -                ((QueueSender) producer).send(message);
  +                ((QueueSender) producer).send((Queue) destination, message);
               }
           }
           finally {
  @@ -166,93 +166,6 @@
           }
       }
   
  -    public Message call(Destination destination, Message message)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            Destination replyTo = getReplyToDestination();
  -            message.setJMSReplyTo(replyTo);
  -
  -            // NOTE - we could consider adding a correlation ID per request so that 
we can ignore
  -            // any cruft or old messages that are sent onto our inbound queue.
  -            //
  -            // Though that does mean that we must then rely on the inbound message 
having
  -            // the right correlationID. Though at least this strategy would mean
  -            // that we could have a single consumer on a temporary queue for all 
threads
  -            // and use correlation IDs to dispatch to the corrent thread
  -            //
  -            // Maybe this should be a configurable strategy
  -
  -            producer = borrowMessageProducer(session, destination);
  -            MessageConsumer consumer = getReplyToConsumer();
  -
  -            if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(message);
  -            }
  -            else {
  -                ((QueueSender) producer).send(message);
  -            }
  -            Message response = consumer.receive();
  -            if (response == null) {
  -                // we could have timed out so lets trash the temporary destination
  -                // so that the next call() method will use a new destination to 
avoid
  -                // the response for this call() coming back on later call() 
invokcations
  -                clearReplyToDestination();
  -            }
  -            return response;
  -        }
  -        finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  -        }
  -    }
  -
  -    public Message call(
  -        Destination destination,
  -        Message message,
  -        long timeoutMillis)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            Destination replyTo = getReplyToDestination();
  -            message.setJMSReplyTo(replyTo);
  -
  -            // NOTE - we could consider adding a correlation ID per request so that 
we can ignore
  -            // any cruft or old messages that are sent onto our inbound queue.
  -            //
  -            // Though that does mean that we must then rely on the inbound message 
having
  -            // the right correlationID. Though at least this strategy would mean
  -            // that we could have a single consumer on a temporary queue for all 
threads
  -            // and use correlation IDs to dispatch to the corrent thread
  -            //
  -            // Maybe this should be a configurable strategy
  -
  -            producer = borrowMessageProducer(session, destination);
  -
  -            MessageConsumer consumer = getReplyToConsumer();
  -            if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(message);
  -            }
  -            else {
  -                ((QueueSender) producer).send(message);
  -            }
  -            Message response = consumer.receive(timeoutMillis);
  -            if (response == null) {
  -                // we could have timed out so lets trash the temporary destination
  -                // so that the next call() method will use a new destination to 
avoid
  -                // the response for this call() coming back on later call() 
invokcations
  -                clearReplyToDestination();
  -            }
  -            return response;
  -        }
  -        finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  -        }
  -    }
  -
       public Message receive(Destination destination) throws JMSException {
           Session session = borrowSession();
           MessageConsumer consumer = null;
  @@ -758,6 +671,7 @@
               producer = borrowMessageProducer(session, destination);
               if (isTopic(producer)) {
                   ((TopicPublisher) producer).publish(
  +                     (Topic) destination,
                       message,
                       deliveryMode,
                       priority,
  @@ -765,6 +679,7 @@
               }
               else {
                   ((QueueSender) producer).send(
  +                     (Queue) destination, 
                       message,
                       deliveryMode,
                       priority,
  @@ -947,32 +862,12 @@
       protected abstract boolean isTopic(MessageProducer producer)
           throws JMSException;
   
  -    /**
  -     * @return the current thread's MessengerSession
  -     */
  -    protected abstract MessengerSession getMessengerSession()
  -        throws JMSException;
  -
       /** @return a message producer for the given session and destination */
  -    protected MessageProducer borrowMessageProducer(
  +    protected abstract MessageProducer borrowMessageProducer(
           Session session,
  -        Destination destination)
  -        throws JMSException {
  +        Destination destination) throws JMSException;
   
  -        if (isCacheProducers()) {
  -            return getMessengerSession().getMessageProducer(destination);
  -        }
  -        else {
  -            return createMessageProducer(session, destination);
  -        }
  -    }
  -
  -    protected void returnMessageProducer(MessageProducer producer)
  -        throws JMSException {
  -        if (!isCacheProducers()) {
  -            producer.close();
  -        }
  -    }
  +    protected abstract void returnMessageProducer(MessageProducer producer) throws 
JMSException;
   
       /** @return a newly created message producer for the given session and 
destination */
       protected MessageProducer createMessageProducer(
  @@ -997,41 +892,6 @@
           return answer;
       }
   
  -    /**
  -     * @return the MessageConsumer for this threads temporary destination
  -     * which is cached for the duration of this process.
  -     */
  -    protected MessageConsumer getReplyToConsumer() throws JMSException {
  -        MessengerSession messengerSession = getMessengerSession();
  -        MessageConsumer consumer = messengerSession.getReplyToConsumer();
  -        if (consumer == null) {
  -            consumer =
  -                createMessageConsumer(
  -                    messengerSession.getSession(),
  -                    messengerSession.getReplyToDestination());
  -            messengerSession.setReplyToConsumer(consumer);
  -        }
  -        return consumer;
  -    }
  -
  -    /**
  -     * Clears the temporary destination used to receive reply-to messages
  -     * which will lazily force a new destination and consumer to be created next
  -     * time a call() method is invoked.
  -     */
  -    protected void clearReplyToDestination() throws JMSException {
  -        MessengerSession messengerSession = getMessengerSession();
  -
  -        messengerSession.setReplyToDestination(null);
  -        MessageConsumer consumer = messengerSession.getReplyToConsumer();
  -        if (consumer != null) {
  -            messengerSession.setReplyToConsumer(null);
  -
  -            // ensure that everything is nullified first before we close
  -            // just in case an exception occurs
  -            consumer.close();
  -        }
  -    }
   
       /** @return a MessageConsumer for the given session and destination */
       protected MessageConsumer borrowMessageConsumer(
  @@ -1170,9 +1030,5 @@
           throws JMSException {
           // XXXX: might want to cache
           return session.createTopic(subject);
  -    }
  -
  -    protected Destination getReplyToDestination() throws JMSException {
  -        return getMessengerSession().getReplyToDestination();
       }
   }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to