Author: ritchiem
Date: Tue Nov  6 03:12:37 2007
New Revision: 592374

URL: http://svn.apache.org/viewvc?rev=592374&view=rev
Log:
QPID-662 Transactional state not correctly reported after fail over. We now 
record if we have sent any messages
from here we can check if we have failed over and so have lost messages from 
the transaction making it invalid.

Added:
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
   (with props)
Modified:
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Nov  6 03:12:37 2007
@@ -20,11 +20,11 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
 import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -100,6 +100,7 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
 import java.io.Serializable;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -293,6 +294,11 @@
     private final boolean _strictAMQPFATAL;
     private final Object _messageDeliveryLock = new Object();
 
+    /** Session state : used to detect if commit is a) required b) allowed , 
i.e. does the tx span failover. */
+    private boolean _dirty;
+    /** Has failover occured on this session */
+    private boolean _failedOver;
+
     /**
      * Creates a new session on a connection.
      *
@@ -610,30 +616,65 @@
     {
         checkTransacted();
 
-        try
+        new FailoverNoopSupport<Object, JMSException>(new 
FailoverProtectedOperation<Object, JMSException>()
         {
-            // Acknowledge up to message last delivered (if any) for each 
consumer.
-            // need to send ack for messages delivered to consumers so far
-            for (Iterator<BasicMessageConsumer> i = 
_consumers.values().iterator(); i.hasNext();)
+            public Object execute() throws JMSException, FailoverException
             {
-                // Sends acknowledgement to server
-                i.next().acknowledgeLastDelivered();
-            }
+                //Check that we are clean to commit.
+                if (_failedOver && _dirty)
+                {
+                    rollback();
 
-            // Commits outstanding messages sent and outstanding 
acknowledgements.
-            final AMQProtocolHandler handler = getProtocolHandler();
+                    throw new TransactionRolledBackException("Connection 
failover has occured since last send. " +
+                                                             "Forced 
rollback");
+                }
 
-            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, 
getProtocolMajorVersion(), getProtocolMinorVersion()),
-                              TxCommitOkBody.class);
-        }
-        catch (AMQException e)
-        {
-            throw new JMSAMQException("Failed to commit: " + e.getMessage(), 
e);
-        }
-        catch (FailoverException e)
-        {
-            throw new JMSAMQException("Fail-over interrupted commit. Status of 
the commit is uncertain.", e);
-        }
+                try
+                {
+                    // Acknowledge up to message last delivered (if any) on 
this session.
+                    // We only need to find the highest value and ack that as 
commit is session level.
+                    Long lastTag = -1L;
+
+                    for (Iterator<BasicMessageConsumer> i = 
_consumers.values().iterator(); i.hasNext();)
+                    {
+//                        i.next().acknowledgeLastDelivered();
+//                    }
+
+                        // get next acknowledgement to server
+                        Long next = i.next().getLastDelivered();
+                        if (next != null && next > lastTag)
+                        {
+                            lastTag = next;
+                        }
+                    }
+
+                    if (lastTag != -1)
+                    {
+                        acknowledgeMessage(lastTag, true);
+                    }
+
+                    // Commits outstanding messages sent and outstanding 
acknowledgements.
+                    final AMQProtocolHandler handler = getProtocolHandler();
+
+                    handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, 
getProtocolMajorVersion(), getProtocolMinorVersion()),
+                                      TxCommitOkBody.class);
+
+                    markClean();
+                }
+
+                catch (AMQException e)
+                {
+                    throw new JMSAMQException("Failed to commit: " + 
e.getMessage(), e);
+                }
+
+                catch (FailoverException e)
+                {
+                    throw new JMSAMQException("Fail-over interrupted commit. 
Status of the commit is uncertain.", e);
+                }
+
+                return null;
+            }
+        }, _connection).execute();
     }
 
     public void confirmConsumerCancelled(AMQShortString consumerTag)
@@ -1431,6 +1472,8 @@
                 
_connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
                                                                                
          getProtocolMajorVersion(), getProtocolMinorVersion()), 
TxRollbackOkBody.class);
 
+                markClean();
+
                 if (!isSuspended)
                 {
                     suspendChannel(false);
@@ -1731,6 +1774,7 @@
      */
     void resubscribe() throws AMQException
     {
+        _failedOver = true;
         resubscribeProducers();
         resubscribeConsumers();
     }
