jstrachan 2003/08/27 09:26:38 Modified: messenger/src/java/org/apache/commons/messenger MessengerSession.java DefaultMessenger.java MessengerSupport.java Log: Removed the caching of MessageProducer instances.
Also refactored some of the code into the DefaultMessenger implementation to make other Messenger implementations easier Revision Changes Path 1.7 +7 -15 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java Index: MessengerSession.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- MessengerSession.java 4 Mar 2003 10:20:59 -0000 1.6 +++ MessengerSession.java 27 Aug 2003 16:26:38 -0000 1.7 @@ -46,15 +46,15 @@ /** An optional cache of requestors */ private Map requestorsMap; - /** the cache of producers */ - private Map producers; - /** The inbox which is used for the call() methods */ private Destination replyToDestination; /** The current messenger to which I'm connected */ private MessengerSupport messenger; + /** The producer used to send messages using this session */ + private MessageProducer producer; + public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) { this.messenger = messenger; this.sessionFactory = sessionFactory; @@ -101,16 +101,8 @@ * @return the MessageProducer for the given destination. 
*/ public MessageProducer getMessageProducer(Destination destination) throws JMSException { - MessageProducer producer = null; - if ( producers == null ) { - producers = new HashMap(); - } - else { - producer = (MessageProducer) producers.get( destination ); - } - if ( producer == null ) { - producer = messenger.createMessageProducer( session, destination ); - producers.put( destination, producer ); + if (producer == null) { + producer = messenger.createMessageProducer( session, null ); } return producer; } 1.18 +155 -1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java Index: DefaultMessenger.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v retrieving revision 1.17 retrieving revision 1.18 diff -u -r1.17 -r1.18 --- DefaultMessenger.java 20 Mar 2003 08:32:54 -0000 1.17 +++ DefaultMessenger.java 27 Aug 2003 16:26:38 -0000 1.18 @@ -58,14 +58,19 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.Topic; +import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.naming.Context; @@ -133,6 +138,94 @@ public Session getAsyncSession() throws JMSException { return getMessengerSession().getListenerSession(); } + + public Message call(Destination destination, Message message) + throws JMSException { + Session session = borrowSession(); + MessageProducer producer = null; + try { + Destination replyTo = getReplyToDestination(); + message.setJMSReplyTo(replyTo); + + // NOTE - we could consider adding a correlation 
ID per request so that we can ignore + // any cruft or old messages that are sent onto our inbound queue. + // + // Though that does mean that we must then rely on the inbound message having + // the right correlationID. Though at least this strategy would mean + // that we could have a single consumer on a temporary queue for all threads + // and use correlation IDs to dispatch to the corrent thread + // + // Maybe this should be a configurable strategy + + producer = borrowMessageProducer(session, destination); + MessageConsumer consumer = getReplyToConsumer(); + + if (isTopic(producer)) { + ((TopicPublisher) producer).publish((Topic) destination, message); + } + else { + ((QueueSender) producer).send((Queue) destination, message); + } + Message response = consumer.receive(); + if (response == null) { + // we could have timed out so lets trash the temporary destination + // so that the next call() method will use a new destination to avoid + // the response for this call() coming back on later call() invokcations + clearReplyToDestination(); + } + return response; + } + finally { + returnMessageProducer(producer); + returnSession(session); + } + } + + public Message call( + Destination destination, + Message message, + long timeoutMillis) + throws JMSException { + Session session = borrowSession(); + MessageProducer producer = null; + try { + Destination replyTo = getReplyToDestination(); + message.setJMSReplyTo(replyTo); + + // NOTE - we could consider adding a correlation ID per request so that we can ignore + // any cruft or old messages that are sent onto our inbound queue. + // + // Though that does mean that we must then rely on the inbound message having + // the right correlationID. 
Though at least this strategy would mean + // that we could have a single consumer on a temporary queue for all threads + // and use correlation IDs to dispatch to the corrent thread + // + // Maybe this should be a configurable strategy + + producer = borrowMessageProducer(session, destination); + + MessageConsumer consumer = getReplyToConsumer(); + if (isTopic(producer)) { + ((TopicPublisher) producer).publish((Topic) destination, message); + } + else { + ((QueueSender) producer).send((Queue) destination, message); + } + Message response = consumer.receive(timeoutMillis); + if (response == null) { + // we could have timed out so lets trash the temporary destination + // so that the next call() method will use a new destination to avoid + // the response for this call() coming back on later call() invokcations + clearReplyToDestination(); + } + return response; + } + finally { + returnMessageProducer(producer); + returnSession(session); + } + } + // Implementation methods //------------------------------------------------------------------------- @@ -163,8 +256,69 @@ return getMessengerSession().getListenerSession(); } + /** @return a message producer for the given session and destination */ + protected MessageProducer borrowMessageProducer( + Session session, + Destination destination) + throws JMSException { + + if (isCacheProducers()) { + return getMessengerSession().getMessageProducer(destination); + } + else { + return createMessageProducer(session, destination); + } + } + + protected void returnMessageProducer(MessageProducer producer) + throws JMSException { + if (!isCacheProducers()) { + producer.close(); + } + } + protected void returnListenerSession(Session session) throws JMSException { } + + /** + * @return the MessageConsumer for this threads temporary destination + * which is cached for the duration of this process. 
+ */ + protected MessageConsumer getReplyToConsumer() throws JMSException { + MessengerSession messengerSession = getMessengerSession(); + MessageConsumer consumer = messengerSession.getReplyToConsumer(); + if (consumer == null) { + consumer = + createMessageConsumer( + messengerSession.getSession(), + messengerSession.getReplyToDestination()); + messengerSession.setReplyToConsumer(consumer); + } + return consumer; + } + + /** + * Clears the temporary destination used to receive reply-to messages + * which will lazily force a new destination and consumer to be created next + * time a call() method is invoked. + */ + protected void clearReplyToDestination() throws JMSException { + MessengerSession messengerSession = getMessengerSession(); + + messengerSession.setReplyToDestination(null); + MessageConsumer consumer = messengerSession.getReplyToConsumer(); + if (consumer != null) { + messengerSession.setReplyToConsumer(null); + + // ensure that everything is nullified first before we close + // just in case an exception occurs + consumer.close(); + } + } + + protected Destination getReplyToDestination() throws JMSException { + return getMessengerSession().getReplyToDestination(); + } /** * @return the current thread's MessengerSession 1.36 +9 -153 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java Index: MessengerSupport.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v retrieving revision 1.35 retrieving revision 1.36 diff -u -r1.35 -r1.36 --- MessengerSupport.java 6 Jun 2003 10:51:08 -0000 1.35 +++ MessengerSupport.java 27 Aug 2003 16:26:38 -0000 1.36 @@ -154,10 +154,10 @@ try { producer = borrowMessageProducer(session, destination); if (isTopic(producer)) { - ((TopicPublisher) producer).publish(message); + ((TopicPublisher) producer).publish((Topic) destination, message); } else { - 
((QueueSender) producer).send(message); + ((QueueSender) producer).send((Queue) destination, message); } } finally { @@ -166,93 +166,6 @@ } } - public Message call(Destination destination, Message message) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; - try { - Destination replyTo = getReplyToDestination(); - message.setJMSReplyTo(replyTo); - - // NOTE - we could consider adding a correlation ID per request so that we can ignore - // any cruft or old messages that are sent onto our inbound queue. - // - // Though that does mean that we must then rely on the inbound message having - // the right correlationID. Though at least this strategy would mean - // that we could have a single consumer on a temporary queue for all threads - // and use correlation IDs to dispatch to the corrent thread - // - // Maybe this should be a configurable strategy - - producer = borrowMessageProducer(session, destination); - MessageConsumer consumer = getReplyToConsumer(); - - if (isTopic(producer)) { - ((TopicPublisher) producer).publish(message); - } - else { - ((QueueSender) producer).send(message); - } - Message response = consumer.receive(); - if (response == null) { - // we could have timed out so lets trash the temporary destination - // so that the next call() method will use a new destination to avoid - // the response for this call() coming back on later call() invokcations - clearReplyToDestination(); - } - return response; - } - finally { - returnMessageProducer(producer); - returnSession(session); - } - } - - public Message call( - Destination destination, - Message message, - long timeoutMillis) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; - try { - Destination replyTo = getReplyToDestination(); - message.setJMSReplyTo(replyTo); - - // NOTE - we could consider adding a correlation ID per request so that we can ignore - // any cruft or old messages that are sent onto our 
inbound queue. - // - // Though that does mean that we must then rely on the inbound message having - // the right correlationID. Though at least this strategy would mean - // that we could have a single consumer on a temporary queue for all threads - // and use correlation IDs to dispatch to the corrent thread - // - // Maybe this should be a configurable strategy - - producer = borrowMessageProducer(session, destination); - - MessageConsumer consumer = getReplyToConsumer(); - if (isTopic(producer)) { - ((TopicPublisher) producer).publish(message); - } - else { - ((QueueSender) producer).send(message); - } - Message response = consumer.receive(timeoutMillis); - if (response == null) { - // we could have timed out so lets trash the temporary destination - // so that the next call() method will use a new destination to avoid - // the response for this call() coming back on later call() invokcations - clearReplyToDestination(); - } - return response; - } - finally { - returnMessageProducer(producer); - returnSession(session); - } - } - public Message receive(Destination destination) throws JMSException { Session session = borrowSession(); MessageConsumer consumer = null; @@ -758,6 +671,7 @@ producer = borrowMessageProducer(session, destination); if (isTopic(producer)) { ((TopicPublisher) producer).publish( + (Topic) destination, message, deliveryMode, priority, @@ -765,6 +679,7 @@ } else { ((QueueSender) producer).send( + (Queue) destination, message, deliveryMode, priority, @@ -947,32 +862,12 @@ protected abstract boolean isTopic(MessageProducer producer) throws JMSException; - /** - * @return the current thread's MessengerSession - */ - protected abstract MessengerSession getMessengerSession() - throws JMSException; - /** @return a message producer for the given session and destination */ - protected MessageProducer borrowMessageProducer( + protected abstract MessageProducer borrowMessageProducer( Session session, - Destination destination) - throws JMSException { 
+ Destination destination) throws JMSException; - if (isCacheProducers()) { - return getMessengerSession().getMessageProducer(destination); - } - else { - return createMessageProducer(session, destination); - } - } - - protected void returnMessageProducer(MessageProducer producer) - throws JMSException { - if (!isCacheProducers()) { - producer.close(); - } - } + protected abstract void returnMessageProducer(MessageProducer producer) throws JMSException; /** @return a newly created message producer for the given session and destination */ protected MessageProducer createMessageProducer( @@ -997,41 +892,6 @@ return answer; } - /** - * @return the MessageConsumer for this threads temporary destination - * which is cached for the duration of this process. - */ - protected MessageConsumer getReplyToConsumer() throws JMSException { - MessengerSession messengerSession = getMessengerSession(); - MessageConsumer consumer = messengerSession.getReplyToConsumer(); - if (consumer == null) { - consumer = - createMessageConsumer( - messengerSession.getSession(), - messengerSession.getReplyToDestination()); - messengerSession.setReplyToConsumer(consumer); - } - return consumer; - } - - /** - * Clears the temporary destination used to receive reply-to messages - * which will lazily force a new destination and consumer to be created next - * time a call() method is invoked. 
- */ - protected void clearReplyToDestination() throws JMSException { - MessengerSession messengerSession = getMessengerSession(); - - messengerSession.setReplyToDestination(null); - MessageConsumer consumer = messengerSession.getReplyToConsumer(); - if (consumer != null) { - messengerSession.setReplyToConsumer(null); - - // ensure that everything is nullified first before we close - // just in case an exception occurs - consumer.close(); - } - } /** @return a MessageConsumer for the given session and destination */ protected MessageConsumer borrowMessageConsumer( @@ -1170,9 +1030,5 @@ throws JMSException { // XXXX: might want to cache return session.createTopic(subject); - } - - protected Destination getReplyToDestination() throws JMSException { - return getMessengerSession().getReplyToDestination(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]