Author: ritchiem
Date: Mon Apr 30 07:37:23 2007
New Revision: 533764

URL: http://svn.apache.org/viewvc?view=rev&rev=533764
Log:
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java 
client. 

Updated to allow the use of durable subscriptions but it will not be as clean 
as with the extensions.
Selectors are also now disabled.

Modified:
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=533764&r1=533763&r2=533764
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Apr 30 07:37:23 2007
@@ -209,6 +209,12 @@
 
     private final boolean _strictAMQP;
 
+    /** System property to enable strict AMQP compliance */
+    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+    /** Strict AMQP default */
+    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+    private final boolean _strictAMQPFATAL;
 
     /** System property to enable immediate message prefetching */
     public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
@@ -436,6 +442,7 @@
     {
 
         _strictAMQP = 
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, 
STRICT_AMQP_DEFAULT));
+        _strictAMQPFATAL = 
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, 
STRICT_AMQP_FATAL_DEFAULT));
         _immediatePrefetch = 
Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, 
IMMEDIATE_PREFETCH_DEFAULT));
 
         _connection = con;
@@ -924,7 +931,7 @@
                                                                                
             getProtocolMajorVersion(),
                                                                                
             getProtocolMinorVersion(),
                                                                                
             false));    // requeue
-                _logger.warn("Session Recover cannot be guaranteed with 
STRICT_AMQP. Messages may arrive out of order.");                
+                _logger.warn("Session Recover cannot be guaranteed with 
STRICT_AMQP. Messages may arrive out of order.");
             }
             else
             {
@@ -1212,13 +1219,30 @@
                                                  final int prefetchLow,
                                                  final boolean noLocal,
                                                  final boolean exclusive,
-                                                 final String selector,
+                                                 String selector,
                                                  final FieldTable rawSelector,
                                                  final boolean noConsume,
                                                  final boolean autoClose) 
throws JMSException
     {
         checkTemporaryDestination(destination);
 
+        final String messageSelector;
+
+        if (_strictAMQP && !(selector == null || selector.equals("")))
+        {
+            if (_strictAMQPFATAL)
+            {
+                throw new UnsupportedOperationException("Selectors not 
currently supported by AMQP.");
+            }
+            else
+            {
+                messageSelector = null;
+            }
+        }
+        else
+        {
+            messageSelector = selector;
+        }
 
         return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
         {
@@ -1229,6 +1253,7 @@
                 AMQDestination amqd = (AMQDestination) destination;
 
                 final AMQProtocolHandler protocolHandler = 
getProtocolHandler();
+                // TODO: Define selectors in AMQP
                 // TODO: construct the rawSelector from the selector string if 
rawSelector == null
                 final FieldTable ft = FieldTableFactory.newFieldTable();
                 //if (rawSelector != null)
@@ -1237,7 +1262,8 @@
                 {
                     ft.addAll(rawSelector);
                 }
-                BasicMessageConsumer consumer = new 
BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
+
+                BasicMessageConsumer consumer = new 
BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
                                                                          
_messageFactoryRegistry, AMQSession.this,
                                                                          
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
                                                                          
_acknowledgeMode, noConsume, autoClose);
@@ -1630,6 +1656,8 @@
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
     {
+
+
         checkNotClosed();
         AMQTopic origTopic = checkValidTopic(topic);
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, 
_connection);
@@ -1657,13 +1685,31 @@
             {
                 topicName = new AMQShortString(topic.getTopicName());
             }
-            // if the queue is bound to the exchange but NOT for this topic, 
then the JMS spec
-            // says we must trash the subscription.
-            if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
-                !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), 
topicName))
+
+            if (_strictAMQP)
             {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not 
currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already 
exists for '" + topicName + "' "
+                                 + "for creation durableSubscriber. Requesting 
queue deletion regardless.");
+                }
+
                 deleteQueue(dest.getAMQQueueName());
             }
+            else
+            {
+                // if the queue is bound to the exchange but NOT for this 
topic, then the JMS spec
+                // says we must trash the subscription.
+                if (isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName()) &&
+                    !isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName(), topicName))
+                {
+                    deleteQueue(dest.getAMQQueueName());
+                }
+            }
         }
 
         subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) 
createConsumer(dest));
@@ -1761,13 +1807,31 @@
         }
         else
         {
-            if (isQueueBound(getDefaultTopicExchangeName(), 
AMQTopic.getDurableTopicQueueName(name, _connection)))
+            if (_strictAMQP)
             {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not 
currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already 
exists for '" + name + "' for unsubscribe."
+                                 + " Requesting queue deletion regardless.");
+                }
+
                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, 
