Author: rhs
Date: Tue Apr 22 10:15:34 2008
New Revision: 650581
URL: http://svn.apache.org/viewvc?rev=650581&view=rev
Log:
QPID-832: moved 0-8 specific code into 0-8 subclass of session
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=650581&r1=650580&r2=650581&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue Apr 22 10:15:34 2008
@@ -80,7 +80,6 @@
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
@@ -539,20 +538,7 @@
*
* @todo Be aware of possible changes to parameter order as versions
change.
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
- {
-
- BasicAckBody body =
getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
-
- final AMQFrame ackFrame = body.generateFrame(_channelId);
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on
channel " + _channelId);
- }
-
- getProtocolHandler().writeFrame(ackFrame);
- }
+ public abstract void acknowledgeMessage(long deliveryTag, boolean
multiple);
public MethodRegistry getMethodRegistry()
{
@@ -1043,12 +1029,7 @@
{
public Object execute() throws AMQException, FailoverException
{
- QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
-
- AMQFrame queueDeclare = body.generateFrame(_channelId);
-
- getProtocolHandler().syncWrite(queueDeclare,
QueueDeclareOkBody.class);
-
+ sendCreateQueue(name, autoDelete, durable, exclusive);
return null;
}
}, _connection).execute();
@@ -1425,33 +1406,7 @@
_dispatcher.rollback();
}
- if (isStrictAMQP())
- {
- // We can't use the BasicRecoverBody-OK method as it isn't
part of the spec.
-
- BasicRecoverBody body =
getMethodRegistry().createBasicRecoverBody(false);
-
_connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
- _logger.warn("Session Recover cannot be guaranteed with
STRICT_AMQP. Messages may arrive out of order.");
- }
- else
- {
- // in Qpid the 0-8 spec was hacked to have a recover-ok
method... this is bad
- // in 0-9 we used the cleaner addition of a new sync recover
method with its own ok
-
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- BasicRecoverBody body =
getMethodRegistry().createBasicRecoverBody(false);
-
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId),
BasicRecoverOkBody.class);
- }
- else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
- {
- BasicRecoverSyncBody body =
((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
-
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId),
BasicRecoverSyncOkBody.class);
- }
- else
- {
- throw new RuntimeException("Unsupported version of the
AMQP Protocol: " + getProtocolVersion());
- }
- }
+ sendRecover();
if (!isSuspended)
{
@@ -1468,10 +1423,7 @@
}
}
- private ProtocolVersion getProtocolVersion()
- {
- return getProtocolHandler().getProtocolVersion();
- }
+ abstract void sendRecover() throws AMQException, FailoverException;
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
@@ -1495,21 +1447,7 @@
}
- public void rejectMessage(long deliveryTag, boolean requeue)
- {
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode ==
SESSION_TRANSACTED))
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting delivery tag:" + deliveryTag +
":SessionHC:" + this.hashCode());
- }
-
- BasicRejectBody body =
getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
- AMQFrame frame = body.generateFrame(_channelId);
-
- _connection.getProtocolHandler().writeFrame(frame);
- }
- }
+ public abstract void rejectMessage(long deliveryTag, boolean requeue);
/**
* Commits all messages done in this transaction and releases any locks
currently held.
@@ -1541,9 +1479,7 @@
releaseForRollback();
- TxRollbackBody body =
getMethodRegistry().createTxRollbackBody();
- AMQFrame frame = body.generateFrame(getChannelId());
- getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
+ sendRollback();
markClean();
@@ -2127,48 +2063,13 @@
// need to generate a consumer tag on the client so we can exploit the
nowait flag
AMQShortString tag = new AMQShortString(Integer.toString(tagId));
- FieldTable arguments = FieldTableFactory.newFieldTable();
- if ((messageSelector != null) && !messageSelector.equals(""))
- {
- arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(),
messageSelector);
- }
-
- if (consumer.isAutoClose())
- {
- arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
- }
-
- if (consumer.isNoConsume())
- {
- arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
- }
-
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start
listening
_consumers.put(tagId, consumer);
try
{
- BasicConsumeBody body =
getMethodRegistry().createBasicConsumeBody(getTicket(),
-
queueName,
-
tag,
-
consumer.isNoLocal(),
-
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
-
consumer.isExclusive(),
-
nowait,
-
arguments);
-
-
- AMQFrame jmsConsume = body.generateFrame(_channelId);
-
- if (nowait)
- {
- protocolHandler.writeFrame(jmsConsume);
- }
- else
- {
- protocolHandler.syncWrite(jmsConsume,
BasicConsumeOkBody.class);
- }
+ sendConsume(consumer, queueName, protocolHandler, nowait,
messageSelector, tag);
}
catch (AMQException e)
{
@@ -2229,57 +2130,18 @@
public long getQueueDepth(final AMQDestination amqd)
throws AMQException
{
-
- class QueueDeclareOkHandler extends SpecificMethodFrameListener
- {
-
- private long _messageCount;
- private long _consumerCount;
-
- public QueueDeclareOkHandler()
- {
- super(getChannelId(), QueueDeclareOkBody.class);
- }
-
- public boolean processMethod(int channelId, AMQMethodBody frame)
//throws AMQException
- {
- boolean matches = super.processMethod(channelId, frame);
- if (matches)
- {
- QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
- _messageCount = declareOk.getMessageCount();
- _consumerCount = declareOk.getConsumerCount();
- }
- return matches;
- }
-
- }
-
return new FailoverNoopSupport<Long, AMQException>(
new FailoverProtectedOperation<Long, AMQException>()
{
public Long execute() throws AMQException,
FailoverException
{
-
- AMQFrame queueDeclare =
-
getMethodRegistry().createQueueDeclareBody(getTicket(),
-
amqd.getAMQQueueName(),
-
true,
-
amqd.isDurable(),
-
amqd.isExclusive(),
-
amqd.isAutoDelete(),
-
false,
-
null).generateFrame(_channelId);
- QueueDeclareOkHandler okHandler = new
QueueDeclareOkHandler();
-
getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
-
- return okHandler._messageCount;
+ return requestQueueDepth(amqd);
}
}, _connection).execute();
}
-
+ abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException,
FailoverException;
/**
* Declares the named exchange and type of exchange.
@@ -2302,11 +2164,7 @@
{
public Object execute() throws AMQException, FailoverException
{
- ExchangeDeclareBody body =
getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null);
- AMQFrame exchangeDeclare = body.generateFrame(_channelId);
-
- protocolHandler.syncWrite(exchangeDeclare,
ExchangeDeclareOkBody.class);
-
+ sendExchangeDeclare(name, type, protocolHandler, nowait);
return null;
}
}, _connection).execute();
@@ -2353,11 +2211,7 @@
amqd.setQueueName(protocolHandler.generateQueueName());
}
- QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
-
- AMQFrame queueDeclare = body.generateFrame(_channelId);
-
- protocolHandler.syncWrite(queueDeclare,
QueueDeclareOkBody.class);
+ sendQueueDeclare(amqd, protocolHandler);
return amqd.getAMQQueueName();
}
@@ -2385,15 +2239,7 @@
{
public Object execute() throws AMQException, FailoverException
{
- QueueDeleteBody body =
getMethodRegistry().createQueueDeleteBody(getTicket(),
- queueName,
- false,
- false,
- true);
- AMQFrame queueDeleteFrame =
body.generateFrame(_channelId);
-
- getProtocolHandler().syncWrite(queueDeleteFrame,
QueueDeleteOkBody.class);
-
+ sendQueueDelete(queueName);
return null;
}
}, _connection).execute();
@@ -2682,12 +2528,7 @@
}
_suspended = suspend;
-
- ChannelFlowBody body =
getMethodRegistry().createChannelFlowBody(!suspend);
-
- AMQFrame channelFlowFrame = body.generateFrame(_channelId);
-
- _connection.getProtocolHandler().syncWrite(channelFlowFrame,
ChannelFlowOkBody.class);
+ sendSuspendChannel(suspend);
}
catch (FailoverException e)
{
@@ -2696,11 +2537,13 @@
}
}
+ public abstract void sendSuspendChannel(boolean suspend) throws
AMQException, FailoverException;
+
Object getMessageDeliveryLock()
{
return _messageDeliveryLock;
}
-
+
/**
* Indicates whether this session consumers pre-fetche messages
*
@@ -2711,8 +2554,6 @@
return getAMQConnection().getMaxPrefetch() > 0;
}
-
- public abstract void sendSuspendChannel(boolean suspend) throws
AMQException, FailoverException;
/** Signifies that the session has pending sends to commit. */
public void markDirty()
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=650581&r1=650580&r2=650581&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Tue Apr 22 10:15:34 2008
@@ -746,4 +746,10 @@
return subscriber;
}
+
+ Long requestQueueDepth(AMQDestination amqd)
+ {
+ return
getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
+ }
+
}
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=650581&r1=650580&r2=650581&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Tue Apr 22 10:15:34 2008
@@ -30,6 +30,7 @@
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
@@ -80,10 +81,16 @@
defaultPrefetchLow);
}
+ private ProtocolVersion getProtocolVersion()
+ {
+ return getProtocolHandler().getProtocolVersion();
+ }
+
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- final AMQFrame ackFrame =
-
getProtocolHandler().getMethodRegistry().createBasicAckBody(deliveryTag,
multiple).generateFrame(_channelId);
+ BasicAckBody body =
getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
+
+ final AMQFrame ackFrame = body.generateFrame(_channelId);
if (_logger.isDebugEnabled())
{
@@ -121,7 +128,8 @@
public void sendCreateQueue(AMQShortString name, final boolean autoDelete,
final boolean durable, final boolean exclusive) throws AMQException,
FailoverException
{
- AMQFrame queueDeclare =
getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null).generateFrame(_channelId);
+ QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+ AMQFrame queueDeclare = body.generateFrame(_channelId);
getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
@@ -133,7 +141,7 @@
{
// We can't use the BasicRecoverBody-OK method as it isn't part of
the spec.
- BasicRecoverBody body =
getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
+ BasicRecoverBody body =
getMethodRegistry().createBasicRecoverBody(false);
_connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
_logger.warn("Session Recover cannot be guaranteed with
STRICT_AMQP. Messages may arrive out of order.");
}
@@ -143,17 +151,17 @@
// in 0-9 we used the cleaner addition of a new sync recover
method with its own ok
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- BasicRecoverBody body =
getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
+ BasicRecoverBody body =
getMethodRegistry().createBasicRecoverBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId),
BasicRecoverOkBody.class);
}
- else
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v0_9))
+ else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
{
- BasicRecoverSyncBody body =
((MethodRegistry_0_9)getProtocolHandler().getMethodRegistry()).createBasicRecoverSyncBody(false);
+ BasicRecoverSyncBody body =
((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId),
BasicRecoverSyncOkBody.class);
}
else
{
- throw new RuntimeException("Unsupported version of the AMQP
Protocol: " + getProtocolHandler().getProtocolVersion());
+ throw new RuntimeException("Unsupported version of the AMQP
Protocol: " + getProtocolVersion());
}
}
}
@@ -183,12 +191,13 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting delivery tag:" + deliveryTag);
+ _logger.debug("Rejecting delivery tag:" + deliveryTag +
":SessionHC:" + this.hashCode());
}
- AMQFrame basicRejectBody =
getProtocolHandler().getMethodRegistry().createBasicRejectBody(deliveryTag,
requeue).generateFrame(_channelId);
+ BasicRejectBody body =
getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
+ AMQFrame frame = body.generateFrame(_channelId);
- _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ _connection.getProtocolHandler().writeFrame(frame);
}
}
@@ -229,7 +238,6 @@
public void sendConsume(BasicMessageConsumer consumer, AMQShortString
queueName, AMQProtocolHandler protocolHandler, boolean nowait,
String messageSelector, AMQShortString tag) throws AMQException,
FailoverException
{
-
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
{
@@ -246,18 +254,17 @@
arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
- consumer.setConsumerTag(tag);
- // we must register the consumer in the map before we actually start
listening
- _consumers.put(tag.toIntValue(), consumer);
- // TODO: Be aware of possible changes to parameter order as versions
change.
- AMQFrame jmsConsume =
getProtocolHandler().getMethodRegistry().createBasicConsumeBody(getTicket(),
- queueName,
- tag,
- consumer.isNoLocal(),
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(),
- nowait,
- arguments).generateFrame(_channelId);
+ BasicConsumeBody body =
getMethodRegistry().createBasicConsumeBody(getTicket(),
+
queueName,
+ tag,
+
consumer.isNoLocal(),
+
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+
consumer.isExclusive(),
+
nowait,
+
arguments);
+
+
+ AMQFrame jmsConsume = body.generateFrame(_channelId);
if (nowait)
{
@@ -272,26 +279,28 @@
public void sendExchangeDeclare(final AMQShortString name, final
AMQShortString type, final AMQProtocolHandler protocolHandler,
final boolean nowait) throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare =
getProtocolHandler().getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
- generateFrame(_channelId);
+ ExchangeDeclareBody body =
getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null);
+ AMQFrame exchangeDeclare = body.generateFrame(_channelId);
protocolHandler.syncWrite(exchangeDeclare,
ExchangeDeclareOkBody.class);
}
public void sendQueueDeclare(final AMQDestination amqd, final
AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
{
- AMQFrame queueDeclare =
getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
+ QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+
+ AMQFrame queueDeclare = body.generateFrame(_channelId);
protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
public void sendQueueDelete(final AMQShortString queueName) throws
AMQException, FailoverException
{
- QueueDeleteBody body =
getProtocolHandler().getMethodRegistry().createQueueDeleteBody(getTicket(),
- queueName,
- false,
- false,
- true);
+ QueueDeleteBody body =
getMethodRegistry().createQueueDeleteBody(getTicket(),
+
queueName,
+ false,
+ false,
+ true);
AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
getProtocolHandler().syncWrite(queueDeleteFrame,
QueueDeleteOkBody.class);
@@ -299,9 +308,9 @@
public void sendSuspendChannel(boolean suspend) throws AMQException,
FailoverException
{
-
_connection.getProtocolHandler().syncWrite(_connection.getProtocolHandler().getMethodRegistry().
-
createChannelFlowBody(!suspend).generateFrame(_channelId),
- ChannelFlowOkBody.class);
+ ChannelFlowBody body =
getMethodRegistry().createChannelFlowBody(!suspend);
+ AMQFrame channelFlowFrame = body.generateFrame(_channelId);
+ _connection.getProtocolHandler().syncWrite(channelFlowFrame,
ChannelFlowOkBody.class);
}
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination
destination, final int prefetchHigh,
@@ -326,8 +335,9 @@
public void sendRollback() throws AMQException, FailoverException
{
-
_connection.getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createTxRollbackBody().generateFrame(_channelId),
- TxRollbackOkBody.class);
+ TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
+ AMQFrame frame = body.generateFrame(getChannelId());
+ getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -338,68 +348,109 @@
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
- {
+ {
- checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name,
_connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to
topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- }
- else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable
not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription
already exists for '" + topicName + "' "
- + "for creation durableSubscriber.
Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this
topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
+ checkNotClosed();
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name,
_connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic "
+ topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
- subscriber = new TopicSubscriberAdaptor(dest,
(BasicMessageConsumer) createConsumer(dest));
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not
currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already
exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting
queue deletion regardless.");
+ }
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this
topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer)
createConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
+ class QueueDeclareOkHandler extends SpecificMethodFrameListener
+ {
+
+ private long _messageCount;
+ private long _consumerCount;
- return subscriber;
- }
+ public QueueDeclareOkHandler()
+ {
+ super(getChannelId(), QueueDeclareOkBody.class);
+ }
+
+ public boolean processMethod(int channelId, AMQMethodBody frame)
//throws AMQException
+ {
+ boolean matches = super.processMethod(channelId, frame);
+ if (matches)
+ {
+ QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+ _messageCount = declareOk.getMessageCount();
+ _consumerCount = declareOk.getConsumerCount();
+ }
+ return matches;
+ }
+
+ }
+
+ Long requestQueueDepth(AMQDestination amqd) throws AMQException,
FailoverException
+ {
+ AMQFrame queueDeclare =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ true,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+
null).generateFrame(_channelId);
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare,
okHandler);
+ return okHandler._messageCount;
+ }
}