@@ -2530,6 +2574,41 @@
     Object getMessageDeliveryLock()
     {
         return _messageDeliveryLock;
+    }
+
+    /**
+     * Signifies that the session has pending sends to commit.
+     */
+    public void markDirty()
+    {
+        _dirty = true;
+    }
+
+    /**
+     * Signifies that the session has no pending sends to commit.
+     */
+    public void markClean()
+    {
+        _dirty = false;
+        _failedOver = false;
+    }
+
+    /**
+     * Check to see if failover has occured since the last call to 
markClean(commit or rollback).
+     * @return boolean true if failover has occured.
+     */
+    public boolean hasFailedOver()
+    {
+        return _failedOver;
+    }
+
+    /**
+     * Check to see if any message have been sent in this transaction and have 
not been commited.
+     * @return boolean true if a message has been sent but not commited
+     */
+    public boolean isDirty()
+    {
+        return _dirty;
     }
 
     /** Responsible for decoding a message fragment and passing it to the 
appropriate message consumer. */

Added: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java?rev=592374&view=auto
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
 (added)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
 Tue Nov  6 03:12:37 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQSessionDirtyException represents all failures to send data on a 
transacted session that is
+ * no longer in a state that the client expects. i.e. failover has occured so 
previously sent messages
+ * will not be part of the transaction.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent attempt to perform additional sends on a dirty session.
+ * </table>
+ */
+public class AMQSessionDirtyException extends AMQException
+{
+    public AMQSessionDirtyException(String msg)
+    {
+        super(AMQConstant.RESOURCE_ERROR, msg);
+    }
+}

Propchange: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Tue Nov  6 03:12:37 2007
@@ -754,6 +754,30 @@
         }
     }
 
+    /**
+     * Acknowledge up to last message delivered (if any). Used when commiting.
+     *
+     * @return the lastDeliveryTag to acknowledge
+     */
+    Long getLastDelivered()
+    {
+        if (!_receivedDeliveryTags.isEmpty())
+        {
+            Long lastDeliveryTag = _receivedDeliveryTags.poll();
+
+            while (!_receivedDeliveryTags.isEmpty())
+            {
+                lastDeliveryTag = _receivedDeliveryTags.poll();
+            }
+
+            assert _receivedDeliveryTags.isEmpty();
+
+            return lastDeliveryTag;
+        }
+
+        return null;
+    }
+
     /** Acknowledge up to last message delivered (if any). Used when 
commiting. */
     void acknowledgeLastDelivered()
     {
@@ -772,6 +796,7 @@
         }
     }
 
+
     void notifyError(Throwable cause)
     {
         // synchronized (_closed)
@@ -783,7 +808,7 @@
                 if (_closedStack != null)
                 {
                     _logger.trace(_consumerTag + " notifyError():"
-                        + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
+                                  + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
                     _logger.trace(_consumerTag + " previously" + 
_closedStack.toString());
                 }
                 else
@@ -904,7 +929,7 @@
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Rejecting the messages(" + 
_receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
-                    + "for consumer with tag:" + _consumerTag);
+                              + "for consumer with tag:" + _consumerTag);
             }
 
             Long tag = _receivedDeliveryTags.poll();
@@ -934,7 +959,7 @@
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Rejecting the messages(" + 
_synchronousQueue.size() + ") in _syncQueue (PRQ)"
-                    + "for consumer with tag:" + _consumerTag);
+                              + "for consumer with tag:" + _consumerTag);
             }
 
             Iterator iterator = _synchronousQueue.iterator();

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
 Tue Nov  6 03:12:37 2007
@@ -60,46 +60,30 @@
 
     private AMQConnection _connection;
 
-    /**
-     * If true, messages will not get a timestamp.
-     */
+    /** If true, messages will not get a timestamp. */
     private boolean _disableTimestamps;
 
-    /**
-     * Priority of messages created by this producer.
-     */
+    /** Priority of messages created by this producer. */
     private int _messagePriority;
 
-    /**
-     * Time to live of messages. Specified in milliseconds but AMQ has 1 
second resolution.
-     */
+    /** Time to live of messages. Specified in milliseconds but AMQ has 1 
second resolution. */
     private long _timeToLive;
 
-    /**
-     * Delivery mode used for this producer.
-     */
+    /** Delivery mode used for this producer. */
     private int _deliveryMode = DeliveryMode.PERSISTENT;
 
