Author: rgreig
Date: Wed Jan 10 00:52:41 2007
New Revision: 494769
URL: http://svn.apache.org/viewvc?view=rev&rev=494769
Log:
QPID-275 : (Patch supplied by Rob Godfrey) Fixes to allow broker to pass more
of the Python tests
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
incubator/qpid/trunk/qpid/python/java_failing.txt
incubator/qpid/trunk/qpid/python/tests/exchange.py
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Wed Jan 10 00:52:41 2007
@@ -71,49 +71,78 @@
if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
- }
- try
- {
- AMQShortString consumerTag =
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
- body.arguments,
body.noLocal);
- if (!body.nowait)
+ if(body.queue!=null)
{
+ AMQShortString msg = new AMQShortString("No such queue, '"
+ body.queue + "'");
// AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as
versions change.
-
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
(byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag)); // consumerTag
+ BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
+ AMQConstant.NOT_FOUND.getCode(), // replyCode
+ msg)); // replyText
+ }
+ else
+ {
+ AMQShortString msg = new AMQShortString("No queue name
provided, no default queue defined.");
+ // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as
versions change.
+
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
-
- //now allow queue to start async processing of any backlog of
messages
- queue.deliverAsync();
- }
- catch (AMQInvalidSelectorException ise)
- {
- _log.info("Closing connection due to invalid selector");
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
- AMQConstant.INVALID_SELECTOR.getCode(), // replyCode
- new AMQShortString(ise.getMessage()))); //
replyText
}
- catch (ConsumerTagNotUniqueException e)
+ else
{
- AMQShortString msg = new AMQShortString("Non-unique consumer
tag, '" + body.consumerTag + "'");
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
-
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+ try
+ {
+ AMQShortString consumerTag =
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+
body.arguments, body.noLocal);
+ if (!body.nowait)
+ {
+ // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
+ // TODO: Connect this to the session version obtained
from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as
versions change.
+
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag)); // consumerTag
+ }
+
+ //now allow queue to start async processing of any backlog
of messages
+ queue.deliverAsync();
+ }
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as
versions change.
+
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
+ AMQConstant.INVALID_SELECTOR.getCode(), //
replyCode
+ new AMQShortString(ise.getMessage())));
// replyText
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ AMQShortString msg = new AMQShortString("Non-unique
consumer tag, '" + body.consumerTag + "'");
+ // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as
versions change.
+
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
+ }
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Wed Jan 10 00:52:41 2007
@@ -22,6 +22,10 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
@@ -66,12 +70,35 @@
{
Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+
+
if (exchange == null)
{
- exchange = exchangeFactory.createExchange(body.exchange,
body.type, body.durable,
- body.passive,
body.ticket);
- exchangeRegistry.registerExchange(exchange);
+ if(body.passive && ((body.type == null) || body.type.length()
==0))
+ {
+ throw new
AMQChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " +
body.exchange,body.getClazz(),
body.getMethod(),body.getMajor(),body.getMinor());
+ }
+ else
+ {
+ try
+ {
+
+ exchange = exchangeFactory.createExchange(body.exchange,
body.type, body.durable,
+ body.passive,
body.ticket);
+ exchangeRegistry.registerExchange(exchange);
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ throw new
AMQConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown
exchange: " + body.exchange,body.getClazz(),
body.getMethod(),body.getMajor(),body.getMinor(),e);
+ }
+ }
+ }
+ else if (!exchange.getType().equals(body.type))
+ {
+
+ throw new
AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare
exchange: " + body.exchange + " of type " + exchange.getType() + " to " +
body.type +".",body.getClazz(),
body.getMethod(),body.getMajor(),body.getMinor());
}
+
}
if(!body.nowait)
{
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed Jan 10 00:52:41 2007
@@ -22,12 +22,11 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQMethodEvent;
@@ -83,20 +82,34 @@
synchronized (queueRegistry)
{
AMQQueue queue;
- if ((queue = queueRegistry.getQueue(body.queue)) == null)
+ if (((queue = queueRegistry.getQueue(body.queue)) == null) )
{
- queue = createQueue(body, queueRegistry, protocolSession);
- if (queue.isDurable() && !queue.isAutoDelete())
+ if(body.passive)
{
- _store.createQueue(queue);
+ String msg = "Queue: " + body.queue + " not found.";
+ throw new
AMQChannelException(AMQConstant.NOT_FOUND.getCode(),
+
msg,
+
body.getClazz(),
+
body.getMethod(),
+
(byte)8,
+
(byte)0 );
+
}
- queueRegistry.registerQueue(queue);
- if (autoRegister)
+ else
{
- Exchange defaultExchange =
exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- defaultExchange.registerQueue(body.queue, queue, null);
- queue.bind(body.queue, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default
exchange");
+ queue = createQueue(body, queueRegistry, protocolSession);
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ _store.createQueue(queue);
+ }
+ queueRegistry.registerQueue(queue);
+ if (autoRegister)
+ {
+ Exchange defaultExchange =
exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ defaultExchange.registerQueue(body.queue, queue, null);
+ queue.bind(body.queue, defaultExchange);
+ _log.info("Queue " + body.queue + " bound to default
exchange");
+ }
}
}
//set this as the default queue on the channel:
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed Jan 10 00:52:41 2007
@@ -31,7 +31,10 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
public class QueueDeleteHandler implements
StateAwareMethodListener<QueueDeleteBody>
{
@@ -79,14 +82,30 @@
}
else
{
- int purged = queue.delete(body.ifUnused, body.ifEmpty);
- _store.removeQueue(queue.getName().toString());
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
-
session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- purged)); // messageCount
+ if(body.ifEmpty && !queue.isEmpty())
+ {
+ AMQShortString msg = new AMQShortString("Queue: " + body.queue
+ " is not empty.");
+ // TODO - Error code
+
session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8,
(byte)0, body.getClazz(), body.getMethod(), 406, msg ));
+ }
+ else if(body.ifUnused && !queue.isUnused())
+ {
+ AMQShortString msg = new AMQShortString("Queue: " + body.queue
+ " is still used.");
+ // TODO - Error code
+
session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8,
(byte)0, body.getClazz(), body.getMethod(), 406, msg ));
+
+ }
+ else
+ {
+ int purged = queue.delete(body.ifUnused, body.ifEmpty);
+ _store.removeQueue(queue.getName().toString());
+ // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions
change.
+
session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ purged)); // messageCount
+ }
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Wed Jan 10 00:52:41 2007
@@ -26,6 +26,7 @@
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
@@ -107,7 +108,7 @@
_stateManager = stateManager;
_minaProtocolSession = session;
session.setAttachment(this);
- _frameListeners.add(_stateManager);
+
_queueRegistry = queueRegistry;
_exchangeRegistry = exchangeRegistry;
_codecFactory = codecFactory;
@@ -206,11 +207,15 @@
(AMQMethodBody) frame.bodyFrame);
try
{
- boolean wasAnyoneInterested = false;
- for (AMQMethodListener listener : _frameListeners)
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt,
this, _queueRegistry, _exchangeRegistry);
+
+ if(!_frameListeners.isEmpty())
{
- wasAnyoneInterested = listener.methodReceived(evt, this,
_queueRegistry, _exchangeRegistry) ||
- wasAnyoneInterested;
+ for (AMQMethodListener listener : _frameListeners)
+ {
+ wasAnyoneInterested = listener.methodReceived(evt, this,
_queueRegistry, _exchangeRegistry) ||
+ wasAnyoneInterested;
+ }
}
if (!wasAnyoneInterested)
{
@@ -222,8 +227,14 @@
_logger.error("Closing channel due to: " + e.getMessage());
writeFrame(e.getCloseFrame(frame.channel));
}
+ catch (AMQConnectionException e)
+ {
+ _logger.error("Closing connection due to: " + e.getMessage());
+ writeFrame(e.getCloseFrame(frame.channel));
+ }
catch (AMQException e)
{
+ _stateManager.error(e);
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Jan 10 00:52:41 2007
@@ -410,6 +410,17 @@
}
}
+ public boolean isUnused()
+ {
+ return _subscribers.isEmpty();
+ }
+
+ public boolean isEmpty()
+ {
+ return !_deliveryMgr.hasQueuedMessages();
+ }
+
+
public int delete(boolean checkUnused, boolean checkEmpty) throws
AMQException
{
if (checkUnused && !_subscribers.isEmpty())
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Wed Jan 10 00:52:41 2007
@@ -63,7 +63,7 @@
public int hashCode()
{
- return exchange.hashCode() + routingKey.hashCode();
+ return (exchange == null ? 0 : exchange.hashCode()) + (routingKey
== null ? 0 : routingKey.hashCode());
}
public boolean equals(Object o)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed Jan 10 00:52:41 2007
@@ -301,8 +301,10 @@
if (_noLocal)
{
// We don't want local messages so check to see if message is one
we sent
- if
(protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()).equals(
-
msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString())))
+ Object localInstance =
protocolSession.getClientProperties().getObject(ClientProperties.instance.toString());
+ Object msgInstance =
msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString());
+
+ if (localInstance == msgInstance || ((localInstance != null) &&
localInstance.equals(msgInstance)))
{
if (_logger.isTraceEnabled())
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Wed Jan 10 00:52:41 2007
@@ -55,7 +55,7 @@
Queue queue = new AMQQueue(new AMQShortString("someQ"), new
AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession)
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ ((AMQSession)
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest",
"producer1", "/test");
Session producerSession = con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
@@ -112,7 +112,7 @@
Queue queue = new AMQQueue(new AMQShortString("someQ"), new
AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession)
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ ((AMQSession)
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest",
"producer1", "/test");
Session producerSession = con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Wed Jan 10 00:52:41 2007
@@ -94,7 +94,7 @@
public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506,
"resource error", true);
- public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not
allowed", true);
+ public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not
allowed", true);
public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540,
"not implemented", true);
Modified: incubator/qpid/trunk/qpid/python/java_failing.txt
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/java_failing.txt?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/python/java_failing.txt (original)
+++ incubator/qpid/trunk/qpid/python/java_failing.txt Wed Jan 10 00:52:41 2007
@@ -1,29 +1,15 @@
-tests.basic.BasicTests.test_cancel
tests.basic.BasicTests.test_consume_exclusive
tests.basic.BasicTests.test_consume_no_local
-tests.basic.BasicTests.test_consume_queue_errors
-tests.basic.BasicTests.test_consume_unique_consumers
tests.basic.BasicTests.test_get
tests.basic.BasicTests.test_qos_prefetch_size
tests.basic.BasicTests.test_recover_requeue
-tests.exchange.ExchangeTests
tests.exchange.DefaultExchangeRuleTests.testDefaultExchange
tests.exchange.HeadersExchangeTests.testMatchAll
tests.exchange.HeadersExchangeTests.testMatchAny
-tests.exchange.RecommendedTypesRuleTests.testDirect
-tests.exchange.RecommendedTypesRuleTests.testFanout
-tests.exchange.RecommendedTypesRuleTests.testHeaders
tests.exchange.RecommendedTypesRuleTests.testTopic
-tests.exchange.RequiredInstancesRuleTests.testAmqDirect
-tests.exchange.RequiredInstancesRuleTests.testAmqFanOut
tests.exchange.RequiredInstancesRuleTests.testAmqMatch
tests.exchange.RequiredInstancesRuleTests.testAmqTopic
tests.queue.QueueTests.test_declare_exclusive
-tests.queue.QueueTests.test_declare_passive
-tests.queue.QueueTests.test_delete_ifempty
-tests.queue.QueueTests.test_delete_ifunused
-tests.queue.QueueTests.test_delete_simple
tests.queue.QueueTests.test_purge
-tests.queue.QueueTests.test_bind
tests.testlib.TestBaseTest.testMessageProperties
tests.broker.BrokerTests.test_invalid_channel
Modified: incubator/qpid/trunk/qpid/python/tests/exchange.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/exchange.py?view=diff&rev=494769&r1=494768&r2=494769
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/exchange.py Wed Jan 10 00:52:41 2007
@@ -316,9 +316,9 @@
self.channel.exchange_declare(exchange="test_different_declared_type_exchange",
type="direct")
try:
self.channel.exchange_declare(exchange="test_different_declared_type_exchange",
type="topic")
- self.fail("Expected 507 for redeclaration of exchange with
different type.")
+ self.fail("Expected 530 for redeclaration of exchange with
different type.")
except Closed, e:
- self.assertConnectionException(507, e.args[0])
+ self.assertConnectionException(530, e.args[0])
#cleanup
other = self.connect()
c2 = other.channel(1)