Author: gsim
Date: Mon Jan 29 10:53:27 2007
New Revision: 501144

URL: http://svn.apache.org/viewvc?view=rev&rev=501144
Log:
Moved across fixes from trunk for handling exclusive consumers and no_local 
consumption.
Fixed close process in AMQChannel (remove channel from map only after consumer 
cancellations have been processed).



Modified:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
 Mon Jan 29 10:53:27 2007
@@ -105,6 +105,21 @@
                     
session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
                         "Non-unique consumer tag, '" + body.destination + "'", 
body.getClazz(), body.getMethod());
                 }
+                catch (AMQQueue.ExistingExclusiveSubscription e)
+                {
+                    throw 
body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+                                                  "Cannot subscribe to queue "
+                                                          + queue.getName()
+                                                          + " as it already 
has an existing exclusive consumer");
+                }
+                catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+                {
+                    throw 
body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+                                                   "Cannot subscribe to queue "
+                                                   + queue.getName()
+                                                   + " exclusively as it 
already has a consumer");
+                }
+
             }
         }
     }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
 Mon Jan 29 10:53:27 2007
@@ -24,6 +24,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -61,6 +62,8 @@
         } else {
             channel.resend(protocolSession);
         }
+        MessageOkBody response = 
MessageOkBody.createMethodBody(protocolSession.getMajor(), 
protocolSession.getMinor());
+        protocolSession.writeResponse(evt, response);
     }
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Mon Jan 29 10:53:27 2007
@@ -316,9 +316,10 @@
         {
             _logger.error("Closing channel due to: " + e.getMessage());
             writeRequest(channelNum, e.getCloseMethodBody());
-            AMQChannel channel = _channelMap.remove(channelNum);
+            AMQChannel channel = _channelMap.get(channelNum);//can't remove it 
yet as close requires it
             if (channel != null) {
                 channel.close(this);
+                _channelMap.remove(channelNum);
             }
         }
         catch (AMQConnectionException e)
@@ -726,6 +727,11 @@
     public int getConnectionId()
     {
         return _ConnectionId.get();
+    }
+
+    public Object getClientIdentifier()
+    {
+        return _minaProtocolSession.getRemoteAddress();    
     }
 
     public void addSessionCloseTask(Task task)

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 Mon Jan 29 10:53:27 2007
@@ -159,6 +159,8 @@
     void checkMethodBodyVersion(AMQMethodBody methodBody);
     int getConnectionId();
 
+    Object getClientIdentifier();
+
     void addSessionCloseTask(Task task);
 
     void removeSessionCloseTask(Task task);

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 Mon Jan 29 10:53:27 2007
@@ -72,6 +72,7 @@
     private final boolean _isBrowser;
     private final Boolean _autoClose;
     private boolean _closed = false;
+    private static final String CLIENT_PROPERTIES_INSTANCE = 
ClientProperties.instance.toString();
 
     public static class Factory implements SubscriptionFactory
     {
@@ -331,35 +332,49 @@
     {
         if (_noLocal)
         {
+            boolean isLocal;
             // We don't want local messages so check to see if message is one 
we sent
-            if 
(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
-                    
msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+            Object localInstance;
+            Object msgInstance;
+
+            if((protocolSession.getClientProperties() != null) &&
+                 (localInstance = 
protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != 
null)
             {
-                if (_logger.isTraceEnabled())
+                if((msg.getPublisher().getClientProperties() != null) &&
+                     (msgInstance = 
msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) 
!= null)
                 {
-                    _logger.trace("(" + System.identityHashCode(this) + ") has 
no interest as it is a local message(" +
-                                  System.identityHashCode(msg) + ")");
+                    if (localInstance == msgInstance || ((localInstance != 
null) && localInstance.equals(msgInstance)))
+                    {
+                        if (_logger.isTraceEnabled())
+                        {
+                            _logger.trace("(" + System.identityHashCode(this) 
+ ") has no interest as it is a local message(" +
+                                          System.identityHashCode(msg) + ")");
+                        }
+                        return false;
+                    }
                 }
-                return false;
             }
-            else // if not then filter the message.
+            else
             {
-                if (_logger.isTraceEnabled())
+                localInstance = protocolSession.getClientIdentifier();
+                msgInstance = msg.getPublisher().getClientIdentifier();
+                if (localInstance == msgInstance || ((localInstance != null) 
&& localInstance.equals(msgInstance)))
                 {
-                    _logger.trace("(" + System.identityHashCode(this) + ") 
local message(" + System.identityHashCode(msg) +
-                                  ") but not ours so filtering");
+                    if (_logger.isTraceEnabled())
+                    {
+                        _logger.trace("(" + System.identityHashCode(this) + ") 
has no interest as it is a local message(" +
+                                      System.identityHashCode(msg) + ")");
+                    }
+                    return false;
                 }
-                return checkFilters(msg);
+
             }
         }
-        else
+        if (_logger.isTraceEnabled())
         {
-            if (_logger.isTraceEnabled())
-            {
-                _logger.trace("(" + System.identityHashCode(this) + ") 
checking filters for message (" + System.identityHashCode(msg));
-            }
-            return checkFilters(msg);
+            _logger.trace("(" + System.identityHashCode(this) + ") checking 
filters for message (" + System.identityHashCode(msg));
         }
+        return checkFilters(msg);
     }
 
     private boolean checkFilters(AMQMessage msg)

Modified: 
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
 Mon Jan 29 10:53:27 2007
@@ -143,6 +143,12 @@
     {
     }
 
+       
+       public Object getClientIdentifier()
+       {
+               return null;
+       }
+
        public void closeChannelRequest(int channelId, int replyCode, String 
replyText) throws AMQException {
                // TODO Auto-generated method stub
                


Reply via email to