-    /**
-     * The Destination used for this consumer, if specified upon creation.
-     */
+    /** The Destination used for this consumer, if specified upon creation. */
     protected AMQDestination _destination;
 
-    /**
-     * Default encoding used for messages produced by this producer.
-     */
+    /** Default encoding used for messages produced by this producer. */
     private String _encoding;
 
-    /**
-     * Default encoding used for message produced by this producer.
-     */
+    /** Default encoding used for message produced by this producer. */
     private String _mimeType;
 
     private AMQProtocolHandler _protocolHandler;
 
-    /**
-     * True if this producer was created from a transacted session
-     */
+    /** True if this producer was created from a transacted session */
     private boolean _transacted;
 
     private int _channelId;
@@ -112,9 +96,7 @@
      */
     private long _producerId;
 
-    /**
-     * The session used to create this producer
-     */
+    /** The session used to create this producer */
     private AMQSession _session;
 
     private final boolean _immediate;
@@ -128,8 +110,8 @@
     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination 
destination, boolean transacted, int channelId,
-        AMQSession session, AMQProtocolHandler protocolHandler, long 
producerId, boolean immediate, boolean mandatory,
-        boolean waitUntilSent)
+                                   AMQSession session, AMQProtocolHandler 
protocolHandler, long producerId, boolean immediate, boolean mandatory,
+                                   boolean waitUntilSent)
     {
         _connection = connection;
         _destination = destination;
@@ -162,16 +144,16 @@
         // Note that the durable and internal arguments are ignored since 
passive is set to false
         // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame declare =
-            ExchangeDeclareBody.createAMQFrame(_channelId, 
_protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), null, // arguments
-                false, // autoDelete
-                false, // durable
-                destination.getExchangeName(), // exchange
-                false, // internal
-                true, // nowait
-                false, // passive
-                _session.getTicket(), // ticket
-                destination.getExchangeClass()); // type
+                ExchangeDeclareBody.createAMQFrame(_channelId, 
_protocolHandler.getProtocolMajorVersion(),
+                                                   
_protocolHandler.getProtocolMinorVersion(), null, // arguments
+                                                   false, // autoDelete
+                                                   false, // durable
+                                                   
destination.getExchangeName(), // exchange
+                                                   false, // internal
+                                                   true, // nowait
+                                                   false, // passive
+                                                   _session.getTicket(), // 
ticket
+                                                   
destination.getExchangeClass()); // type
         _protocolHandler.writeFrame(declare);
     }
 
@@ -208,7 +190,7 @@
         if ((i != DeliveryMode.NON_PERSISTENT) && (i != 
DeliveryMode.PERSISTENT))
         {
             throw new JMSException("DeliveryMode must be either NON_PERSISTENT 
or PERSISTENT. Value of " + i
-                + " is illegal");
+                                   + " is illegal");
         }
 
         _deliveryMode = i;
@@ -320,12 +302,12 @@
         {
             validateDestination(destination);
             sendImpl((AMQDestination) destination, message, _deliveryMode, 
_messagePriority, _timeToLive, _mandatory,
-                _immediate);
+                     _immediate);
         }
     }
 
     public void send(Destination destination, Message message, int 
deliveryMode, int priority, long timeToLive)
-        throws JMSException
+            throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -337,7 +319,7 @@
     }
 
     public void send(Destination destination, Message message, int 
deliveryMode, int priority, long timeToLive,
-        boolean mandatory) throws JMSException
+                     boolean mandatory) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -349,7 +331,7 @@
     }
 
     public void send(Destination destination, Message message, int 
deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate) throws JMSException
+                     boolean mandatory, boolean immediate) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -361,7 +343,7 @@
     }
 
     public void send(Destination destination, Message message, int 
deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate, boolean waitUntilSent) throws 
JMSException
+                     boolean mandatory, boolean immediate, boolean 
waitUntilSent) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -369,7 +351,7 @@
         {
             validateDestination(destination);
             sendImpl((AMQDestination) destination, message, deliveryMode, 
priority, timeToLive, mandatory, immediate,
-                waitUntilSent);
+                     waitUntilSent);
         }
     }
 
@@ -415,7 +397,7 @@
             else
             {
                 throw new JMSException("Unable to send message, due to class 
conversion error: "
-                    + message.getClass().getName());
+                                       + message.getClass().getName());
             }
         }
     }
