Author: ritchiem
Date: Tue May  1 00:17:43 2007
New Revision: 533957

URL: http://svn.apache.org/viewvc?view=rev&rev=533957
Log:
Merged revisions 
532766-532785,532788-532790,532792-533064,533066-533074,533076,533080-533130,533132-533139,533142-533703,533705-533765
 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r532766 | rgreig | 2007-04-26 15:57:04 +0100 (Thu, 26 Apr 2007) | 1 line
  
  Rationlized the performance tests.
........
  r532794 | rgreig | 2007-04-26 17:33:10 +0100 (Thu, 26 Apr 2007) | 1 line
  
  Rationalized the performance tests.
........
  r533721 | rgodfrey | 2007-04-30 13:24:41 +0100 (Mon, 30 Apr 2007) | 1 line
  
  QPID-476 : Remove duplicate map of channelId to session
........
  r533764 | ritchiem | 2007-04-30 15:37:23 +0100 (Mon, 30 Apr 2007) | 4 lines
  
  QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java 
client. 
  
  Updated to allow the use of durable subscriptions but it will not be as clean 
as with the extensions.
  Selectors are also now disabled.
........
  r533765 | ritchiem | 2007-04-30 15:39:18 +0100 (Mon, 30 Apr 2007) | 1 line
  
  QPID-461 Update to CommitRollbackTest. Ensuring messages received have the 
correct redelivered value, regardless of order.
........

Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Tue May  1 00:17:43 2007
@@ -96,7 +96,8 @@
     private AMQProtocolHandler _protocolHandler;
 
     /** Maps from session id (Integer) to AMQSession instance */
-    private final Map _sessions = new LinkedHashMap(); // fixme this is map is 
replicated in amqprotocolsession as _channelId2SessionMap
+    private final Map<Integer,AMQSession> _sessions = new 
LinkedHashMap<Integer,AMQSession>();
+
 
     private String _clientName;
 
@@ -508,7 +509,7 @@
                             AMQSession session =
                                 new AMQSession(AMQConnection.this, channelId, 
transacted, acknowledgeMode, prefetchHigh,
                                     prefetchLow);
-                            _protocolHandler.addSessionByChannel(channelId, 
session);
+                            //_protocolHandler.addSessionByChannel(channelId, 
session);
                             registerSession(channelId, session);
 
                             boolean success = false;
@@ -527,7 +528,6 @@
                             {
                                 if (!success)
                                 {
-                                    
_protocolHandler.removeSessionByChannel(channelId);
                                     deregisterSession(channelId);
                                 }
                             }
@@ -589,7 +589,6 @@
         }
         catch (AMQException e)
         {
-            _protocolHandler.removeSessionByChannel(channelId);
             deregisterSession(channelId);
             throw new AMQException("Error reopening channel " + channelId + " 
after failover: " + e, e);
         }
@@ -1136,7 +1135,7 @@
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {
             AMQSession s = (AMQSession) it.next();
-            _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+            //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
             reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), 
s.getDefaultPrefetchLow(), s.getTransacted());
             s.resubscribe();
         }
@@ -1222,5 +1221,11 @@
     public void performConnectionTask(Runnable task)
     {
         _taskPool.execute(task);
+    }
+
+
+    public AMQSession getSession(int channelId)
+    {
+        return _sessions.get(channelId);
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue May  1 00:17:43 2007
@@ -209,6 +209,12 @@
 
     private final boolean _strictAMQP;
 
+    /** System property to enable strickt AMQP compliance */
+    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+    /** Strickt AMQP default */
+    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+    private final boolean _strictAMQPFATAL;
 
     /** System property to enable immediate message prefetching */
     public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
@@ -429,23 +435,14 @@
         }
     }
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
-               MessageFactoryRegistry messageFactoryRegistry)
-    {
-        this(con, channelId, transacted, acknowledgeMode, 
messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
-    }
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
-               MessageFactoryRegistry messageFactoryRegistry, int 
defaultPrefetch)
-    {
-        this(con, channelId, transacted, acknowledgeMode, 
messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
-    }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry, int 
defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
 
         _strictAMQP = 
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, 
STRICT_AMQP_DEFAULT));
+        _strictAMQPFATAL = 
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, 
STRICT_AMQP_FATAL_DEFAULT));
         _immediatePrefetch = 
Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, 
IMMEDIATE_PREFETCH_DEFAULT));
 
         _connection = con;
