Author: rgreig
Date: Wed Jan 10 00:52:41 2007
New Revision: 494769

URL: http://svn.apache.org/viewvc?view=rev&rev=494769
Log:
QPID-275: (Patch supplied by Rob Godfrey) Fixes to allow the broker to pass more
of the Python tests.

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
    incubator/qpid/trunk/qpid/python/java_failing.txt
    incubator/qpid/trunk/qpid/python/tests/exchange.py

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 Wed Jan 10 00:52:41 2007
@@ -71,49 +71,78 @@
             if (queue == null)
             {
                 _log.info("No queue for '" + body.queue + "'");
-            }
-            try
-            {
-                AMQShortString consumerTag = 
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
-                                                              body.arguments, 
body.noLocal);
-                if (!body.nowait)
+                if(body.queue!=null)
                 {
+                    AMQShortString msg = new AMQShortString("No such queue, '" 
+ body.queue + "'");
                     // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
                     // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
                     // Be aware of possible changes to parameter order as 
versions change.
-                    
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+                    
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
                         (byte)8, (byte)0,      // AMQP version (major, minor)
-                        consumerTag));         // consumerTag
+                        BasicConsumeBody.getClazz((byte)8, (byte)0),   // 
classId
+                        BasicConsumeBody.getMethod((byte)8, (byte)0),  // 
methodId
+                        AMQConstant.NOT_FOUND.getCode(),       // replyCode
+                        msg)); // replyText
+                }
+                else
+                {
+                    AMQShortString msg = new AMQShortString("No queue name 
provided, no default queue defined.");
+                    // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as 
versions change.
+                    
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+                        (byte)8, (byte)0,      // AMQP version (major, minor)
+                        BasicConsumeBody.getClazz((byte)8, (byte)0),   // 
classId
+                        BasicConsumeBody.getMethod((byte)8, (byte)0),  // 
methodId
+                        AMQConstant.NOT_ALLOWED.getCode(),     // replyCode
+                        msg)); // replyText
                 }
-
-                //now allow queue to start async processing of any backlog of 
messages
-                queue.deliverAsync();
-            }
-            catch (AMQInvalidSelectorException ise)
-            {
-                _log.info("Closing connection due to invalid selector");
-                // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-                // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions 
change.
-                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
-                    (byte)8, (byte)0,  // AMQP version (major, minor)
-                    BasicConsumeBody.getClazz((byte)8, (byte)0),       // 
classId
-                    BasicConsumeBody.getMethod((byte)8, (byte)0),      // 
methodId
-                    AMQConstant.INVALID_SELECTOR.getCode(),    // replyCode
-                    new AMQShortString(ise.getMessage())));            // 
replyText
             }
-            catch (ConsumerTagNotUniqueException e)
+            else
             {
-                AMQShortString msg = new AMQShortString("Non-unique consumer 
tag, '" + body.consumerTag + "'");
-                // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-                // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions 
change.
-                
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
-                    (byte)8, (byte)0,  // AMQP version (major, minor)
-                    BasicConsumeBody.getClazz((byte)8, (byte)0),       // 
classId
-                    BasicConsumeBody.getMethod((byte)8, (byte)0),      // 
methodId
-                    AMQConstant.NOT_ALLOWED.getCode(), // replyCode
-                    msg));     // replyText
+                try
+                {
+                    AMQShortString consumerTag = 
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+                                                                  
body.arguments, body.noLocal);
+                    if (!body.nowait)
+                    {
+                        // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
+                        // TODO: Connect this to the session version obtained 
from ProtocolInitiation for this session.
+                        // Be aware of possible changes to parameter order as 
versions change.
+                        
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+                            (byte)8, (byte)0,  // AMQP version (major, minor)
+                            consumerTag));             // consumerTag
+                    }
+
+                    //now allow queue to start async processing of any backlog 
of messages
+                    queue.deliverAsync();
+                }
+                catch (AMQInvalidSelectorException ise)
+                {
+                    _log.info("Closing connection due to invalid selector");
+                    // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as 
versions change.
+                    
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+                        (byte)8, (byte)0,      // AMQP version (major, minor)
+                        BasicConsumeBody.getClazz((byte)8, (byte)0),   // 
classId
+                        BasicConsumeBody.getMethod((byte)8, (byte)0),  // 
methodId
+                        AMQConstant.INVALID_SELECTOR.getCode(),        // 
replyCode
+                        new AMQShortString(ise.getMessage())));                
// replyText
+                }
+                catch (ConsumerTagNotUniqueException e)
+                {
+                    AMQShortString msg = new AMQShortString("Non-unique 
consumer tag, '" + body.consumerTag + "'");
+                    // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as 
versions change.
+                    
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+                        (byte)8, (byte)0,      // AMQP version (major, minor)
+                        BasicConsumeBody.getClazz((byte)8, (byte)0),   // 
classId
+                        BasicConsumeBody.getMethod((byte)8, (byte)0),  // 
methodId
+                        AMQConstant.NOT_ALLOWED.getCode(),     // replyCode
+                        msg)); // replyText
+                }
             }
         }
     }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
 Wed Jan 10 00:52:41 2007