@@ -425,14 +407,14 @@
         if (!(destination instanceof AMQDestination))
         {
             throw new JMSException("Unsupported destination class: "
-                + ((destination != null) ? destination.getClass() : null));
+                                   + ((destination != null) ? 
destination.getClass() : null));
         }
 
         declareDestination((AMQDestination) destination);
     }
 
     protected void sendImpl(AMQDestination destination, Message message, int 
deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate) throws JMSException
+                            boolean mandatory, boolean immediate) throws 
JMSException
     {
         sendImpl(destination, message, deliveryMode, priority, timeToLive, 
mandatory, immediate, _waitUntilSent);
     }
@@ -447,16 +429,27 @@
      * @param timeToLive
      * @param mandatory
      * @param immediate
+     *
      * @throws JMSException
      */
     protected void sendImpl(AMQDestination destination, Message origMessage, 
int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate, boolean wait) throws JMSException
+                            boolean mandatory, boolean immediate, boolean 
wait) throws JMSException
     {
         checkTemporaryDestination(destination);
         origMessage.setJMSDestination(destination);
 
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
 
+        if (_transacted)
+        {
+            if (_session.hasFailedOver() && _session.isDirty())
+            {
+                throw new JMSAMQException("Failover has occurred and session 
is dirty so unable to send.",
+                                          new 
AMQSessionDirtyException("Failover has occurred and session is dirty " +
+                                                                       "so 
unable to send."));
+            }
+        }
+
         if (_disableMessageId)
         {
             message.setJMSMessageID(null);
@@ -489,12 +482,12 @@
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQFrame publishFrame =
-            BasicPublishBody.createAMQFrame(_channelId, 
_protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), 
destination.getExchangeName(), // exchange
-                immediate, // immediate
-                mandatory, // mandatory
-                destination.getRoutingKey(), // routingKey
-                _session.getTicket()); // ticket
+                BasicPublishBody.createAMQFrame(_channelId, 
_protocolHandler.getProtocolMajorVersion(),
+                                                
_protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // 
exchange
+                                                immediate, // immediate
+                                                mandatory, // mandatory
+                                                destination.getRoutingKey(), 
// routingKey
+                                                _session.getTicket()); // 
ticket
 
         message.prepareForSending();
         ByteBuffer payload = message.getData();
@@ -536,9 +529,9 @@
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         AMQFrame contentHeaderFrame =
-            ContentHeaderBody.createAMQFrame(_channelId,
-                
BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
-                    _protocolHandler.getProtocolMinorVersion()), 0, 
contentHeaderProperties, size);
+                ContentHeaderBody.createAMQFrame(_channelId,
+                                                 
BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+                                                                           
_protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending content header frame to " + destination);
@@ -558,6 +551,11 @@
             origMessage.setJMSExpiration(message.getJMSExpiration());
             origMessage.setJMSMessageID(message.getJMSMessageID());
         }
+
+        if (_transacted)
+        {
+            _session.markDirty();
+        }
     }
 
     private void checkTemporaryDestination(AMQDestination destination) throws 
JMSException
@@ -669,7 +667,7 @@
         if ((_destination != null) && (suppliedDestination != null))
         {
             throw new UnsupportedOperationException(
-                "This message producer was created with a Destination, 
therefore you cannot use an unidentified Destination");
+                    "This message producer was created with a Destination, 
therefore you cannot use an unidentified Destination");
         }
 
         if (suppliedDestination == null)

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 Tue Nov  6 03:12:37 2007
@@ -104,23 +104,22 @@
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Create the filter chain to filter this handlers events.
- *     <td> [EMAIL PROTECTED] ProtocolCodecFilter}, [EMAIL PROTECTED] 
SSLContextFactory}, [EMAIL PROTECTED] SSLFilter}, [EMAIL PROTECTED] 
ReadWriteThreadModel}.
+ * <td> [EMAIL PROTECTED] ProtocolCodecFilter}, [EMAIL PROTECTED] 
SSLContextFactory}, [EMAIL PROTECTED] SSLFilter}, [EMAIL PROTECTED] 
ReadWriteThreadModel}.
  *
  * <tr><td> Maintain fail-over state.
  * <tr><td>
  * </table>
  *
  * @todo Explain the system property: amqj.shared_read_write_pool. How does 
