Author: ritchiem
Date: Fri Oct 20 04:48:50 2006
New Revision: 466078
URL: http://svn.apache.org/viewvc?view=rev&rev=466078
Log:
Modified to maintain a reference to the lastWriteFuture. This is then used when
closing the ProtocolSession to join on so that we can be sure all data has been
written to the broker. A time out of 2 minutes ensures that the client doesn't
hang for ever if the broker fails.
Modified:
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=466078&r1=466077&r2=466078
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
Fri Oct 20 04:48:50 2006
@@ -47,6 +47,9 @@
*/
public class AMQProtocolSession implements ProtocolVersionList
{
+
+ private static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
+
private static final Logger _logger =
Logger.getLogger(AMQProtocolSession.class);
public static final String PROTOCOL_INITIATION_RECEIVED =
"ProtocolInitiatiionReceived";
@@ -59,6 +62,8 @@
private final IoSession _minaProtocolSession;
+ private WriteFuture _lastWriteFuture;
+
/**
* The handler from which this session was created and which is used to
handle protocol events.
* We send failover events to the handler.
@@ -255,7 +260,7 @@
*/
public void writeFrame(AMQDataBlock frame)
{
- _minaProtocolSession.write(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
@@ -265,6 +270,10 @@
{
f.join();
}
+ else
+ {
+ _lastWriteFuture = f;
+ }
}
public void addSessionByChannel(int channelId, AMQSession session)
@@ -342,6 +351,12 @@
public void closeProtocolSession()
{
+ _logger.debug("Waiting for last write to join.");
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ }
+
_logger.debug("Closing protocol session");
final CloseFuture future = _minaProtocolSession.close();
future.join();