@@ -493,15 +490,7 @@
         }
     }
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode)
-    {
-        this(con, channelId, transacted, acknowledgeMode, 
MessageFactoryRegistry.newDefaultRegistry());
-    }
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode, int defaultPrefetch)
-    {
-        this(con, channelId, transacted, acknowledgeMode, 
MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
-    }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
     {
@@ -796,7 +785,7 @@
                 amqe = new AMQException("Closing session forcibly", e);
             }
             _connection.deregisterSession(_channelId);
-            closeProducersAndConsumers(amqe);
+            closeProducersAndConsumers(amqe);            
         }
     }
 
@@ -809,6 +798,7 @@
         _closed.set(true);
         _connection.deregisterSession(_channelId);
         markClosedProducersAndConsumers();
+
     }
 
     private void markClosedProducersAndConsumers()
@@ -941,7 +931,7 @@
                                                                                
             getProtocolMajorVersion(),
                                                                                
             getProtocolMinorVersion(),
                                                                                
             false));    // requeue
-                _logger.warn("Session Recover cannot be guaranteed with 
STRICT_AMQP. Messages may arrive out of order.");                
+                _logger.warn("Session Recover cannot be guaranteed with 
STRICT_AMQP. Messages may arrive out of order.");
             }
             else
             {
@@ -1229,13 +1219,30 @@
                                                  final int prefetchLow,
                                                  final boolean noLocal,
                                                  final boolean exclusive,
-                                                 final String selector,
+                                                 String selector,
                                                  final FieldTable rawSelector,
                                                  final boolean noConsume,
                                                  final boolean autoClose) 
throws JMSException
     {
         checkTemporaryDestination(destination);
 
+        final String messageSelector;
+
+        if (_strictAMQP && !(selector == null || selector.equals("")))
+        {
+            if (_strictAMQPFATAL)
+            {
+                throw new UnsupportedOperationException("Selectors not 
currently supported by AMQP.");
+            }
+            else
+            {
+                messageSelector = null;
+            }
+        }
+        else
+        {
+            messageSelector = selector;
+        }
 
         return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
         {
@@ -1246,6 +1253,7 @@
                 AMQDestination amqd = (AMQDestination) destination;
 
                 final AMQProtocolHandler protocolHandler = 
getProtocolHandler();
+                // TODO: Define selectors in AMQP
                 // TODO: construct the rawSelector from the selector string if 
rawSelector == null
                 final FieldTable ft = FieldTableFactory.newFieldTable();
                 //if (rawSelector != null)
@@ -1254,7 +1262,8 @@
                 {
                     ft.addAll(rawSelector);
                 }
-                BasicMessageConsumer consumer = new 
BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
+
+                BasicMessageConsumer consumer = new 
BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
                                                                          
_messageFactoryRegistry, AMQSession.this,
                                                                          
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
                                                                          
_acknowledgeMode, noConsume, autoClose);
@@ -1647,6 +1656,8 @@
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
     {
+
+
         checkNotClosed();
         AMQTopic origTopic = checkValidTopic(topic);
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, 
_connection);
@@ -1674,13 +1685,31 @@
             {
                 topicName = new AMQShortString(topic.getTopicName());
             }
-            // if the queue is bound to the exchange but NOT for this topic, 
then the JMS spec
-            // says we must trash the subscription.
-            if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
-                !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), 
topicName))
+
+            if (_strictAMQP)
             {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not 
currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already 
exists for '" + topicName + "' "
+                                 + "for creation durableSubscriber. Requesting 
queue deletion regardless.");
+                }
+
                 deleteQueue(dest.getAMQQueueName());
             }
+            else
+            {
+                // if the queue is bound to the exchange but NOT for this 
topic, then the JMS spec
+                // says we must trash the subscription.
+                if (isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName()) &&
+                    !isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName(), topicName))
+                {
+                    deleteQueue(dest.getAMQQueueName());
+                }
+            }
         }
 
         subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) 