@@ -22,6 +22,10 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.apache.qpid.framing.ExchangeDeclareOkBody;
@@ -66,12 +70,35 @@
         {
             Exchange exchange = exchangeRegistry.getExchange(body.exchange);
 
+
+
             if (exchange == null)
             {
-                exchange = exchangeFactory.createExchange(body.exchange, 
body.type, body.durable,
-                                                          body.passive, 
body.ticket);
-                exchangeRegistry.registerExchange(exchange);
+                if(body.passive && ((body.type == null) || body.type.length() 
==0))
+                {
+                    throw new 
AMQChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + 
body.exchange,body.getClazz(), 
body.getMethod(),body.getMajor(),body.getMinor());                    
+                }
+                else
+                {
+                    try
+                    {
+
+                    exchange = exchangeFactory.createExchange(body.exchange, 
body.type, body.durable,
+                                                              body.passive, 
body.ticket);
+                    exchangeRegistry.registerExchange(exchange);
+                    }
+                    catch(AMQUnknownExchangeType e)
+                    {
+                        throw new 
AMQConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown 
exchange: " + body.exchange,body.getClazz(), 
body.getMethod(),body.getMajor(),body.getMinor(),e);
+                    }
+                }
+            }
+            else if (!exchange.getType().equals(body.type))
+            {
+
+                throw new 
AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare 
exchange: " + body.exchange + " of type " + exchange.getType() + " to " + 
body.type +".",body.getClazz(), 
body.getMethod(),body.getMajor(),body.getMinor());    
             }
+
         }
         if(!body.nowait)
         {

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
 Wed Jan 10 00:52:41 2007
@@ -22,12 +22,11 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQMethodEvent;
@@ -83,20 +82,34 @@
         synchronized (queueRegistry)
         {
             AMQQueue queue;
-            if ((queue = queueRegistry.getQueue(body.queue)) == null)
+            if (((queue = queueRegistry.getQueue(body.queue)) == null) )
             {
-                queue = createQueue(body, queueRegistry, protocolSession);
-                if (queue.isDurable() && !queue.isAutoDelete())
+                if(body.passive)
                 {
-                    _store.createQueue(queue);
+                    String msg = "Queue: " + body.queue + " not found.";
+                    throw new 
AMQChannelException(AMQConstant.NOT_FOUND.getCode(),
+                                                                               
   msg,
+                                                                               
   body.getClazz(),
+                                                                               
   body.getMethod(),
+                                                                               
   (byte)8,
+                                                                               
   (byte)0      );
+
                 }
-                queueRegistry.registerQueue(queue);
-                if (autoRegister)
+                else
                 {
-                    Exchange defaultExchange = 
exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
-                    defaultExchange.registerQueue(body.queue, queue, null);
-                    queue.bind(body.queue, defaultExchange);
-                    _log.info("Queue " + body.queue + " bound to default 
exchange");
+                    queue = createQueue(body, queueRegistry, protocolSession);
+                    if (queue.isDurable() && !queue.isAutoDelete())
+                    {
+                        _store.createQueue(queue);
+                    }
+                    queueRegistry.registerQueue(queue);
+                    if (autoRegister)
+                    {
+                        Exchange defaultExchange = 
exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+                        defaultExchange.registerQueue(body.queue, queue, null);
+                        queue.bind(body.queue, defaultExchange);
+                        _log.info("Queue " + body.queue + " bound to default 
exchange");
+                    }
                 }
             }
             //set this as the default queue on the channel:

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
 Wed Jan 10 00:52:41 2007
@@ -31,7 +31,10 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.framing.QueueDeleteBody;
 import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 
 public class QueueDeleteHandler  implements 
StateAwareMethodListener<QueueDeleteBody>
 {
@@ -79,14 +82,30 @@
         }
         else
         {
-            int purged = queue.delete(body.ifUnused, body.ifEmpty);
-            _store.removeQueue(queue.getName().toString());
-            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
-            
session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
-                (byte)8, (byte)0,      // AMQP version (major, minor)
-                purged));      // messageCount
+            if(body.ifEmpty && !queue.isEmpty())
+            {
+                AMQShortString msg = new AMQShortString("Queue: " + body.queue 
+ " is not empty.");
+                // TODO - Error code
+                
session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, 
(byte)0, body.getClazz(), body.getMethod(), 406, msg    ));
+            }
+            else if(body.ifUnused && !queue.isUnused())
+            {
+                AMQShortString msg = new AMQShortString("Queue: " + body.queue 
+ " is still used.");
+                // TODO - Error code
+                
session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, 
(byte)0, body.getClazz(), body.getMethod(), 406, msg    ));
+
+            }
+            else
+            {
+                int purged = queue.delete(body.ifUnused, body.ifEmpty);
+                _store.removeQueue(queue.getName().toString());
+                // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
+                // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions 
change.
+                
session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+                    (byte)8, (byte)0,  // AMQP version (major, minor)
+                    purged));  // messageCount
+            }
         }
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Wed Jan 10 00:52:41 2007
@@ -26,6 +26,7 @@
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
@@ -107,7 +108,7 @@
         _stateManager = stateManager;
         _minaProtocolSession = session;
         session.setAttachment(this);
