Author: ritchiem
Date: Tue Dec 19 08:07:12 2006
New Revision: 488712

URL: http://svn.apache.org/viewvc?view=rev&rev=488712
Log:
QPID-216
BasicConsumeMethodHandler.java - Pulled the nolocal param from the method body 
and passed down channel to subscription.
SubscriptionFactory.java / AMQQueue.java/AMQChannel.java - passed the nolocal 
parameter through to the Subscription
ConnectionStartOkMethodHandler.java - Saved the client properties so the client 
identifier can be used in comparison with the publisher id to implement no_local
AMQMinaProtocolSession.java - added _clientProperties to store the sent client 
properties.
AMQProtocolSession.java - interface changes to get/set ClientProperties
ConcurrentSelectorDeliveryManager.java - only need to do hasInterset as this 
will take care of the hasFilters optimisation check.
SubscriptionImpl.java - Added code to do comparison of client ids to determin 
insterest in a given message.
SubscriptionSet.java - tidied up code to use hasInterest as this is where the 
nolocal is implemented.

ConnectionStartMethodHandler.java - Moved literal values to a 
ClientProperties.java enumeration and a QpidProperties.java values.
QpidConnectionMetaData.java - updated to get values from QpidProperties.java

MockProtocolSession.java - null implementation of new get/set methods

Added:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
   (with props)
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Tue Dec 19 08:07:12 2006
@@ -286,12 +286,14 @@
      * @param tag     the tag chosen by the client (if null, server will 
generate one)
      * @param queue   the queue to subscribe to
      * @param session the protocol session of the subscriber
+     * @param noLocal
      * @return the consumer tag. This is returned to the subscriber and used in
      *         subsequent unsubscribe requests
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
-    public String subscribeToQueue(String tag, AMQQueue queue, 
AMQProtocolSession session, boolean acks, FieldTable filters) throws 
AMQException, ConsumerTagNotUniqueException
+    public String subscribeToQueue(String tag, AMQQueue queue, 
AMQProtocolSession session, boolean acks,
+                                   FieldTable filters, boolean noLocal) throws 
AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -302,7 +304,7 @@
             throw new ConsumerTagNotUniqueException();
         }
 
-        queue.registerProtocolSession(session, _channelId, tag, acks, filters);
+        queue.registerProtocolSession(session, _channelId, tag, acks, 
filters,noLocal);
         _consumerTag2QueueMap.put(tag, queue);
         return tag;
     }
