Author: ritchiem
Date: Tue May 1 00:17:43 2007
New Revision: 533957
URL: http://svn.apache.org/viewvc?view=rev&rev=533957
Log:
Merged revisions
532766-532785,532788-532790,532792-533064,533066-533074,533076,533080-533130,533132-533139,533142-533703,533705-533765
via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r532766 | rgreig | 2007-04-26 15:57:04 +0100 (Thu, 26 Apr 2007) | 1 line
Rationalized the performance tests.
........
r532794 | rgreig | 2007-04-26 17:33:10 +0100 (Thu, 26 Apr 2007) | 1 line
Rationalized the performance tests.
........
r533721 | rgodfrey | 2007-04-30 13:24:41 +0100 (Mon, 30 Apr 2007) | 1 line
QPID-476 : Remove duplicate map of channelId to session
........
r533764 | ritchiem | 2007-04-30 15:37:23 +0100 (Mon, 30 Apr 2007) | 4 lines
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java
client.
Updated to allow the use of durable subscriptions but it will not be as clean
as with the extensions.
Selectors are also now disabled.
........
r533765 | ritchiem | 2007-04-30 15:39:18 +0100 (Mon, 30 Apr 2007) | 1 line
QPID-461 Update to CommitRollbackTest. Ensuring messages received have the
correct redelivered value, regardless of order.
........
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Tue May 1 00:17:43 2007
@@ -96,7 +96,8 @@
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map _sessions = new LinkedHashMap(); // fixme this is map is
replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map<Integer,AMQSession> _sessions = new
LinkedHashMap<Integer,AMQSession>();
+
private String _clientName;
@@ -508,7 +509,7 @@
AMQSession session =
new AMQSession(AMQConnection.this, channelId,
transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
- _protocolHandler.addSessionByChannel(channelId,
session);
+ //_protocolHandler.addSessionByChannel(channelId,
session);
registerSession(channelId, session);
boolean success = false;
@@ -527,7 +528,6 @@
{
if (!success)
{
-
_protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
}
}
@@ -589,7 +589,6 @@
}
catch (AMQException e)
{
- _protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
throw new AMQException("Error reopening channel " + channelId + "
after failover: " + e, e);
}
@@ -1136,7 +1135,7 @@
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
- _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(),
s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
}
@@ -1222,5 +1221,11 @@
public void performConnectionTask(Runnable task)
{
_taskPool.execute(task);
+ }
+
+
+ public AMQSession getSession(int channelId)
+ {
+ return _sessions.get(channelId);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue May 1 00:17:43 2007
@@ -209,6 +209,12 @@
private final boolean _strictAMQP;
+ /** System property to enable strickt AMQP compliance */
+ public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+ /** Strickt AMQP default */
+ public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+ private final boolean _strictAMQPFATAL;
/** System property to enable immediate message prefetching */
public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
@@ -429,23 +435,14 @@
}
}
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry)
- {
- this(con, channelId, transacted, acknowledgeMode,
messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
- }
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int
defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode,
messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int
defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
_strictAMQP =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP,
STRICT_AMQP_DEFAULT));
+ _strictAMQPFATAL =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL,
STRICT_AMQP_FATAL_DEFAULT));
_immediatePrefetch =
Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH,
IMMEDIATE_PREFETCH_DEFAULT));
_connection = con;
@@ -493,15 +490,7 @@
}
}
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode)
- {
- this(con, channelId, transacted, acknowledgeMode,
MessageFactoryRegistry.newDefaultRegistry());
- }
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode, int defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode,
MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
@@ -796,7 +785,7 @@
amqe = new AMQException("Closing session forcibly", e);
}
_connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ closeProducersAndConsumers(amqe);
}
}
@@ -809,6 +798,7 @@
_closed.set(true);
_connection.deregisterSession(_channelId);
markClosedProducersAndConsumers();
+
}
private void markClosedProducersAndConsumers()
@@ -941,7 +931,7 @@
getProtocolMajorVersion(),
getProtocolMinorVersion(),
false)); // requeue
- _logger.warn("Session Recover cannot be guaranteed with
STRICT_AMQP. Messages may arrive out of order.");
+ _logger.warn("Session Recover cannot be guaranteed with
STRICT_AMQP. Messages may arrive out of order.");
}
else
{
@@ -1229,13 +1219,30 @@
final int prefetchLow,
final boolean noLocal,
final boolean exclusive,
- final String selector,
+ String selector,
final FieldTable rawSelector,
final boolean noConsume,
final boolean autoClose)
throws JMSException
{
checkTemporaryDestination(destination);
+ final String messageSelector;
+
+ if (_strictAMQP && !(selector == null || selector.equals("")))
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("Selectors not
currently supported by AMQP.");
+ }
+ else
+ {
+ messageSelector = null;
+ }
+ }
+ else
+ {
+ messageSelector = selector;
+ }
return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
{
@@ -1246,6 +1253,7 @@
AMQDestination amqd = (AMQDestination) destination;
final AMQProtocolHandler protocolHandler =
getProtocolHandler();
+ // TODO: Define selectors in AMQP
// TODO: construct the rawSelector from the selector string if
rawSelector == null
final FieldTable ft = FieldTableFactory.newFieldTable();
//if (rawSelector != null)
@@ -1254,7 +1262,8 @@
{
ft.addAll(rawSelector);
}
- BasicMessageConsumer consumer = new
BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
+
+ BasicMessageConsumer consumer = new
BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
_acknowledgeMode, noConsume, autoClose);
@@ -1647,6 +1656,8 @@
public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
{
+
+
checkNotClosed();
AMQTopic origTopic = checkValidTopic(topic);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name,
_connection);
@@ -1674,13 +1685,31 @@
{
topicName = new AMQShortString(topic.getTopicName());
}
- // if the queue is bound to the exchange but NOT for this topic,
then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
- !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(),
topicName))
+
+ if (_strictAMQP)
{
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not
currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already
exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting
queue deletion regardless.");
+ }
+
deleteQueue(dest.getAMQQueueName());
}
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this
topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
}
subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer)
createConsumer(dest));
@@ -1778,13 +1807,31 @@
}
else
{
- if (isQueueBound(getDefaultTopicExchangeName(),
AMQTopic.getDurableTopicQueueName(name, _connection)))
+ if (_strictAMQP)
{
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not
currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already
exists for '" + name + "' for unsubscribe."
+ + " Requesting queue deletion regardless.");
+ }
+
deleteQueue(AMQTopic.getDurableTopicQueueName(name,
_connection));
}
else
{
- throw new InvalidDestinationException("Unknown subscription
exchange:" + name);
+
+ if (isQueueBound(getDefaultTopicExchangeName(),
AMQTopic.getDurableTopicQueueName(name, _connection)))
+ {
+ deleteQueue(AMQTopic.getDurableTopicQueueName(name,
_connection));
+ }
+ else
+ {
+ throw new InvalidDestinationException("Unknown
subscription exchange:" + name);
+ }
}
}
}
@@ -1796,10 +1843,6 @@
boolean isQueueBound(AMQShortString exchangeName, AMQShortString
queueName, AMQShortString routingKey) throws JMSException
{
- if (isStrictAMQP())
- {
- throw new UnsupportedOperationException();
- }
// TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
Tue May 1 00:17:43 2007
@@ -9,119 +9,141 @@
import javax.jms.QueueSender;
import javax.jms.InvalidDestinationException;
-public class QueueSenderAdapter implements QueueSender {
+public class QueueSenderAdapter implements QueueSender
+{
- private BasicMessageProducer _delegate;
- private Queue _queue;
- private boolean closed = false;
-
- public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue
queue){
- _delegate = msgProducer;
- _queue = queue;
- }
-
- public Queue getQueue() throws JMSException {
- checkPreConditions();
- return _queue;
- }
-
- public void send(Message msg) throws JMSException {
- checkPreConditions();
- _delegate.send(msg);
- }
-
- public void send(Queue queue, Message msg) throws JMSException {
- checkPreConditions(queue);
- _delegate.send(queue, msg);
- }
-
- public void publish(Message msg, int deliveryMode, int priority, long
timeToLive)
- throws JMSException {
- checkPreConditions();
- _delegate.send(msg, deliveryMode,priority,timeToLive);
- }
-
- public void send(Queue queue,Message msg, int deliveryMode, int
priority, long timeToLive)
- throws JMSException {
- checkPreConditions(queue);
- _delegate.send(queue,msg, deliveryMode,priority,timeToLive);
- }
-
- public void close() throws JMSException {
- _delegate.close();
- closed = true;
- }
-
- public int getDeliveryMode() throws JMSException {
- checkPreConditions();
- return _delegate.getDeliveryMode();
- }
-
- public Destination getDestination() throws JMSException {
- checkPreConditions();
- return _delegate.getDestination();
- }
-
- public boolean getDisableMessageID() throws JMSException {
- checkPreConditions();
- return _delegate.getDisableMessageID();
- }
-
- public boolean getDisableMessageTimestamp() throws JMSException {
- checkPreConditions();
- return _delegate.getDisableMessageTimestamp();
- }
-
- public int getPriority() throws JMSException {
- checkPreConditions();
- return _delegate.getPriority();
- }
-
- public long getTimeToLive() throws JMSException {
- checkPreConditions();
- return _delegate.getTimeToLive();
- }
-
- public void send(Destination dest, Message msg) throws JMSException {
- checkPreConditions((Queue)dest);
- _delegate.send(dest,msg);
- }
-
- public void send(Message msg, int deliveryMode, int priority, long
timeToLive)
- throws JMSException {
- checkPreConditions();
- _delegate.send(msg, deliveryMode,priority,timeToLive);
- }
-
- public void send(Destination dest, Message msg, int deliveryMode, int
priority, long timeToLive) throws JMSException {
- checkPreConditions((Queue)dest);
- _delegate.send(dest,msg, deliveryMode,priority,timeToLive);
- }
-
- public void setDeliveryMode(int deliveryMode) throws JMSException {
- checkPreConditions();
- _delegate.setDeliveryMode(deliveryMode);
- }
-
- public void setDisableMessageID(boolean disableMessageID) throws
JMSException {
- checkPreConditions();
- _delegate.setDisableMessageID(disableMessageID);
- }
-
- public void setDisableMessageTimestamp(boolean disableMessageTimestamp)
throws JMSException {
- checkPreConditions();
- _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
- }
-
- public void setPriority(int priority) throws JMSException {
- checkPreConditions();
- _delegate.setPriority(priority);
- }
-
- public void setTimeToLive(long timeToLive) throws JMSException {
- checkPreConditions();
- _delegate.setTimeToLive(timeToLive);
- }
+ private BasicMessageProducer _delegate;
+ private Queue _queue;
+ private boolean closed = false;
+
+ public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue)
+ {
+ _delegate = msgProducer;
+ _queue = queue;
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ checkPreConditions();
+ return _queue;
+ }
+
+ public void send(Message msg) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.send(msg);
+ }
+
+ public void send(Queue queue, Message msg) throws JMSException
+ {
+ checkPreConditions(queue);
+ _delegate.send(queue, msg);
+ }
+
+ public void publish(Message msg, int deliveryMode, int priority, long
timeToLive)
+ throws JMSException
+ {
+ checkPreConditions();
+ _delegate.send(msg, deliveryMode, priority, timeToLive);
+ }
+
+ public void send(Queue queue, Message msg, int deliveryMode, int priority,
long timeToLive)
+ throws JMSException
+ {
+ checkPreConditions(queue);
+ _delegate.send(queue, msg, deliveryMode, priority, timeToLive);
+ }
+
+ public void close() throws JMSException
+ {
+ _delegate.close();
+ closed = true;
+ }
+
+ public int getDeliveryMode() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getDeliveryMode();
+ }
+
+ public Destination getDestination() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getDestination();
+ }
+
+ public boolean getDisableMessageID() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getDisableMessageID();
+ }
+
+ public boolean getDisableMessageTimestamp() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getDisableMessageTimestamp();
+ }
+
+ public int getPriority() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getPriority();
+ }
+
+ public long getTimeToLive() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getTimeToLive();
+ }
+
+ public void send(Destination dest, Message msg) throws JMSException
+ {
+ checkPreConditions((Queue) dest);
+ _delegate.send(dest, msg);
+ }
+
+ public void send(Message msg, int deliveryMode, int priority, long
timeToLive)
+ throws JMSException
+ {
+ checkPreConditions();
+ _delegate.send(msg, deliveryMode, priority, timeToLive);
+ }
+
+ public void send(Destination dest, Message msg, int deliveryMode, int
priority, long timeToLive) throws JMSException
+ {
+ checkPreConditions((Queue) dest);
+ _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
+ }
+
+ public void setDeliveryMode(int deliveryMode) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setDeliveryMode(deliveryMode);
+ }
+
+ public void setDisableMessageID(boolean disableMessageID) throws
JMSException
+ {
+ checkPreConditions();
+ _delegate.setDisableMessageID(disableMessageID);
+ }
+
+ public void setDisableMessageTimestamp(boolean disableMessageTimestamp)
throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+ }
+
+ public void setPriority(int priority) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setPriority(priority);
+ }
+
+ public void setTimeToLive(long timeToLive) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setTimeToLive(timeToLive);
+ }
private void checkPreConditions() throws JMSException
{
@@ -130,31 +152,41 @@
private void checkPreConditions(Queue queue) throws JMSException
{
- if (closed){
- throw new javax.jms.IllegalStateException("Publisher is
closed");
- }
-
- AMQSession session = ((BasicMessageProducer)
_delegate).getSession();
-
- if(session == null || session.isClosed()){
- throw new javax.jms.IllegalStateException("Invalid
Session");
- }
+ if (closed)
+ {
+ throw new javax.jms.IllegalStateException("Publisher is closed");
+ }
+
+ AMQSession session = ((BasicMessageProducer) _delegate).getSession();
+
+ if (session == null || session.isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
- if(!(queue instanceof AMQDestination))
+ if (!(queue instanceof AMQDestination))
{
throw new InvalidDestinationException("Queue: " + queue + " is not
a valid Qpid queue");
}
AMQDestination destination = (AMQDestination) queue;
- if(!destination.isValidated() && checkQueueBeforePublish())
+ if (!destination.isValidated() && checkQueueBeforePublish())
{
- if (_delegate.isBound(destination))
+ if (_delegate.getSession().isStrictAMQP())
{
+ _delegate._logger.warn("AMQP does not support destination
validation before publish, ");
destination.setValidated(true);
}
else
{
- throw new InvalidDestinationException("Queue: " + queue + " is
not a valid destination (no bindings on server");
+ if (_delegate.isBound(destination))
+ {
+ destination.setValidated(true);
+ }
+ else
+ {
+ throw new InvalidDestinationException("Queue: " + queue +
" is not a valid destination (no bindings on server");
+ }
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Tue May 1 00:17:43 2007
@@ -490,27 +490,7 @@
new
SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
}
- /**
- * Convenience method to register an AMQSession with the protocol handler.
Registering a session with the protocol
- * handler will ensure that messages are delivered to the consumer(s) on
that session.
- *
- * @param channelId the channel id of the session
- * @param session the session instance.
- */
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- _protocolSession.addSessionByChannel(channelId, session);
- }
- /**
- * Convenience method to deregister an AMQSession with the protocol
handler.
- *
- * @param channelId then channel id of the session
- */
- public void removeSessionByChannel(int channelId)
- {
- _protocolSession.removeSessionByChannel(channelId);
- }
public void closeSession(AMQSession session) throws AMQException
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Tue May 1 00:17:43 2007
@@ -85,7 +85,7 @@
protected final AMQProtocolHandler _protocolHandler;
/** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+ protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new
ConcurrentHashMap<Integer, AMQSession>();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
@@ -104,26 +104,13 @@
private VersionSpecificRegistry _registry =
MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
- /**
- * No-arg constructor for use by test subclass - has to initialise final
vars NOT intended for use other then for
- * test
- */
- public AMQProtocolSession()
- {
- _protocolHandler = null;
- _minaProtocolSession = null;
- _stateManager = new AMQStateManager(this);
- }
+ private final AMQConnection _connection;
+
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession
protocolSession, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- // properties of the connection are made available to the event
handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _stateManager = new AMQStateManager(this);
+ this(protocolHandler,protocolSession,connection, new
AMQStateManager());
+
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession
protocolSession, AMQConnection connection,
@@ -138,6 +125,7 @@
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
+ _connection = connection;
}
@@ -305,11 +293,16 @@
*/
private void deliverMessageToAMQSession(int channelId, UnprocessedMessage
msg)
{
- AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+ AMQSession session = getSession(channelId);
session.messageReceived(msg);
_channelId2UnprocessedMsgMap.remove(channelId);
}
+ protected AMQSession getSession(int channelId)
+ {
+ return _connection.getSession(channelId);
+ }
+
/**
* Convenience method that writes a frame to the protocol session.
Equivalent to calling
* getProtocolSession().write().
@@ -335,32 +328,6 @@
}
}
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to register a session
with a channel id <= zero");
- }
-
- if (session == null)
- {
- throw new IllegalArgumentException("Attempt to register a null
session");
- }
-
- _logger.debug("Add session with channel id " + channelId);
- _channelId2SessionMap.put(channelId, session);
- }
-
- public void removeSessionByChannel(int channelId)
- {
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to deregister a
session with a channel id <= zero");
- }
-
- _logger.debug("Removing session with channelId " + channelId);
- _channelId2SessionMap.remove(channelId);
- }
/**
* Starts the process of closing a session
@@ -393,11 +360,11 @@
*/
public boolean channelClosed(int channelId, AMQConstant code, String text)
throws AMQException
{
- final Integer chId = channelId;
+
// if this is not a response to an earlier request to close the channel
- if (_closingChannels.remove(chId) == null)
+ if (_closingChannels.remove(channelId) == null)
{
- final AMQSession session = (AMQSession)
_channelId2SessionMap.get(chId);
+ final AMQSession session = getSession(channelId);
try
{
session.closed(new AMQException(code, text));
@@ -469,8 +436,7 @@
public void confirmConsumerCancelled(int channelId, AMQShortString
consumerTag)
{
- final Integer chId = channelId;
- final AMQSession session = (AMQSession)
_channelId2SessionMap.get(chId);
+ final AMQSession session = getSession(channelId);
session.confirmConsumerCancelled(consumerTag);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Tue May 1 00:17:43 2007
@@ -32,9 +32,6 @@
{
private static class AMQProtSession extends AMQProtocolSession
{
- public AMQProtSession()
- {
- }
public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession
protocolSession, AMQConnection connection)
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Tue May 1 00:17:43 2007
@@ -390,7 +390,6 @@
assertEquals("1", ((TextMessage) result).getText());
assertTrue("Messasge is marked as redelivered" + result,
!result.getJMSRedelivered());
-
_logger.info("Closing Consumer");
_consumer.close();
@@ -398,30 +397,31 @@
_consumer = _session.createConsumer(_jmsQueue);
_logger.info("receiving result");
+
+ // NOTE: Both msg 1 & 2 will be marked as redelivered as they have
both will have been rejected.
+ // Only the occasion where it is not rejected will it mean it hasn't
arrived at the client yet.
result = _consumer.receive(1000);
assertNotNull("test message was consumed and rolled back, but is
gone", result);
+
+ // The first message back will be either 1 or 2 being redelivered
if (result.getJMSRedelivered())
{
- assertEquals("1", ((TextMessage) result).getText());
-
- result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is
gone", result);
- assertEquals("2", ((TextMessage) result).getText());
assertTrue("Messasge is not marked as redelivered" + result,
result.getJMSRedelivered());
}
- else
+ else // or it will be msg 2 arriving the first time due to latency.
{
- assertEquals("2", ((TextMessage) result).getText());
- assertTrue("Messasge is marked as redelivered" + result,
!result.getJMSRedelivered());
+ _logger.info("Message 2 wasn't prefetched so wasn't rejected");
+ assertEquals("2", ((TextMessage) result).getText());
+ }
- result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is
gone", result);
- assertEquals("1", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered" + result,
result.getJMSRedelivered());
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is
gone", result);
+ assertTrue("Messasge is not marked as redelivered" + result,
result.getJMSRedelivered());
- }
result = _consumer.receive(1000);
assertNull("test message should be null:" + result, result);
+
+ _session.commit();
}