_connection));
             }
             else
             {
-                throw new InvalidDestinationException("Unknown subscription 
exchange:" + name);
+
+                if (isQueueBound(getDefaultTopicExchangeName(), 
AMQTopic.getDurableTopicQueueName(name, _connection)))
+                {
+                    deleteQueue(AMQTopic.getDurableTopicQueueName(name, 
_connection));
+                }
+                else
+                {
+                    throw new InvalidDestinationException("Unknown 
subscription exchange:" + name);
+                }
             }
         }
     }
@@ -1779,10 +1843,6 @@
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString 
queueName, AMQShortString routingKey) throws JMSException
     {
-        if (isStrictAMQP())
-        {
-            throw new UnsupportedOperationException();
-        }
 
         // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=533764&r1=533763&r2=533764
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
 Mon Apr 30 07:37:23 2007
@@ -9,119 +9,141 @@
 import javax.jms.QueueSender;
 import javax.jms.InvalidDestinationException;
 
-public class QueueSenderAdapter implements QueueSender {
+public class QueueSenderAdapter implements QueueSender
+{
 
-       private BasicMessageProducer _delegate;
-       private Queue _queue;
-       private boolean closed = false;
-       
-       public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue 
queue){
-               _delegate = msgProducer;
-               _queue = queue;
-       }
-       
-       public Queue getQueue() throws JMSException {
-               checkPreConditions();
-               return _queue;
-       }
-
-       public void send(Message msg) throws JMSException {
-               checkPreConditions();
-               _delegate.send(msg);
-       }
-
-       public void send(Queue queue, Message msg) throws JMSException {
-               checkPreConditions(queue);
-               _delegate.send(queue, msg);
-       }
-
-       public void publish(Message msg, int deliveryMode, int priority, long 
timeToLive)
-       throws JMSException {
-               checkPreConditions();
-               _delegate.send(msg, deliveryMode,priority,timeToLive);
-       }
-
-       public void send(Queue queue,Message msg, int deliveryMode, int 
priority, long timeToLive)
-                       throws JMSException {
-               checkPreConditions(queue);
-               _delegate.send(queue,msg, deliveryMode,priority,timeToLive);
-       }
-       
-       public void close() throws JMSException {
-               _delegate.close();
-               closed = true;
-       }
-
-       public int getDeliveryMode() throws JMSException {
-               checkPreConditions();
-               return _delegate.getDeliveryMode();
-       }
-
-       public Destination getDestination() throws JMSException {
-               checkPreConditions();
-               return _delegate.getDestination();
-       }
-
-       public boolean getDisableMessageID() throws JMSException {
-               checkPreConditions();
-               return _delegate.getDisableMessageID();
-       }
-
-       public boolean getDisableMessageTimestamp() throws JMSException {
-               checkPreConditions();
-               return _delegate.getDisableMessageTimestamp();
-       }
-
-       public int getPriority() throws JMSException {
-               checkPreConditions();
-               return _delegate.getPriority();
-       }
-
-       public long getTimeToLive() throws JMSException {
-               checkPreConditions();
-               return _delegate.getTimeToLive();
-       }
-
-       public void send(Destination dest, Message msg) throws JMSException {
-               checkPreConditions((Queue)dest);
-               _delegate.send(dest,msg);
-       }
-
-       public void send(Message msg, int deliveryMode, int priority, long 
timeToLive)
-                       throws JMSException {
-               checkPreConditions();
-               _delegate.send(msg, deliveryMode,priority,timeToLive);
-       }
-
-       public void send(Destination dest, Message msg, int deliveryMode, int 
priority, long timeToLive) throws JMSException {
-               checkPreConditions((Queue)dest);
-               _delegate.send(dest,msg, deliveryMode,priority,timeToLive);
-       }
-
-       public void setDeliveryMode(int deliveryMode) throws JMSException {
-               checkPreConditions();
-               _delegate.setDeliveryMode(deliveryMode);
-       }
-
-       public void setDisableMessageID(boolean disableMessageID) throws 
JMSException {
-               checkPreConditions();
-               _delegate.setDisableMessageID(disableMessageID);
-       }
-
-       public void setDisableMessageTimestamp(boolean disableMessageTimestamp) 
throws JMSException {
-               checkPreConditions();
-               _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
-       }
-
-       public void setPriority(int priority) throws JMSException {
-               checkPreConditions();
-               _delegate.setPriority(priority);
-       }
-
-       public void setTimeToLive(long timeToLive) throws JMSException {
-               checkPreConditions();
-               _delegate.setTimeToLive(timeToLive);
-       }
+    private BasicMessageProducer _delegate;
+    private Queue _queue;
+    private boolean closed = false;
+
+    public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue)
+    {
+        _delegate = msgProducer;
+        _queue = queue;
+    }
+
+    public Queue getQueue() throws JMSException
+    {
+        checkPreConditions();
+        return _queue;
+    }
+
+    public void send(Message msg) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg);
+    }
+
+    public void send(Queue queue, Message msg) throws JMSException
+    {
+        checkPreConditions(queue);
+        _delegate.send(queue, msg);
+    }
+
+    public void publish(Message msg, int deliveryMode, int priority, long 
timeToLive)
+            throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void send(Queue queue, Message msg, int deliveryMode, int priority, 
long timeToLive)
+            throws JMSException
+    {
+        checkPreConditions(queue);
+        _delegate.send(queue, msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void close() throws JMSException
+    {
+        _delegate.close();
+        closed = true;
+    }
+
+    public int getDeliveryMode() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDeliveryMode();
+    }
+
+    public Destination getDestination() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDestination();
+    }
+
+    public boolean getDisableMessageID() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDisableMessageID();
+    }
+
+    public boolean getDisableMessageTimestamp() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDisableMessageTimestamp();
+    }
+
+    public int getPriority() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getPriority();
+    }
+
+    public long getTimeToLive() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getTimeToLive();
+    }
+
+    public void send(Destination dest, Message msg) throws JMSException
+    {
+        checkPreConditions((Queue) dest);
+        _delegate.send(dest, msg);
+    }
+
+    public void send(Message msg, int deliveryMode, int priority, long 
timeToLive)
+            throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void send(Destination dest, Message msg, int deliveryMode, int 
priority, long timeToLive) throws JMSException
+    {
+        checkPreConditions((Queue) dest);
+        _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void setDeliveryMode(int deliveryMode) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setDeliveryMode(deliveryMode);
+    }
+
+    public void setDisableMessageID(boolean disableMessageID) throws 
JMSException
+    {
+        checkPreConditions();
+        _delegate.setDisableMessageID(disableMessageID);
+    }
+
+    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) 
throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+    }
+
+    public void setPriority(int priority) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setPriority(priority);
+    }
+
+    public void setTimeToLive(long timeToLive) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setTimeToLive(timeToLive);
+    }
 
     private void checkPreConditions() throws JMSException
     {
@@ -130,31 +152,41 @@
 
     private void checkPreConditions(Queue queue) throws JMSException
     {
-               if (closed){
-                       throw new javax.jms.IllegalStateException("Publisher is 
closed");
-               }
-               
-               AMQSession session = ((BasicMessageProducer) 
_delegate).getSession();
-               
-               if(session == null || session.isClosed()){
-                       throw new javax.jms.IllegalStateException("Invalid 
Session");
-               }
+        if (closed)
+        {
+            throw new javax.jms.IllegalStateException("Publisher is closed");
+        }
+
+        AMQSession session = ((BasicMessageProducer) _delegate).getSession();
+
+        if (session == null || session.isClosed())
+        {
+            throw new javax.jms.IllegalStateException("Invalid Session");
+        }
 
-        if(!(queue instanceof AMQDestination))
+        if (!(queue instanceof AMQDestination))
         {
             throw new InvalidDestinationException("Queue: " + queue + " is not 
a valid Qpid queue");
         }
         AMQDestination destination = (AMQDestination) queue;
-        if(!destination.isValidated() && checkQueueBeforePublish())
+        if (!destination.isValidated() && checkQueueBeforePublish())
         {
 
-            if (_delegate.isBound(destination))
+            if (_delegate.getSession().isStrictAMQP())
             {
+                _delegate._logger.warn("AMQP does not support destination 
validation before publish, ");
                 destination.setValidated(true);
             }
             else
             {
-                throw new InvalidDestinationException("Queue: " + queue + " is 
not a valid destination (no bindings on server");
+                if (_delegate.isBound(destination))
+                {
+                    destination.setValidated(true);
+                }
+                else
+                {
+                    throw new InvalidDestinationException("Queue: " + queue + 
" is not a valid destination (no bindings on server");
+                }
             }
         }
     }


Reply via email to