@@ -499,7 +501,7 @@
         if (_log.isDebugEnabled())
         {
             _log.debug("Handling acknowledgement for channel " + _channelId + 
" with delivery tag " + deliveryTag +
-                      " and multiple " + multiple);
+                       " and multiple " + multiple);
         }
         if (multiple)
         {

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 Tue Dec 19 08:07:12 2006
@@ -77,7 +77,8 @@
             }
             try
             {
-                String consumerTag = 
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, 
body.arguments);
+                String consumerTag = 
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+                                                              body.arguments, 
body.noLocal);
                 if (!body.nowait)
                 {
                     
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
@@ -90,8 +91,8 @@
             {
                 _log.info("Closing connection due to invalid selector");
                 session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, 
AMQConstant.INVALID_SELECTOR.getCode(),
-                                                                      
ise.getMessage(), BasicConsumeBody.CLASS_ID,
-                                                                      
BasicConsumeBody.METHOD_ID));
+                                                                   
ise.getMessage(), BasicConsumeBody.CLASS_ID,
+                                                                   
BasicConsumeBody.METHOD_ID));
             }
             catch (ConsumerTagNotUniqueException e)
             {

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
 Tue Dec 19 08:07:12 2006
@@ -78,12 +78,19 @@
 
             AuthenticationResult authResult = authMgr.authenticate(ss, 
body.response);
 
+            //save clientProperties
+            if (protocolSession.getClientProperties() == null)
+            {
+                protocolSession.setClientProperties(body.clientProperties);
+            }
+
             switch (authResult.status)
             {
                 case ERROR:
                     throw new AMQException("Authentication failed");
                 case SUCCESS:
                     _logger.info("Connected as: " + ss.getAuthorizationID());
+
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
                     AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, 
Integer.MAX_VALUE, getConfiguredFrameSize(),
                                                                       
HeartbeatConfig.getInstance().getDelay());
@@ -122,7 +129,7 @@
     static int getConfiguredFrameSize()
     {
         final Configuration config = 
ApplicationRegistry.getInstance().getConfiguration();
-        final int framesize =  config.getInt("advanced.framesize", 
DEFAULT_FRAME_SIZE);
+        final int framesize = config.getInt("advanced.framesize", 
DEFAULT_FRAME_SIZE);
         _logger.info("Framesize set to " + framesize);
         return framesize;
     }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Tue Dec 19 08:07:12 2006
@@ -26,17 +26,19 @@
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ConnectionStartBody;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ProtocolVersionList;
 import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionStartBody;
 import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
+
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -89,10 +91,11 @@
     private boolean _closed;
     // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
-    
+
     /* AMQP Version for this session */
     private byte _major;
     private byte _minor;
+    private FieldTable _clientProperties;
 
     public ManagedObject getManagedObject()
     {
@@ -128,7 +131,7 @@
         {
             return new AMQProtocolSessionMBean(this);
         }
-        catch(JMException ex)
+        catch (JMException ex)
         {
             _logger.error("AMQProtocolSession MBean creation has failed ", ex);
             throw new AMQException("AMQProtocolSession MBean creation has 
failed ", ex);
@@ -153,18 +156,21 @@
         {
             ProtocolInitiation pi = (ProtocolInitiation) message;
             // this ensures the codec never checks for a PI message again
-            
((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
-            try {
+            ((AMQDecoder) 
_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+            try
+            {
                 pi.checkVersion(this); // Fails if not correct
                 // This sets the protocol version (and hence framing classes) 
for this session.
                 _major = pi.protocolMajor;
                 _minor = pi.protocolMinor;
                 String mechanisms = 
ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
                 String locales = "en_US";
-                AMQFrame response = 
ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, 
pi.protocolMinor, null,
+                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 
0, pi.protocolMajor, pi.protocolMinor, null,
                                                                        
mechanisms.getBytes(), locales.getBytes());
                 _minaProtocolSession.write(response);
-            } catch (AMQException e) {
+            }
+            catch (AMQException e)
+            {
                 _logger.error("Received incorrect protocol initiation", e);
                 /* Find last protocol version in protocol version list. Make 
sure last protocol version
                 listed in the build file (build-module.xml) is the latest 
version which will be used
@@ -211,7 +217,7 @@
             _logger.debug("Method frame received: " + frame);
         }
         final AMQMethodEvent<AMQMethodBody> evt = new 
AMQMethodEvent<AMQMethodBody>(frame.channel,
-                                                                               
     (AMQMethodBody)frame.bodyFrame);
+                                                                               
     (AMQMethodBody) frame.bodyFrame);
         try
         {
             boolean wasAnyoneInterested = false;
@@ -266,7 +272,7 @@
         {
             _logger.debug("Content header frame received: " + frame);
         }
-        
getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame);
+        getChannel(frame.channel).publishContentHeader((ContentHeaderBody) 
frame.bodyFrame);
     }
 
     private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -275,7 +281,7 @@
         {
             _logger.debug("Content body frame received: " + frame);
         }
-        
getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame);
+        getChannel(frame.channel).publishContentBody((ContentBody) 
frame.bodyFrame);
     }
 
     /**
@@ -355,6 +361,7 @@
      * Close a specific channel. This will remove any resources used by the 
channel, including:
      * <ul><li>any queue subscriptions (this may in turn remove queues if they 
are auto delete</li>
      * </ul>
+     *
      * @param channelId id of the channel to close
      * @throws AMQException if an error occurs closing the channel
      * @throws IllegalArgumentException if the channel id is not valid
@@ -381,6 +388,7 @@
 
     /**
      * In our current implementation this is used by the clustering code.
+     *
      * @param channelId
      */
     public void removeChannel(int channelId)
@@ -390,11 +398,12 @@
 
     /**
      * Initialise heartbeats on the session.
+     *
      * @param delay delay in seconds (not ms)
      */
     public void initHeartbeats(int delay)
     {
-        if(delay > 0)
+        if (delay > 0)
         {
             _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
             _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, 
HeartbeatConfig.getInstance().getTimeout(delay));
@@ -404,6 +413,7 @@
     /**
      * Closes all channels that were opened by this protocol session. This 
frees up all resources
      * used by the channel.
+     *
      * @throws AMQException if an error occurs while closing any channel
      */
     private void closeAllChannels() throws AMQException
@@ -421,7 +431,7 @@
      */
     public void closeSession() throws AMQException
     {
-        if(!_closed)
+        if (!_closed)
         {
             _closed = true;
             closeAllChannels();
@@ -463,11 +473,11 @@
         // information is used by SASL primary.
         if (address instanceof InetSocketAddress)
         {
-            return ((InetSocketAddress)address).getHostName();
+            return ((InetSocketAddress) address).getHostName();
         }
         else if (address instanceof VmPipeAddress)
         {
-            return "vmpipe:" + ((VmPipeAddress)address).getPort();
+            return "vmpipe:" + ((VmPipeAddress) address).getPort();
         }
         else
         {
@@ -484,22 +494,32 @@
     {
         _saslServer = saslServer;
     }
-    
+
+    public FieldTable getClientProperties()
+    {
+        return _clientProperties;
+    }
+
+    public void setClientProperties(FieldTable clientProperties)
+    {
+        _clientProperties = clientProperties;
+    }
+
     /**
      * Convenience methods for managing AMQP version.
      * NOTE: Both major and minor will be set to 0 prior to protocol 
initiation.
      */
-    
+
     public byte getAmqpMajor()
     {
         return _major;
     }
-    
+
     public byte getAmqpMinor()
     {
         return _minor;
     }
-    
+
     public boolean amqpVersionEquals(byte major, byte minor)
     {
         return _major == major && _minor == minor;

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 Tue Dec 19 08:07:12 2006
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
 
@@ -122,4 +123,9 @@
      * @param saslServer
      */
     void setSaslServer(SaslServer saslServer);
+
+
+    FieldTable getClientProperties();
+
+    void setClientProperties(FieldTable clientProperties);
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Tue Dec 19 08:07:12 2006
@@ -96,7 +96,7 @@
      * max allowed number of messages on a queue.
      */
     private Integer _maxMessageCount = 10000;
-    
+
     /**
      * max queue depth(KB) for the queue
      */
@@ -362,12 +362,17 @@
         _bindings.addBinding(routingKey, exchange);
     }
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag, boolean acks, FieldTable filters)
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag, boolean acks, FieldTable filters) throws AMQException
+    {
+        registerProtocolSession(ps, channel, consumerTag, acks, filters, 
false);
+    }
+
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
             throws AMQException
     {
         debug("Registering protocol session {0} with channel {1} and consumer 
tag {2} with {3}", ps, channel, consumerTag, this);
 
-        Subscription subscription = 
_subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, 
filters);
+        Subscription subscription = 
_subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, 
filters, noLocal);
         _subscribers.addSubscriber(subscription);
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Tue Dec 19 08:07:12 2006
@@ -281,7 +281,7 @@
                         }
 
                         // Only give the message to those that want them.
-                        if (sub.hasFilters() && sub.hasInterest(msg))
+                        if (sub.hasInterest(msg))
                         {
                             sub.enqueueForPreDelivery(msg);
                         }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
 Tue Dec 19 08:07:12 2006
@@ -33,12 +33,10 @@
  */
 public interface SubscriptionFactory
 {
-    Subscription createSubscription(int channel, AMQProtocolSession 
protocolSession, String consumerTag, boolean acks, FieldTable filters)
-        throws AMQException;
+    Subscription createSubscription(int channel, AMQProtocolSession 
protocolSession, String consumerTag, boolean acks,
+                                    FieldTable filters, boolean noLocal) 
throws AMQException;
 
-    Subscription createSubscription(int channel, AMQProtocolSession 
protocolSession, String consumerTag, boolean acks)
-        throws AMQException;
 
-    Subscription createSubscription(int channel, AMQProtocolSession 
protocolSession,String consumerTag)
-        throws AMQException;
+    Subscription createSubscription(int channel, AMQProtocolSession 
protocolSession, String consumerTag)
+            throws AMQException;
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 Tue Dec 19 08:07:12 2006
@@ -23,6 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
@@ -56,6 +57,7 @@
 
     private Queue<AMQMessage> _messages;
 
+    private final boolean _noLocal;
 
     /**
      * True if messages need to be acknowledged
@@ -65,21 +67,15 @@
 
     public static class Factory implements SubscriptionFactory
     {
-        public Subscription createSubscription(int channel, AMQProtocolSession 
protocolSession, String consumerTag, boolean acks, FieldTable filters) throws 
AMQException
+        public Subscription createSubscription(int channel, AMQProtocolSession 
protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean 
noLocal) throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, 
acks, filters);
-        }
-
-        public SubscriptionImpl createSubscription(int channel, 
AMQProtocolSession protocolSession, String consumerTag, boolean acks)
-                throws AMQException
-        {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, 
acks, null);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, 
acks, filters, noLocal);
         }
 
         public SubscriptionImpl createSubscription(int channel, 
AMQProtocolSession protocolSession, String consumerTag)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, 
false, null);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, 
false, null, false);
         }
     }
 
@@ -87,11 +83,11 @@
                             String consumerTag, boolean acks)
             throws AMQException
     {
-        this(channelId, protocolSession, consumerTag, acks, null);
+        this(channelId, protocolSession, consumerTag, acks, null, false);
     }
 
     public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
-                            String consumerTag, boolean acks, FieldTable 
filters)
+                            String consumerTag, boolean acks, FieldTable 
filters, boolean noLocal)
             throws AMQException
     {
         AMQChannel channel = protocolSession.getChannel(channelId);
@@ -105,6 +101,8 @@
         this.consumerTag = consumerTag;
         sessionKey = protocolSession.getKey();
         _acks = acks;
+        _noLocal = noLocal;
+
         _filters = FilterManagerFactory.createManager(filters);
 
         if (_filters != null)
@@ -218,7 +216,22 @@
 
     public boolean hasInterest(AMQMessage msg)
     {
-        return _filters.allAllow(msg);
+        if (_noLocal)
+        {
+            return 
!(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+                    
msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())));
+        }
+        else
+        {
+            if (_filters != null)
+            {
+                return _filters.allAllow(msg);
+            }
+            else
+            {
+                return true;
+            }
+        }
     }
 
     public Queue<AMQMessage> getPreDeliveryQueue()
@@ -233,8 +246,6 @@
             _messages.offer(msg);
         }
     }
-
-
 
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String 
routingKey, String exchange)

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
 Tue Dec 19 08:07:12 2006
@@ -139,22 +139,15 @@
 
             if (!subscription.isSuspended())
             {
-                if (!subscription.hasFilters())
+                if (subscription.hasInterest(msg))
                 {
-                    return subscription;
-                }
-                else
-                {
-                    if (subscription.hasInterest(msg))
+                    // if the queue is not empty then this client is ready to 
receive a message.
+                    //FIXME the queue could be full of sent messages.
+                    // Either need to clean all PDQs after sending a message
+                    // OR have a clean up thread that runs the PDQs expunging 
the messages.
+                    if (!subscription.hasFilters() || 
subscription.getPreDeliveryQueue().isEmpty())
                     {
-                        // if the queue is not empty then this client is ready 
to receive a message.
-                        //FIXME the queue could be full of sent messages.
-                        // Either need to clean all PDQs after sending a 
message
-                        // OR have a clean up thread that runs the PDQs 
expunging the messages.
-                        if (subscription.getPreDeliveryQueue().isEmpty())
-                        {
-                            return subscription;
-                        }
+                        return subscription;
                     }
                 }
             }
@@ -208,6 +201,7 @@
     /**
      * Notification that a queue has been deleted. This is called so that the 
subscription can inform the
      * channel, which in turn can update its list of unacknowledged messages.
+     *
      * @param queue
      */
     public void queueDeleted(AMQQueue queue)

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
 Tue Dec 19 08:07:12 2006
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.common.QpidProperties;
+
 import java.util.Enumeration;
 
 import javax.jms.ConnectionMetaData;
@@ -29,7 +31,6 @@
 {
 
 
-
     QpidConnectionMetaData(AMQConnection conn)
     {
     }
@@ -46,7 +47,7 @@
 
     public String getJMSProviderName() throws JMSException
     {
-        return "Apache Qpid";
+        return "Apache " + QpidProperties.getProductName();
     }
 
     public String getJMSVersion() throws JMSException
@@ -71,8 +72,8 @@
 
     public String getProviderVersion() throws JMSException
     {
-        return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + 
getBrokerVersion() + "] ; Protocol: [ "
-                + getProtocolVersion() + "] )";
+        return QpidProperties.getProductName() + " (Client: [" + 
getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+               + getProtocolVersion() + "] )";
     }
 
     private String getProtocolVersion()
@@ -89,8 +90,7 @@
 
     public String getClientVersion()
     {
-        // TODO - get client build version from properties file or similar
-        return "<unknown>";
+        return QpidProperties.getBuildVerision();
     }
 
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
 Tue Dec 19 08:07:12 2006
@@ -22,6 +22,8 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.client.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -119,10 +121,11 @@
 
             stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
             FieldTable clientProperties = FieldTableFactory.newFieldTable();
-            clientProperties.put("instance", ps.getClientID());
-            clientProperties.put("product", "Qpid");
-            clientProperties.put("version", "1.0");
-            clientProperties.put("platform", getFullSystemInfo());
+            
+            clientProperties.put(ClientProperties.instance.toString(), 
ps.getClientID());
+            clientProperties.put(ClientProperties.product.toString(), 
QpidProperties.getProductName());
+            clientProperties.put(ClientProperties.version.toString(), 
QpidProperties.getReleaseVerision());
+            clientProperties.put(ClientProperties.platform.toString(), 
getFullSystemInfo());
             
ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), 
clientProperties, mechanism,
                                                                saslResponse, 
selectedLocale));
         }

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java?view=auto&rev=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
 Tue Dec 19 08:07:12 2006
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.common;
+
+public enum ClientProperties
+{
+    instance,
+    product,
+    version,
+    platform
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java?view=auto&rev=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
 Tue Dec 19 08:07:12 2006
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.common;
+
+public class QpidProperties
+{
+
+    static
+    {
+        //load values from property file.
+    }
+
+    public static String getProductName()
+    {
+        return "Qpid";
+    }
+
+    public static String getReleaseVerision()
+    {
+        return "1.0";
+    }
+
+
+    public static String getBuildVerision()
+    {
+        return "1";
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
 Tue Dec 19 08:07:12 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
@@ -119,6 +120,15 @@
     }
 
     public void setSaslServer(SaslServer saslServer)
+    {
+    }
+
+    public FieldTable getClientProperties()
+    {
+        return null;
+    }
+
+    public void setClientProperties(FieldTable clientProperties)
     {
     }
 }


Reply via email to