putting the protocol codec filter before the
- *       async write filter make it a shared pool? The pooling filter uses the 
same thread pool for reading and writing
- *       anyway, see [EMAIL PROTECTED] org.apache.qpid.pool.PoolingFilter}, 
docs for comments. Will putting the protocol codec
- *       filter before it mean not doing the read/write asynchronously but in 
the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same 
thread pool for reading and writing
+ * anyway, see [EMAIL PROTECTED] org.apache.qpid.pool.PoolingFilter}, docs for 
comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the 
main filter thread?
  * @todo Use a single handler instance, by shifting everything to do with the 
'protocol session' state, including
- *       failover state, into AMQProtocolSession, and tracking that from 
AMQConnection? The lifecycles of
- *       AMQProtocolSesssion and AMQConnection will be the same, so if there 
is high cohesion between them, they could
- *       be merged, although there is sense in keeping the session model 
seperate. Will clarify things by having data
- *       held per protocol handler, per protocol session, per network 
connection, per channel, in seperate classes, so
- *       that lifecycles of the fields match lifecycles of their containing 
objects.
+ * failover state, into AMQProtocolSession, and tracking that from 
AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high 
cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. 
Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, 
per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
  */
 public class AMQProtocolHandler extends IoHandlerAdapter
 {
@@ -200,7 +199,7 @@
         {
             SSLConfiguration sslConfig = _connection.getSSLConfiguration();
             SSLContextFactory sslFactory =
-                new SSLContextFactory(sslConfig.getKeystorePath(), 
sslConfig.getKeystorePassword(), sslConfig.getCertType());
+                    new SSLContextFactory(sslConfig.getKeystorePath(), 
sslConfig.getKeystorePassword(), sslConfig.getCertType());
             SSLFilter sslFilter = new 
SSLFilter(sslFactory.buildClientContext());
             sslFilter.setUseClientMode(true);
             session.getFilterChain().addBefore("protocolFilter", "ssl", 
sslFilter);
@@ -235,7 +234,7 @@
      * @param session The MINA session.
      *
      * @todo Clarify: presumably exceptionCaught is called when the client is 
sending during a connection failure and
-     *       not otherwise? The above comment doesn't make that clear.
+     * not otherwise? The above comment doesn't make that clear.
      */
     public void sessionClosed(IoSession session)
     {
@@ -413,74 +412,74 @@
 
         switch (bodyFrame.getFrameType())
         {
-        case AMQMethodBody.TYPE:
+            case AMQMethodBody.TYPE:
 
-            if (debug)
-            {
-                _logger.debug("(" + System.identityHashCode(this) + ")Method 
frame received: " + frame);
-            }
+                if (debug)
+                {
+                    _logger.debug("(" + System.identityHashCode(this) + 
")Method frame received: " + frame);
+                }
 
-            final AMQMethodEvent<AMQMethodBody> evt =
-                new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), 
(AMQMethodBody) bodyFrame);
+                final AMQMethodEvent<AMQMethodBody> evt =
+                        new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), 
(AMQMethodBody) bodyFrame);
 
-            try
-            {
-
-                boolean wasAnyoneInterested = 
getStateManager().methodReceived(evt);
-                if (!_frameListeners.isEmpty())
+                try
                 {
-                    Iterator it = _frameListeners.iterator();
-                    while (it.hasNext())
+
+                    boolean wasAnyoneInterested = 
getStateManager().methodReceived(evt);
+                    if (!_frameListeners.isEmpty())
                     {
-                        final AMQMethodListener listener = (AMQMethodListener) 
it.next();
-                        wasAnyoneInterested = listener.methodReceived(evt) || 
wasAnyoneInterested;
+                        Iterator it = _frameListeners.iterator();
+                        while (it.hasNext())
+                        {
+                            final AMQMethodListener listener = 
(AMQMethodListener) it.next();
+                            wasAnyoneInterested = listener.methodReceived(evt) 
|| wasAnyoneInterested;
+                        }
                     }
-                }
 
-                if (!wasAnyoneInterested)
-                {
-                    throw new AMQException("AMQMethodEvent " + evt + " was not 
processed by any listener.  Listeners:"
-                        + _frameListeners);
+                    if (!wasAnyoneInterested)
+                    {
+                        throw new AMQException("AMQMethodEvent " + evt + " was 
not processed by any listener.  Listeners:"
+                                               + _frameListeners);
+                    }
                 }
-            }
-            catch (AMQException e)
-            {
-                getStateManager().error(e);
-                if (!_frameListeners.isEmpty())
+                catch (AMQException e)
                 {
-                    Iterator it = _frameListeners.iterator();
-                    while (it.hasNext())
+                    getStateManager().error(e);
+                    if (!_frameListeners.isEmpty())
                     {
-                        final AMQMethodListener listener = (AMQMethodListener) 
it.next();
-                        listener.error(e);
+                        Iterator it = _frameListeners.iterator();
+                        while (it.hasNext())
+                        {
+                            final AMQMethodListener listener = 
(AMQMethodListener) it.next();
+                            listener.error(e);
+                        }
                     }
-                }
 
-                exceptionCaught(session, e);
-            }
+                    exceptionCaught(session, e);
+                }
 
