Author: ritchiem
Date: Fri Oct 20 04:48:50 2006
New Revision: 466078

URL: http://svn.apache.org/viewvc?view=rev&rev=466078
Log:
Modified to maintain a reference to the lastWriteFuture. This is then used when 
closing the ProtocolSession to join on so that we can be sure all data has been 
written to the broker. A time out of 2 minutes ensures that the client doesn't 
hang for ever if the broker fails.

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=466078&r1=466077&r2=466078
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
 Fri Oct 20 04:48:50 2006
@@ -47,6 +47,9 @@
  */
 public class AMQProtocolSession implements ProtocolVersionList
 {
+
+    private static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
+
     private static final Logger _logger = 
Logger.getLogger(AMQProtocolSession.class);
 
     public static final String PROTOCOL_INITIATION_RECEIVED = 
"ProtocolInitiatiionReceived";
@@ -59,6 +62,8 @@
 
     private final IoSession _minaProtocolSession;
 
+    private WriteFuture _lastWriteFuture;
+
     /**
      * The handler from which this session was created and which is used to 
handle protocol events.
      * We send failover events to the handler.
@@ -255,7 +260,7 @@
      */
     public void writeFrame(AMQDataBlock frame)
     {
-        _minaProtocolSession.write(frame);
+        writeFrame(frame, false);
     }
 
     public void writeFrame(AMQDataBlock frame, boolean wait)
@@ -265,6 +270,10 @@
         {
             f.join();
         }
+        else
+        {
+            _lastWriteFuture = f;
+        }
     }
 
     public void addSessionByChannel(int channelId, AMQSession session)
@@ -342,6 +351,12 @@
 
     public void closeProtocolSession()
     {
+        _logger.debug("Waiting for last write to join.");
+        if (_lastWriteFuture != null)
+        {
+            _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+        }
+
         _logger.debug("Closing protocol session");
         final CloseFuture future = _minaProtocolSession.close();
         future.join();


Reply via email to