createConsumer(dest));
@@ -1778,13 +1807,31 @@
         }
         else
         {
-            if (isQueueBound(getDefaultTopicExchangeName(), 
AMQTopic.getDurableTopicQueueName(name, _connection)))
+            if (_strictAMQP)
             {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not 
currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already 
exists for '" + name + "' for unsubscribe."
+                                 + " Requesting queue deletion regardless.");
+                }
+
                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, 
_connection));
             }
             else
             {
-                throw new InvalidDestinationException("Unknown subscription 
exchange:" + name);
+
+                if (isQueueBound(getDefaultTopicExchangeName(), 
AMQTopic.getDurableTopicQueueName(name, _connection)))
+                {
+                    deleteQueue(AMQTopic.getDurableTopicQueueName(name, 
_connection));
+                }
+                else
+                {
+                    throw new InvalidDestinationException("Unknown 
subscription exchange:" + name);
+                }
             }
         }
     }
@@ -1796,10 +1843,6 @@
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString 
queueName, AMQShortString routingKey) throws JMSException
     {
-        if (isStrictAMQP())
-        {
-            throw new UnsupportedOperationException();
-        }
 
         // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
 Tue May  1 00:17:43 2007
@@ -9,119 +9,141 @@
 import javax.jms.QueueSender;
 import javax.jms.InvalidDestinationException;
 
-public class QueueSenderAdapter implements QueueSender {
+public class QueueSenderAdapter implements QueueSender
+{
 
-       private BasicMessageProducer _delegate;
-       private Queue _queue;
-       private boolean closed = false;
-       
-       public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue 
queue){
-               _delegate = msgProducer;
-               _queue = queue;
-       }
-       
-       public Queue getQueue() throws JMSException {
-               checkPreConditions();
-               return _queue;
-       }
-
-       public void send(Message msg) throws JMSException {
-               checkPreConditions();
-               _delegate.send(msg);
-       }
-
-       public void send(Queue queue, Message msg) throws JMSException {
-               checkPreConditions(queue);
-               _delegate.send(queue, msg);
-       }
-
-       public void publish(Message msg, int deliveryMode, int priority, long 
timeToLive)
-       throws JMSException {
-               checkPreConditions();
-               _delegate.send(msg, deliveryMode,priority,timeToLive);
-       }
-
-       public void send(Queue queue,Message msg, int deliveryMode, int 
priority, long timeToLive)
-                       throws JMSException {
-               checkPreConditions(queue);
-               _delegate.send(queue,msg, deliveryMode,priority,timeToLive);
-       }
-       
-       public void close() throws JMSException {
-               _delegate.close();
-               closed = true;
-       }
-
-       public int getDeliveryMode() throws JMSException {
-               checkPreConditions();
-               return _delegate.getDeliveryMode();
-       }
-
-       public Destination getDestination() throws JMSException {
-               checkPreConditions();
-               return _delegate.getDestination();
-       }
-
-       public boolean getDisableMessageID() throws JMSException {
-               checkPreConditions();
-               return _delegate.getDisableMessageID();
-       }
-
-       public boolean getDisableMessageTimestamp() throws JMSException {
-               checkPreConditions();
-               return _delegate.getDisableMessageTimestamp();
-       }
-
-       public int getPriority() throws JMSException {
-               checkPreConditions();
-               return _delegate.getPriority();
-       }
-
-       public long getTimeToLive() throws JMSException {
-               checkPreConditions();
-               return _delegate.getTimeToLive();
-       }
-
-       public void send(Destination dest, Message msg) throws JMSException {
-               checkPreConditions((Queue)dest);
-               _delegate.send(dest,msg);
-       }
-
-       public void send(Message msg, int deliveryMode, int priority, long 
timeToLive)
-                       throws JMSException {
-               checkPreConditions();
-               _delegate.send(msg, deliveryMode,priority,timeToLive);
-       }
-
-       public void send(Destination dest, Message msg, int deliveryMode, int 
priority, long timeToLive) throws JMSException {
-               checkPreConditions((Queue)dest);
-               _delegate.send(dest,msg, deliveryMode,priority,timeToLive);
-       }
-
-       public void setDeliveryMode(int deliveryMode) throws JMSException {
-               checkPreConditions();
-               _delegate.setDeliveryMode(deliveryMode);
-       }
-
-       public void setDisableMessageID(boolean disableMessageID) throws 
JMSException {
-               checkPreConditions();
-               _delegate.setDisableMessageID(disableMessageID);
-       }
-
-       public void setDisableMessageTimestamp(boolean disableMessageTimestamp) 
throws JMSException {
-               checkPreConditions();
-               _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
-       }
-
-       public void setPriority(int priority) throws JMSException {
-               checkPreConditions();
-               _delegate.setPriority(priority);
-       }
-
-       public void setTimeToLive(long timeToLive) throws JMSException {
-               checkPreConditions();
-               _delegate.setTimeToLive(timeToLive);
-       }
+    private BasicMessageProducer _delegate;
+    private Queue _queue;
+    private boolean closed = false;
+
+    public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue)
+    {
+        _delegate = msgProducer;
+        _queue = queue;
+    }
+
+    public Queue getQueue() throws JMSException
+    {
+        checkPreConditions();
+        return _queue;
+    }
+
+    public void send(Message msg) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg);
+    }
+
+    public void send(Queue queue, Message msg) throws JMSException
+    {
+        checkPreConditions(queue);
+        _delegate.send(queue, msg);
+    }
+
+    public void publish(Message msg, int deliveryMode, int priority, long 
timeToLive)
+            throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void send(Queue queue, Message msg, int deliveryMode, int priority, 
long timeToLive)
+            throws JMSException
+    {
+        checkPreConditions(queue);
+        _delegate.send(queue, msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void close() throws JMSException
+    {
+        _delegate.close();
+        closed = true;
+    }
+
+    public int getDeliveryMode() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDeliveryMode();
+    }
+
+    public Destination getDestination() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDestination();
+    }
+
+    public boolean getDisableMessageID() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDisableMessageID();
+    }
+
+    public boolean getDisableMessageTimestamp() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDisableMessageTimestamp();
+    }
+
+    public int getPriority() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getPriority();
+    }
+
+    public long getTimeToLive() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getTimeToLive();
+    }
+
+    public void send(Destination dest, Message msg) throws JMSException
+    {
+        checkPreConditions((Queue) dest);
+        _delegate.send(dest, msg);
+    }
+
+    public void send(Message msg, int deliveryMode, int priority, long 
timeToLive)
+            throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void send(Destination dest, Message msg, int deliveryMode, int 
priority, long timeToLive) throws JMSException
+    {
+        checkPreConditions((Queue) dest);
+        _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void setDeliveryMode(int deliveryMode) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setDeliveryMode(deliveryMode);
+    }
+
+    public void setDisableMessageID(boolean disableMessageID) throws 
JMSException
+    {
+        checkPreConditions();
+        _delegate.setDisableMessageID(disableMessageID);
+    }
+
+    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) 
throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+    }
+
+    public void setPriority(int priority) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setPriority(priority);
+    }
+
+    public void setTimeToLive(long timeToLive) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setTimeToLive(timeToLive);
+    }
 
     private void checkPreConditions() throws JMSException
     {
@@ -130,31 +152,41 @@
 
     private void checkPreConditions(Queue queue) throws JMSException
     {
-               if (closed){
-                       throw new javax.jms.IllegalStateException("Publisher is 
closed");
-               }
-               
-               AMQSession session = ((BasicMessageProducer) 
_delegate).getSession();
-               
-               if(session == null || session.isClosed()){
-                       throw new javax.jms.IllegalStateException("Invalid 
Session");
-               }
+        if (closed)
+        {
+            throw new javax.jms.IllegalStateException("Publisher is closed");
+        }
+
+        AMQSession session = ((BasicMessageProducer) _delegate).getSession();
+
+        if (session == null || session.isClosed())
+        {
+            throw new javax.jms.IllegalStateException("Invalid Session");
+        }
 
-        if(!(queue instanceof AMQDestination))
+        if (!(queue instanceof AMQDestination))
         {
             throw new InvalidDestinationException("Queue: " + queue + " is not 
a valid Qpid queue");
         }
         AMQDestination destination = (AMQDestination) queue;
-        if(!destination.isValidated() && checkQueueBeforePublish())
+        if (!destination.isValidated() && checkQueueBeforePublish())
         {
 
-            if (_delegate.isBound(destination))
+            if (_delegate.getSession().isStrictAMQP())
             {
+                _delegate._logger.warn("AMQP does not support destination 
validation before publish, ");
                 destination.setValidated(true);
             }
             else
             {
-                throw new InvalidDestinationException("Queue: " + queue + " is 
not a valid destination (no bindings on server");
+                if (_delegate.isBound(destination))
+                {
+                    destination.setValidated(true);
+                }
+                else
+                {
+                    throw new InvalidDestinationException("Queue: " + queue + 
" is not a valid destination (no bindings on server");
+                }
             }
         }
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 Tue May  1 00:17:43 2007
@@ -490,27 +490,7 @@
                                                 new 
SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
     }
 
-    /**
-     * Convenience method to register an AMQSession with the protocol handler. 
Registering a session with the protocol
-     * handler will ensure that messages are delivered to the consumer(s) on 
that session.
-     *
-     * @param channelId the channel id of the session
-     * @param session   the session instance.
-     */
-    public void addSessionByChannel(int channelId, AMQSession session)
-    {
-        _protocolSession.addSessionByChannel(channelId, session);
-    }
 
-    /**
-     * Convenience method to deregister an AMQSession with the protocol 
handler.
-     *
-     * @param channelId then channel id of the session
-     */
-    public void removeSessionByChannel(int channelId)
-    {
-        _protocolSession.removeSessionByChannel(channelId);
-    }
 
     public void closeSession(AMQSession session) throws AMQException
     {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 Tue May  1 00:17:43 2007
@@ -85,7 +85,7 @@
     protected final AMQProtocolHandler _protocolHandler;
 
     /** Maps from the channel id to the AMQSession that it represents. */
-    protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+    protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new 
ConcurrentHashMap<Integer, AMQSession>();
 
     protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
 
@@ -104,26 +104,13 @@
     private VersionSpecificRegistry _registry =
         
MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
 
-    /**
-     * No-arg constructor for use by test subclass - has to initialise final 
vars NOT intended for use other then for
-     * test
-     */
-    public AMQProtocolSession()
-    {
-        _protocolHandler = null;
-        _minaProtocolSession = null;
-        _stateManager = new AMQStateManager(this);
-    }
+    private final AMQConnection _connection;
+
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession 
protocolSession, AMQConnection connection)
     {
-        _protocolHandler = protocolHandler;
-        _minaProtocolSession = protocolSession;
-        // properties of the connection are made available to the event 
handlers
-        _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
-        // fixme - real value needed
-        _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        _stateManager = new AMQStateManager(this);
+        this(protocolHandler,protocolSession,connection, new 
AMQStateManager());
+
     }
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession 
protocolSession, AMQConnection connection,
@@ -138,6 +125,7 @@
         _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
         _stateManager = stateManager;
         _stateManager.setProtocolSession(this);
+        _connection = connection;
 
     }
 
@@ -305,11 +293,16 @@
      */
     private void deliverMessageToAMQSession(int channelId, UnprocessedMessage 
msg)
     {
-        AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+        AMQSession session = getSession(channelId);
         session.messageReceived(msg);
         _channelId2UnprocessedMsgMap.remove(channelId);
     }
 
+    protected AMQSession getSession(int channelId)
+    {
+        return _connection.getSession(channelId);
+    }
+
     /**
      * Convenience method that writes a frame to the protocol session. 
Equivalent to calling
      * getProtocolSession().write().
@@ -335,32 +328,6 @@
         }
     }
 
-    public void addSessionByChannel(int channelId, AMQSession session)
-    {
-        if (channelId <= 0)
-        {
-            throw new IllegalArgumentException("Attempt to register a session 
with a channel id <= zero");
-        }
-
-        if (session == null)
-        {
-            throw new IllegalArgumentException("Attempt to register a null 
session");
-        }
-
-        _logger.debug("Add session with channel id  " + channelId);
-        _channelId2SessionMap.put(channelId, session);
-    }
-
-    public void removeSessionByChannel(int channelId)
-    {
-        if (channelId <= 0)
-        {
-            throw new IllegalArgumentException("Attempt to deregister a 
session with a channel id <= zero");
-        }
-
-        _logger.debug("Removing session with channelId " + channelId);
-        _channelId2SessionMap.remove(channelId);
-    }
 
     /**
      * Starts the process of closing a session
@@ -393,11 +360,11 @@
      */
     public boolean channelClosed(int channelId, AMQConstant code, String text) 
throws AMQException
     {
-        final Integer chId = channelId;
+
         // if this is not a response to an earlier request to close the channel
-        if (_closingChannels.remove(chId) == null)
+        if (_closingChannels.remove(channelId) == null)
         {
-            final AMQSession session = (AMQSession) 
_channelId2SessionMap.get(chId);
+            final AMQSession session = getSession(channelId);
             try
             {
                 session.closed(new AMQException(code, text));
@@ -469,8 +436,7 @@
 
     public void confirmConsumerCancelled(int channelId, AMQShortString 
consumerTag)
     {
-        final Integer chId = channelId;
-        final AMQSession session = (AMQSession) 
_channelId2SessionMap.get(chId);
+        final AMQSession session = getSession(channelId);
 
         session.confirmConsumerCancelled(consumerTag);
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
 Tue May  1 00:17:43 2007
@@ -32,9 +32,6 @@
 {
     private static class AMQProtSession extends AMQProtocolSession
     {
-        public AMQProtSession()
-        {
-        }
 
         public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession 
protocolSession, AMQConnection connection)
         {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?view=diff&rev=533957&r1=533956&r2=533957
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
 Tue May  1 00:17:43 2007
@@ -390,7 +390,6 @@
         assertEquals("1", ((TextMessage) result).getText());
         assertTrue("Messasge is marked as redelivered" + result, 
!result.getJMSRedelivered());
 
-
         _logger.info("Closing Consumer");
         _consumer.close();
 
@@ -398,30 +397,31 @@
         _consumer = _session.createConsumer(_jmsQueue);
 
         _logger.info("receiving result");
+
+        // NOTE: Both msg 1 & 2 will be marked as redelivered as they have 
both will have been rejected.
+        // Only the occasion where it is not rejected will it mean it hasn't 
arrived at the client yet.
         result = _consumer.receive(1000);
         assertNotNull("test message was consumed and rolled back, but is 
gone", result);
+
+        // The first message back will be either 1 or 2 being redelivered
         if (result.getJMSRedelivered())
         {
-            assertEquals("1", ((TextMessage) result).getText());
-
-            result = _consumer.receive(1000);
-            assertNotNull("test message was consumed and rolled back, but is 
gone", result);
-            assertEquals("2", ((TextMessage) result).getText());
             assertTrue("Messasge is not marked as redelivered" + result, 
result.getJMSRedelivered());
         }
-        else
+        else // or it will be msg 2 arriving the first time due to latency.  
         {
-            assertEquals("2", ((TextMessage) result).getText());
-            assertTrue("Messasge is marked as redelivered" + result, 
!result.getJMSRedelivered());
+            _logger.info("Message 2 wasn't prefetched so wasn't rejected");
+            assertEquals("2", ((TextMessage) result).getText());            
+        }
 
-            result = _consumer.receive(1000);
-            assertNotNull("test message was consumed and rolled back, but is 
gone", result);
-            assertEquals("1", ((TextMessage) result).getText());
-            assertTrue("Messasge is not marked as redelivered" + result, 
result.getJMSRedelivered());
+        result = _consumer.receive(1000);
+        assertNotNull("test message was consumed and rolled back, but is 
gone", result);
+        assertTrue("Messasge is not marked as redelivered" + result, 
result.getJMSRedelivered());
 
-        }
         result = _consumer.receive(1000);
         assertNull("test message should be null:" + result, result);
+
+        _session.commit();
 
     }
 


Reply via email to