-            break;
+                break;
 
-        case ContentHeaderBody.TYPE:
+            case ContentHeaderBody.TYPE:
 
-            _protocolSession.messageContentHeaderReceived(frame.getChannel(), 
(ContentHeaderBody) bodyFrame);
-            break;
+                
_protocolSession.messageContentHeaderReceived(frame.getChannel(), 
(ContentHeaderBody) bodyFrame);
+                break;
 
-        case ContentBody.TYPE:
+            case ContentBody.TYPE:
 
-            _protocolSession.messageContentBodyReceived(frame.getChannel(), 
(ContentBody) bodyFrame);
-            break;
+                
_protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) 
bodyFrame);
+                break;
 
-        case HeartbeatBody.TYPE:
+            case HeartbeatBody.TYPE:
 
-            if (debug)
-            {
-                _logger.debug("Received heartbeat");
-            }
+                if (debug)
+                {
+                    _logger.debug("Received heartbeat");
+                }
 
-            break;
+                break;
 
-        default:
+            default:
 
         }
 
@@ -491,6 +490,8 @@
 
     public void messageSent(IoSession session, Object message) throws Exception
     {
+//        System.err.println("Sent PS:" + 
System.identityHashCode(_protocolSession) + ":" + message);
+
         final long sentMessages = _messagesOut++;
 
         final boolean debug = _logger.isDebugEnabled();
@@ -547,7 +548,7 @@
      * @param listener the blocking listener. Note the calling thread will 
block.
      */
     public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, 
BlockingMethodFrameListener listener)
-        throws AMQException, FailoverException
+            throws AMQException, FailoverException
     {
         return writeCommandFrameAndWaitForReply(frame, listener, 
DEFAULT_SYNC_TIMEOUT);
     }
@@ -560,7 +561,7 @@
      * @param listener the blocking listener. Note the calling thread will 
block.
      */
     public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, 
BlockingMethodFrameListener listener,
-        long timeout) throws AMQException, FailoverException
+                                                           long timeout) 
throws AMQException, FailoverException
     {
         try
         {
@@ -570,8 +571,8 @@
             AMQMethodEvent e = listener.blockForFrame(timeout);
 
             return e;
-                // When control resumes before this line, a reply will have 
been received
-                // that matches the criteria defined in the blocking listener
+            // When control resumes before this line, a reply will have been 
received
+            // that matches the criteria defined in the blocking listener
         }
         catch (AMQException e)
         {
@@ -595,7 +596,7 @@
     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long 
timeout) throws AMQException, FailoverException
     {
         return writeCommandFrameAndWaitForReply(frame, new 
SpecificMethodFrameListener(frame.getChannel(), responseClass),
-                timeout);
+                                                timeout);
     }
 
     public void closeSession(AMQSession session) throws AMQException
@@ -621,12 +622,12 @@
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         final AMQFrame frame =
-            ConnectionCloseBody.createAMQFrame(0, 
_protocolSession.getProtocolMajorVersion(),
-                _protocolSession.getProtocolMinorVersion(), // AMQP version 
(major, minor)
-                0, // classId
-                0, // methodId
-                AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                new AMQShortString("JMS client is closing the connection.")); 
// replyText
+                ConnectionCloseBody.createAMQFrame(0, 
_protocolSession.getProtocolMajorVersion(),
+                                                   
_protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
+                                                   0, // classId
+                                                   0, // methodId
+                                                   
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                                                   new AMQShortString("JMS 
client is closing the connection.")); // replyText
 
         try
         {

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
 Tue Nov  6 03:12:37 2007
@@ -255,9 +255,9 @@
             if (_currentState != s)
             {
                 _logger.warn("State not achieved within permitted time.  
Current state " + _currentState
-                    + ", desired state: " + s);
+                             + ", desired state: " + s);
                 throw new AMQException("State not achieved within permitted 
time.  Current state " + _currentState
-                    + ", desired state: " + s);
+                                       + ", desired state: " + s);
             }
         }
 

