Author: ritchiem
Date: Thu Apr 26 08:59:24 2007
New Revision: 532786

URL: http://svn.apache.org/viewvc?view=rev&rev=532786
Log:
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java 
client. 

This disables the JMS features that rely upon Qpid Java broker specific 
features.

Modified:
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=532786&r1=532785&r2=532786
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Apr 26 08:59:24 2007
@@ -202,11 +202,20 @@
     /** Boolean to control immediate prefetch . Records the first call to the 
dispatcher to prevent further flow(true) */
     private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
 
+    /** System property to enable strickt AMQP compliance */
+    public static final String STRICT_AMQP = "STRICT_AMQP";
+    /** Strickt AMQP default */
+    public static final String STRICT_AMQP_DEFAULT = "false";
+
+    private final boolean _strictAMQP;
+
+
     /** System property to enable immediate message prefetching */
     public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
     /** Immediate message prefetch default */
     public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
 
+    private final boolean _immediatePrefetch;
 
     private static final Logger _dispatcherLogger = 
Logger.getLogger(Dispatcher.class);
 
@@ -435,6 +444,10 @@
     AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry, int 
defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
+
+        _strictAMQP = 
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, 
STRICT_AMQP_DEFAULT));
+        _immediatePrefetch = 
Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, 
IMMEDIATE_PREFETCH_DEFAULT));
+
         _connection = con;
         _transacted = transacted;
         if (transacted)
@@ -921,15 +934,27 @@
                 _dispatcher.rollback();
             }
 
-            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
-            
_connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                               
        getProtocolMajorVersion(),
-                                                                               
        getProtocolMinorVersion(),
-                                                                               
        false)    // requeue
-                    , BasicRecoverOkBody.class);
+            if (isStrictAMQP())
+            {
+                // We can't use the BasicRecoverBody-OK method as it isn't 
part of the spec.
+                
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                               
             getProtocolMajorVersion(),
+                                                                               
             getProtocolMinorVersion(),
+                                                                               
             false));    // requeue
+                _logger.warn("Session Recover cannot be guaranteed with 
STRICT_AMQP. Messages may arrive out of order.");                
+            }
+            else
+            {
 
+                // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
+                // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions 
change.
+                
_connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                               
            getProtocolMajorVersion(),
+                                                                               
            getProtocolMinorVersion(),
+                                                                               
            false)    // requeue
+                        , BasicRecoverOkBody.class);
+            }
             if (!isSuspended)
             {
                 suspendChannel(false);
@@ -1433,7 +1458,6 @@
     private void consumeFromQueue(BasicMessageConsumer consumer, 
AMQShortString queueName, AMQProtocolHandler protocolHandler,
                                   boolean nowait, String messageSelector) 
throws AMQException
     {
-        //fixme prefetch values are not used here. Do we need to have them as 
parametsrs?
         //need to generate a consumer tag on the client so we can exploit the 
nowait flag
         AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
 
@@ -1709,11 +1733,21 @@
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         return createBrowser(queue, null);
     }
 
     public QueueBrowser createBrowser(Queue queue, String messageSelector) 
throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         checkNotClosed();
         checkValidQueue(queue);
         return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
@@ -1762,6 +1796,11 @@
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString 
queueName, AMQShortString routingKey) throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
                                                                
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
@@ -1940,7 +1979,7 @@
     synchronized void startDistpatcherIfNecessary()
     {
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching     
     
-        if 
(!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, 
IMMEDIATE_PREFETCH_DEFAULT)))
+        if (!_immediatePrefetch)
         {
             // We do this now if this is the first call on a started connection
             if (isSuspended() && _startedAtLeastOnce.get() && 
_firstDispatcher.getAndSet(false))
@@ -2005,7 +2044,7 @@
         bindQueue(amqd, queueName, protocolHandler, 
consumer.getRawSelectorFieldTable());
 
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to 
delay prefetch
-        if 
(!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, 
IMMEDIATE_PREFETCH_DEFAULT)))
+        if (!_immediatePrefetch)
         {
             // The dispatcher will be null if we have just created this session
             // so suspend the channel before we register our consumer so that 
we don't
@@ -2390,6 +2429,11 @@
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
+    }
+
+    public boolean isStrictAMQP()
+    {
+        return _strictAMQP;
     }
 
 }


Reply via email to