-        _frameListeners.add(_stateManager);
+        
         _queueRegistry = queueRegistry;
         _exchangeRegistry = exchangeRegistry;
         _codecFactory = codecFactory;
@@ -206,11 +207,15 @@
                                                                                
     (AMQMethodBody) frame.bodyFrame);
         try
         {
-            boolean wasAnyoneInterested = false;
-            for (AMQMethodListener listener : _frameListeners)
+            boolean wasAnyoneInterested = _stateManager.methodReceived(evt, 
this, _queueRegistry, _exchangeRegistry);
+
+            if(!_frameListeners.isEmpty())
             {
-                wasAnyoneInterested = listener.methodReceived(evt, this, 
_queueRegistry, _exchangeRegistry) ||
-                                      wasAnyoneInterested;
+                for (AMQMethodListener listener : _frameListeners)
+                {
+                    wasAnyoneInterested = listener.methodReceived(evt, this, 
_queueRegistry, _exchangeRegistry) ||
+                                          wasAnyoneInterested;
+                }
             }
             if (!wasAnyoneInterested)
             {
@@ -222,8 +227,14 @@
             _logger.error("Closing channel due to: " + e.getMessage());
             writeFrame(e.getCloseFrame(frame.channel));
         }
+        catch (AMQConnectionException e)
+        {
+            _logger.error("Closing connection due to: " + e.getMessage());
+            writeFrame(e.getCloseFrame(frame.channel));
+        }        
         catch (AMQException e)
         {
+            _stateManager.error(e);
             for (AMQMethodListener listener : _frameListeners)
             {
                 listener.error(e);

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Wed Jan 10 00:52:41 2007
@@ -410,6 +410,17 @@
         }
     }
 
+    public boolean isUnused()
+    {
+        return _subscribers.isEmpty();
+    }
+
+    public boolean isEmpty()
+    {
+        return !_deliveryMgr.hasQueuedMessages();
+    }
+
+
     public int delete(boolean checkUnused, boolean checkEmpty) throws 
AMQException
     {
         if (checkUnused && !_subscribers.isEmpty())

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
 Wed Jan 10 00:52:41 2007
@@ -63,7 +63,7 @@
 
         public int hashCode()
         {
-            return exchange.hashCode() + routingKey.hashCode();
+            return (exchange == null ? 0 : exchange.hashCode()) + (routingKey 
== null ? 0 : routingKey.hashCode());
         }
 
         public boolean equals(Object o)

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 Wed Jan 10 00:52:41 2007
@@ -301,8 +301,10 @@
         if (_noLocal)
         {
             // We don't want local messages so check to see if message is one 
we sent
-            if 
(protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()).equals(
-                    
msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString())))
+            Object localInstance = 
protocolSession.getClientProperties().getObject(ClientProperties.instance.toString());
+            Object msgInstance = 
msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString());
+
+            if (localInstance == msgInstance || ((localInstance != null) && 
localInstance.equals(msgInstance)))
             {
                 if (_logger.isTraceEnabled())
                 {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
 Wed Jan 10 00:52:41 2007
@@ -55,7 +55,7 @@
         Queue queue = new AMQQueue(new AMQShortString("someQ"), new 
AMQShortString("someQ"), false, true);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
         //force synch to ensure the consumer has resulted in a bound queue
-        ((AMQSession) 
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+        ((AMQSession) 
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", 
"producer1", "/test");
         Session producerSession = con2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
@@ -112,7 +112,7 @@
         Queue queue = new AMQQueue(new AMQShortString("someQ"), new 
AMQShortString("someQ"), false, true);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
         //force synch to ensure the consumer has resulted in a bound queue
-        ((AMQSession) 
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+        ((AMQSession) 
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", 
"producer1", "/test");
         Session producerSession = con2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
 Wed Jan 10 00:52:41 2007
@@ -94,7 +94,7 @@
 
     public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, 
"resource error", true);
 
-    public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not 
allowed", true);
+    public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not 
allowed", true);
 
     public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, 
"not implemented", true);
 

Modified: incubator/qpid/trunk/qpid/python/java_failing.txt
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/java_failing.txt?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/python/java_failing.txt (original)
+++ incubator/qpid/trunk/qpid/python/java_failing.txt Wed Jan 10 00:52:41 2007
@@ -1,29 +1,15 @@
-tests.basic.BasicTests.test_cancel
 tests.basic.BasicTests.test_consume_exclusive
 tests.basic.BasicTests.test_consume_no_local
-tests.basic.BasicTests.test_consume_queue_errors
-tests.basic.BasicTests.test_consume_unique_consumers
 tests.basic.BasicTests.test_get
 tests.basic.BasicTests.test_qos_prefetch_size
 tests.basic.BasicTests.test_recover_requeue
-tests.exchange.ExchangeTests
 tests.exchange.DefaultExchangeRuleTests.testDefaultExchange
 tests.exchange.HeadersExchangeTests.testMatchAll
 tests.exchange.HeadersExchangeTests.testMatchAny
-tests.exchange.RecommendedTypesRuleTests.testDirect
-tests.exchange.RecommendedTypesRuleTests.testFanout
-tests.exchange.RecommendedTypesRuleTests.testHeaders
 tests.exchange.RecommendedTypesRuleTests.testTopic
-tests.exchange.RequiredInstancesRuleTests.testAmqDirect
-tests.exchange.RequiredInstancesRuleTests.testAmqFanOut
 tests.exchange.RequiredInstancesRuleTests.testAmqMatch
 tests.exchange.RequiredInstancesRuleTests.testAmqTopic
 tests.queue.QueueTests.test_declare_exclusive
-tests.queue.QueueTests.test_declare_passive
-tests.queue.QueueTests.test_delete_ifempty
-tests.queue.QueueTests.test_delete_ifunused
-tests.queue.QueueTests.test_delete_simple
 tests.queue.QueueTests.test_purge
-tests.queue.QueueTests.test_bind
 tests.testlib.TestBaseTest.testMessageProperties
 tests.broker.BrokerTests.test_invalid_channel

Modified: incubator/qpid/trunk/qpid/python/tests/exchange.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/exchange.py?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/exchange.py Wed Jan 10 00:52:41 2007
@@ -316,9 +316,9 @@
         
self.channel.exchange_declare(exchange="test_different_declared_type_exchange", 
type="direct")
         try:
             
self.channel.exchange_declare(exchange="test_different_declared_type_exchange", 
type="topic")
-            self.fail("Expected 507 for redeclaration of exchange with 
different type.")
+            self.fail("Expected 530 for redeclaration of exchange with 
different type.")
         except Closed, e:
-            self.assertConnectionException(507, e.args[0])
+            self.assertConnectionException(530, e.args[0])
         #cleanup    
         other = self.connect()
         c2 = other.channel(1)


Reply via email to