Author: marnie
Date: Fri Nov  3 02:02:56 2006
New Revision: 470742

URL: http://svn.apache.org/viewvc?view=rev&rev=470742
Log:
Changes to fix QueueReceiver ClassCastException being thrown by methods in 
AMQSession which should return this interface. As per QPID-58. Added 
QueueReceiverAdaptor class now used in AMQSession.

Added:
    
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java?view=diff&rev=470742&r1=470741&r2=470742
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
 Fri Nov  3 02:02:56 2006
@@ -169,7 +169,7 @@
                     String reason = message.bounceBody.replyText;
                     _logger.debug("Message returned with error code " + 
errorCode + " (" + reason + ")");
 
-                    //Todo should this be moved to an exception handler of 
sorts. Somewhere errors are converted to correct execeptions.
+                    //@TODO should this be moved to an exception handler of 
sorts. Somewhere errors are converted to correct execeptions.
                     if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
                     {
                         _connection.exceptionReceived(new 
AMQNoConsumersException("Error: " + reason, bouncedMessage));
@@ -380,7 +380,7 @@
             }
             catch (AMQException e)
             {
-                throw new JMSException("Unable to create message: " + e);
+                throw new JMSException("Unable to create text message: " + e);
             }
         }
     }
@@ -398,7 +398,7 @@
             }
             catch (AMQException e)
             {
-                throw new JMSException("Unable to create message: " + e);
+                throw new JMSException("Unable to create text message: " + e);
             }
         }
     }
@@ -718,6 +718,34 @@
         }.execute(_connection);
     }
 
+    /**
+     * Creates a QueueReceiver
+     * @param destination
+     * @return QueueReceiver - a wrapper around our MessageConsumer
+     * @throws JMSException
+     */
+    public QueueReceiver createQueueReceiver(Destination destination) throws 
JMSException
+    {
+        AMQQueue dest = (AMQQueue) destination;
+        BasicMessageConsumer consumer = (BasicMessageConsumer) 
createConsumer(destination);
+        return new QueueReceiverAdaptor(dest, consumer);
+    }
+
+    /**
+     * Creates a QueueReceiver using a message selector
+     * @param destination
+     * @param messageSelector
+     * @return QueueReceiver - a wrapper around our MessageConsumer
+     * @throws JMSException
+     */
+    public QueueReceiver createQueueReceiver(Destination destination, String 
messageSelector) throws JMSException
+    {
+        AMQQueue dest = (AMQQueue) destination;
+        BasicMessageConsumer consumer = (BasicMessageConsumer)
+                createConsumer(destination, messageSelector);
+        return new QueueReceiverAdaptor(dest, consumer);
+    }
+
     public MessageConsumer createConsumer(Destination destination) throws 
JMSException
     {
         return createConsumer(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, false, false, null);
@@ -924,14 +952,32 @@
         }
     }
 
+    /**
+     * Creates a QueueReceiver wrapping a MessageConsumer
+     * @param queue
+     * @return QueueReceiver
+     * @throws JMSException
+     */
     public QueueReceiver createReceiver(Queue queue) throws JMSException
     {
-        return (QueueReceiver) createConsumer(queue);
+        AMQQueue dest = (AMQQueue) queue;
+        BasicMessageConsumer consumer = (BasicMessageConsumer) 
createConsumer(dest);
+        return new QueueReceiverAdaptor(dest, consumer);
     }
 
+    /**
+     * Creates a QueueReceiver wrapping a MessageConsumer using a message 
selector
+     * @param queue
+     * @param messageSelector
+     * @return QueueReceiver
+     * @throws JMSException
+     */
     public QueueReceiver createReceiver(Queue queue, String messageSelector) 
throws JMSException
     {
-        return (QueueReceiver) createConsumer(queue, messageSelector);
+        AMQQueue dest = (AMQQueue) queue;
+        BasicMessageConsumer consumer = (BasicMessageConsumer)
+                createConsumer(dest, messageSelector);
+        return new QueueReceiverAdaptor(dest, consumer);
     }
 
     public QueueSender createSender(Queue queue) throws JMSException
@@ -961,14 +1007,30 @@
         }
     }
 
+    /**
+     * Creates a non-durable subscriber
+     * @param topic
+     * @return TopicSubscriber - a wrapper round our MessageConsumer
+     * @throws JMSException
+     */
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
-        return (TopicSubscriber) createConsumer(topic);
+        AMQTopic dest = new AMQTopic(topic.getTopicName());
+        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) 
createConsumer(dest));
     }
 
+    /**
+     * Creates a non-durable subscriber with a message selector
+     * @param topic
+     * @param messageSelector
+     * @param noLocal
+     * @return TopicSubscriber - a wrapper round our MessageConsumer
+     * @throws JMSException
+     */
     public TopicSubscriber createSubscriber(Topic topic, String 
messageSelector, boolean noLocal) throws JMSException
     {
-        return (TopicSubscriber) createConsumer(topic, messageSelector, 
noLocal);
+        AMQTopic dest = new AMQTopic(topic.getTopicName());
+        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) 
createConsumer(dest, messageSelector, noLocal));
     }
 
     /**

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=470742&r1=470741&r2=470742
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
 Fri Nov  3 02:02:56 2006
@@ -129,7 +129,7 @@
      */
     private boolean _dups_ok_acknowledge_send;
 
-    BasicMessageConsumer(int channelId, AMQConnection connection, 
AMQDestination destination, String messageSelector,
+    protected BasicMessageConsumer(int channelId, AMQConnection connection, 
AMQDestination destination, String messageSelector,
                          boolean noLocal, MessageFactoryRegistry 
messageFactory, AMQSession session,
                          AMQProtocolHandler protocolHandler, FieldTable 
rawSelectorFieldTable,
                          int prefetchHigh, int prefetchLow, boolean exclusive, 
int acknowledgeMode)

Added: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java?view=auto&rev=470742
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java
 Fri Nov  3 02:02:56 2006
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.*;
+
+/**
+ * Class that wraps a MessageConsumer for backwards JMS compatibility
+ * Returned by methods in AMQSession etc
+ */
+public class QueueReceiverAdaptor implements QueueReceiver {
+
+    protected MessageConsumer _consumer;
+    protected Queue _queue;
+
+    protected QueueReceiverAdaptor(Queue queue, MessageConsumer consumer)
+    {
+        _consumer = consumer;
+        _queue = queue;
+    }
+
+    public String getMessageSelector() throws JMSException
+    {
+        return _consumer.getMessageSelector();
+    }
+
+    public MessageListener getMessageListener() throws JMSException
+    {
+        return _consumer.getMessageListener();
+    }
+
+    public void setMessageListener(MessageListener messageListener) throws 
JMSException
+    {
+       _consumer.setMessageListener(messageListener);
+    }
+
+    public Message receive() throws JMSException
+    {
+        return _consumer.receive();
+    }
+
+    public Message receive(long l) throws JMSException
+    {
+        return _consumer.receive(l);
+    }
+
+    public Message receiveNoWait() throws JMSException
+    {
+        return _consumer.receiveNoWait();
+    }
+
+    public void close() throws JMSException
+    {
+        _consumer.close();
+    }
+
+    /**
+     * Return the queue associated with this receiver
+     * @return
+     * @throws JMSException
+     */
+    public Queue getQueue() throws JMSException
+    {
+       return _queue;
+    }
+
+
+}


Reply via email to