Modified: 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
 Tue Nov  6 03:12:37 2007
@@ -22,23 +22,26 @@
 package org.apache.qpid.server.txn;
 
 import junit.framework.TestCase;
-import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSessionDirtyException;
 import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
-import javax.jms.ConnectionFactory;
-import javax.jms.Connection;
-import javax.jms.Message;
+import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.MessageListener;
-import javax.naming.spi.InitialContextFactory;
+import javax.jms.TransactionRolledBackException;
 import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
 import java.util.Hashtable;
 import java.util.concurrent.CountDownLatch;
 
@@ -49,7 +52,8 @@
     private static final Logger _logger = Logger.getLogger(TxnTest.class);
 
 
-    protected final String BROKER = "vm://:1";//"localhost";
+    //Set retries quite high to ensure that it continues to retry whilst the 
InVM broker is restarted.
+    protected final String BROKER = "vm://:1?retries='1000'";
     protected final String VHOST = "/test";
     protected final String QUEUE = "TxnTestQueue";
 
@@ -75,7 +79,11 @@
         Hashtable<String, String> env = new Hashtable<String, String>();
 
         env.put("connectionfactory.connection", "amqp://guest:[EMAIL 
PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'");
-        env.put("queue.queue", QUEUE);
+
+        // Ensure that the queue is unique for each test run.
+        // There appears to be other old sesssion/consumers when looping the 
tests this means that sometimes a message
+        // will disappear. When it has actually gone to the old client.
+        env.put("queue.queue", QUEUE + "-" + System.currentTimeMillis());
 
         _context = factory.getInitialContext(env);
 
@@ -109,7 +117,7 @@
         {
             _producerConnection.close();
         }
-        
+
         super.tearDown();
 
         if (BROKER.startsWith("vm://"))
@@ -124,10 +132,8 @@
         _consumer.setMessageListener(this);
         _clientConnection.start();
 
-        //Set TTL
         _producer.send(_producerSession.createTextMessage("TxtTestML"));
 
-
         try
         {
             //Wait for message to arrive
@@ -150,7 +156,6 @@
 
     public void onMessage(Message message)
     {
-
         try
         {
             assertEquals("Incorrect Message Received.", "TxtTestML", 
((TextMessage) message).getText());
@@ -170,19 +175,235 @@
     {
         _clientConnection.start();
 
-        //Set TTL
         _producer.send(_producerSession.createTextMessage("TxtTestReceive"));
 
         //Receive Message
         Message received = _consumer.receive(1000);
 
+        _clientSession.commit();
+
         assertEquals("Incorrect Message Received.", "TxtTestReceive", 
((TextMessage) received).getText());
-        //Receive Message
 
+        //Receive Message
         received = _consumer.receive(1000);
 
         assertNull("More messages received", received);
 
         _consumer.close();
     }
+
+    /**
+     * Test that after the connection has failed over that a sent message is 
still correctly receieved.
+     * Using Auto-Ack consumer.
+     *
+     * @throws JMSException
+     */
+    public void testReceiveAfterFailover() throws JMSException
+    {
+//        System.err.println("testReceiveAfterFailover");
+        _clientConnection.close();
+
+        MessageConsumer consumer = _producerSession.createConsumer(_queue);
+
+        failServer();
+
+//        System.err.println("Server restarted");
+
+        String MESSAGE_TXT = "TxtTestReceiveAfterFailoverTX";
+
+//        System.err.println("Prod Session:" + _producerSession + ":" + 
((AMQSession) _producerSession).isClosed());
+
+        Message sent = _producerSession.createTextMessage(MESSAGE_TXT);
+//        System.err.println("Created message");
+
+        _producer.send(sent);
+//        System.err.println("Sent message");
+
+        //Verify correct message received
+        Message received = consumer.receive(10000);
+//        System.err.println("Message Receieved:" + received);
+
+        assertNotNull("Message should be received.", received);
+        assertEquals("Incorrect Message Received.", MESSAGE_TXT, 
((TextMessage) received).getText());
+
+        //Check no more messages are received
+        received = consumer.receive(1000);
+        System.err.println("Second receive completed.");
+
+        assertNull("More messages received", received);
+
+        _producer.close();
+//        System.err.println("Close producer");
+
+        consumer.close();
+//        System.err.println("Close consumer");
+
+        _producerConnection.close();
+    }
+
+    /**
+     * Test that after the connection has failed over the dirty transaction is 
notified when calling commit
+     *
+     * @throws JMSException
+     */
+    public void testSendBeforeFailoverThenCommitTx() throws JMSException
+    {
+//        System.err.println("testSendBeforeFailoverThenCommitTx");
+        _clientConnection.start();
+
+        //Create a transacted producer.
+        MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+        String MESSAGE_TXT = "testSendBeforeFailoverThenCommitTx";
+
+        //Send the first message
+        txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+        failServer();
+
+        //Check that the message isn't received.
+        Message received = _consumer.receive(1000);
+        assertNull("Message received after failover to clean broker!", 
received);
+
+        //Attempt to commit session
+        try
+        {
+            _clientSession.commit();
+            fail("TransactionRolledBackException not thrown");
+        }
+        catch (JMSException jmse)
+        {
+            if (!(jmse instanceof TransactionRolledBackException))
+            {
+                fail(jmse.toString());
+            }
+        }
+
+        //Close consumer & producer
+        _consumer.close();
+        txProducer.close();
+    }
+
+    /**
+     * Test that after the connection has failed over the dirty transaction is 
fast failed by throwing an
+     * Exception on the next send.
+     *
+     * @throws JMSException
+     */
+    public void testSendBeforeFailoverThenSendTx() throws JMSException
+    {
+//        System.err.println("testSendBeforeFailoverThenSendTx");
+
+        _clientConnection.start();
+        MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+        String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+        //Send the first message
+        txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+        failServer();
+
+        //Check that the message isn't received.
+        Message received = _consumer.receive(1000);
+        assertNull("Message received after failover to clean broker!", 
received);
+
+        //Attempt to send another message on the session, here we should fast 
fail.
+        try
+        {
+            txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+            fail("JMSException not thrown");
+        }
+        catch (JMSException jmse)
+        {
+            if (!(jmse.getLinkedException() instanceof 
AMQSessionDirtyException))
+            {
+                fail(jmse.toString());
+            }
+        }
+
+
+        _consumer.close();
+    }
+
+    public void testSendBeforeFailoverThenSend2Tx() throws JMSException
+    {
+//        System.err.println("testSendBeforeFailoverThenSendTx");
+
+        _clientConnection.start();
+        MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+        String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+        //Send the first message
+        txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+        failServer();
+
+        //Check that the message isn't received.
+        Message received = _consumer.receive(1000);
+        assertNull("Message received after failover to clean broker!", 
received);
+
+        _clientSession.rollback();
+
+        //Attempt to send another message on the session, here we should fast 
fail.
+        try
+        {
+            txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+            txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+        }
+        catch (JMSException jmse)
+        {
+            if (jmse.getLinkedException() instanceof AMQSessionDirtyException)
+            {
+                fail(jmse.toString());
+            }
+        }
+
+
+        _consumer.close();
+    }
+
+
+    private void failServer()
+    {
+        if (BROKER.startsWith("vm://"))
+        {
+            //Work around for MessageStore not being initialise and the send 
not fully completing before the failover occurs.
+            try
+            {
+                Thread.sleep(5000);
+            }
+            catch (InterruptedException e)
+            {
+
+            }
+
+            TransportConnection.killAllVMBrokers();
+            ApplicationRegistry.remove(1);
+            try
+            {
+                TransportConnection.createVMBroker(1);
+            }
+            catch (AMQVMBrokerCreationException e)
+            {
+                _logger.error("Unable to restart broker due to :" + e);
+            }
+
+            //Work around for receive not being failover aware.. because it is 
the first receive it trys to
+            // unsuspend the channel but in this case the ChannelFlow command 
goes on the old session and the response on the
+            // new one ... though I thought the statemanager recorded the 
listeners so should be ok.???
+            try
+            {
+                Thread.sleep(5000);
+            }
+            catch (InterruptedException e)
+            {
+
+            }
+
+        }
+
+    }
+
 }


Reply via email to