Author: ritchiem
Date: Mon Jan 22 07:05:58 2007
New Revision: 498637

URL: http://svn.apache.org/viewvc?view=rev&rev=498637
Log:
QPID-310 Propagated JMS Exception to client.
QPID-308 Configurable timeout on blockForFrame.
 Timeouts have been added, but their values still need to be made configurable.
QPID-311 Dispatcher Thread is not thread safe.
 Added the missing synchronized blocks and renamed variables to make the code more readable.

Added:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=498637&r1=498636&r2=498637
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Mon Jan 22 07:05:58 2007
@@ -142,12 +142,21 @@
      */
     private QpidConnectionMetaData _connectionMetaData;
 
+    /**
+     * @param broker      brokerdetails
+     * @param username    username
+     * @param password    password
+     * @param clientName  clientid
+     * @param virtualHost virtualhost
+     * @throws AMQException
+     * @throws URLSyntaxException
+     */
     public AMQConnection(String broker, String username, String password,
                          String clientName, String virtualHost) throws 
AMQException, URLSyntaxException
     {
         this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
                                   username + ":" + password + "@" +
-                                  (clientName==null?"":clientName) +
+                                  (clientName == null ? "" : clientName) +
                                   virtualHost + "?brokerlist='" + 
AMQBrokerDetails.checkTransport(broker) + "'"));
     }
 
@@ -163,12 +172,12 @@
         this(new AMQConnectionURL(useSSL ?
                                   ConnectionURL.AMQ_PROTOCOL + "://" +
                                   username + ":" + password + "@" +
-                                  (clientName==null?"":clientName) +
+                                  (clientName == null ? "" : clientName) +
                                   virtualHost + "?brokerlist='tcp://" + host + 
":" + port + "'"
                                   + "," + ConnectionURL.OPTIONS_SSL + 
"='true'" :
                                                                                
 ConnectionURL.AMQ_PROTOCOL + "://" +
                                                                                
 username + ":" + password + "@" +
-                                                                               
 (clientName==null?"":clientName) +
+                                                                               
 (clientName == null ? "" : clientName) +
                                                                                
 virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
                                                                                
 + "," + ConnectionURL.OPTIONS_SSL + "='false'"
         ));
@@ -466,22 +475,22 @@
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(
-            ChannelOpenBody.createAMQFrame(channelId,
-                (byte)8, (byte)0,      // AMQP version (major, minor)
-                null), // outOfBand
-                ChannelOpenOkBody.class);
+                ChannelOpenBody.createAMQFrame(channelId,
+                                               (byte) 8, (byte) 0,    // AMQP 
version (major, minor)
+                                               null),    // outOfBand
+                                                         
ChannelOpenOkBody.class);
 
         //todo send low water mark when protocol allows.
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(
-            BasicQosBody.createAMQFrame(channelId,
-                (byte)8, (byte)0,      // AMQP version (major, minor)
-                false, // global
-                prefetchHigh,  // prefetchCount
-                0),    // prefetchSize
-                BasicQosOkBody.class);
+                BasicQosBody.createAMQFrame(channelId,
+                                            (byte) 8, (byte) 0,    // AMQP 
version (major, minor)
+                                            false,    // global
+                                            prefetchHigh,    // prefetchCount
+                                            0),    // prefetchSize
+                                                   BasicQosOkBody.class);
 
         if (transacted)
         {
@@ -492,7 +501,7 @@
             // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
             // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions 
change.
-            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, 
(byte)8, (byte)0), TxSelectOkBody.class);
+            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, 
(byte) 8, (byte) 0), TxSelectOkBody.class);
         }
     }
 
@@ -524,6 +533,7 @@
     /**
      * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws 
IllegalStateExceptions
      * where specified in the JMS spec
+     *
      * @param transacted
      * @param acknowledgeMode
      * @return QueueSession
@@ -537,6 +547,7 @@
     /**
      * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws 
IllegalStateExceptions
      * where specified in the JMS spec
+     *
      * @param transacted
      * @param acknowledgeMode
      * @return TopicSession
@@ -571,7 +582,7 @@
     {
         checkNotClosed();
         return _connectionMetaData;
-        
+
     }
 
     public ExceptionListener getExceptionListener() throws JMSException
@@ -622,14 +633,19 @@
 
     public void close() throws JMSException
     {
-        synchronized(getFailoverMutex())
+        close(-1);
+    }
+
+    public void close(long timeout) throws JMSException
+    {
+        synchronized (getFailoverMutex())
         {
             if (!_closed.getAndSet(true))
             {
                 try
                 {
-                    closeAllSessions(null);
-                    _protocolHandler.closeConnection();
+                    closeAllSessions(null, timeout);
+                    _protocolHandler.closeConnection(timeout);
                 }
                 catch (AMQException e)
                 {
@@ -666,7 +682,7 @@
      *              <p/>
      *              The caller must hold the failover mutex before calling 
this method.
      */
-    private void closeAllSessions(Throwable cause) throws JMSException
+    private void closeAllSessions(Throwable cause, long timeout) throws 
JMSException
     {
         final LinkedList sessionCopy = new LinkedList(_sessions.values());
         final Iterator it = sessionCopy.iterator();
@@ -682,7 +698,7 @@
             {
                 try
                 {
-                    session.close();
+                    session.close(timeout);
                 }
                 catch (JMSException e)
                 {
@@ -900,7 +916,7 @@
         {
             if (cause instanceof AMQException)
             {
-                je = new 
JMSException(Integer.toString(((AMQException)cause).getErrorCode()) ,"Exception 
thrown against " + toString() + ": " + cause);
+                je = new JMSException(Integer.toString(((AMQException) 
cause).getErrorCode()), "Exception thrown against " + toString() + ": " + 
cause);
             }
             else
             {
@@ -931,7 +947,7 @@
             {
                 _logger.info("Closing AMQConnection due to :" + 
cause.getMessage());
                 _closed.set(true);
-                closeAllSessions(cause); // FIXME: when doing this end up with 
RejectedExecutionException from executor.
+                closeAllSessions(cause, -1); // FIXME: when doing this end up 
with RejectedExecutionException from executor.
             }
             catch (JMSException e)
             {
@@ -953,8 +969,8 @@
     void deregisterSession(int channelId)
     {
         _sessions.remove(channelId);
-    }    
-    
+    }
+
     /**
      * For all sessions, and for all consumers in those sessions, resubscribe. 
This is called during failover handling.
      * The caller must hold the failover mutex before calling this method.

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=498637&r1=498636&r2=498637
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Jan 22 07:05:58 2007
@@ -140,12 +140,12 @@
     /**
      * Used to signal 'pausing' the dispatcher when setting a message listener 
on a consumer
      */
-    private final AtomicBoolean _pausing = new AtomicBoolean(false);
+    private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
 
     /**
      * Used to signal 'pausing' the dispatcher when setting a message listener 
on a consumer
      */
-    private final AtomicBoolean _paused = new AtomicBoolean(false);
+    private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
 
     /**
      * Set when recover is called. This is to handle the case where recover() 
is called by application code
@@ -171,7 +171,8 @@
      */
     private class Dispatcher extends Thread
     {
-        private final Logger _logger = Logger.getLogger(Dispatcher.class);     
   
+        private final Logger _logger = Logger.getLogger(Dispatcher.class);
+        private boolean _reDispatching = true;
 
         public Dispatcher()
         {
@@ -184,41 +185,47 @@
 
             while (!_stopped.get())
             {
-                if (_pausing.get())
+                synchronized (_pausingDispatcher)
                 {
-                    try
+                    if (_pausingDispatcher.get())
                     {
-                        //Wait for unpausing
-                        synchronized (_pausing)
+                        try
                         {
-                            synchronized (_paused)
+
+                            _pausingDispatcher.set(false);
+
+                            //Wait to continue with pause code.
+                            synchronized (_pausedDispatcher)
                             {
-                                _paused.notify();
+                                _pausedDispatcher.notify();
                             }
 
-                            _logger.info("dispatcher paused");
-                            
-                            _pausing.wait();
-                            _logger.info("dispatcher notified");
-                        }
+                            _reDispatching = true;
 
+                            _logger.info("Dispatcher paused");
+                            _pausingDispatcher.wait();
+                            _logger.info("Dispatcher notified");
+
+                        }
+                        catch (InterruptedException e)
+                        {
+                            _logger.info("dispacher interrupted");
+                        }
                     }
-                    catch (InterruptedException e)
-                    {
-                        //do nothing... occurs when a pause request occurs 
will already
-                        // be here if another pause event is pending
-                        _logger.info("dispacher interrupted");
-                    }
+                }
 
+                if (_reDispatching)
+                {
                     doReDispatch();
-
                 }
                 else
                 {
                     doNormalDispatch();
                 }
+
             }
 
+
             _logger.info("Dispatcher thread terminating for channel " + 
_channelId);
         }
 
@@ -227,7 +234,7 @@
             UnprocessedMessage message;
             try
             {
-                while (!_stopped.get() && !_pausing.get() && (message = 
(UnprocessedMessage) _queue.take()) != null)
+                while (!_stopped.get() && !_pausingDispatcher.get() && 
(message = (UnprocessedMessage) _queue.take()) != null)
                 {
                     dispatchMessage(message);
                 }
@@ -257,7 +264,8 @@
             if (_reprocessQueue == null || _reprocessQueue.isEmpty())
             {
                 _logger.info("Reprocess Queue emptied");
-                _pausing.set(false);
+
+                _reDispatching = false;
             }
             else
             {
@@ -343,30 +351,30 @@
         public void pause()
         {
             _logger.info("pausing");
-            _pausing.set(true);
 
+            synchronized (_pausedDispatcher)
+            {
+                _pausingDispatcher.set(true);
 
-            interrupt();
+                interrupt();
 
-            synchronized (_paused)
-            {
                 try
                 {
-                    _paused.wait();
+                    _pausedDispatcher.wait();
                 }
                 catch (InterruptedException e)
                 {
-                  //do nothing
+                    //do nothing
                 }
             }
         }
 
         public void reprocess()
         {
-            synchronized (_pausing)
+            synchronized (_pausingDispatcher)
             {
                 _logger.info("reprocessing");
-                _pausing.notify();
+                _pausingDispatcher.notify();
             }
         }
     }
@@ -578,6 +586,11 @@
 
     public void close() throws JMSException
     {
+        close(-1);
+    }
+
+    public void close(long timeout) throws JMSException
+    {
         // We must close down all producers and consumers in an orderly 
fashion. This is the only method
         // that can be called from a different thread of control from the one 
controlling the session
         synchronized (_connection.getFailoverMutex())
@@ -624,8 +637,9 @@
      *
      * @param amqe the exception, may be null to indicate no error has occurred
      */
-    private void closeProducersAndConsumers(AMQException amqe)
+    private void closeProducersAndConsumers(AMQException amqe) throws 
JMSException
     {
+        JMSException jmse = null;
         try
         {
             closeProducers();
@@ -633,6 +647,7 @@
         catch (JMSException e)
         {
             _logger.error("Error closing session: " + e, e);
+            jmse = e;
         }
         try
         {
@@ -641,7 +656,19 @@
         catch (JMSException e)
         {
             _logger.error("Error closing session: " + e, e);
+            if (jmse == null)
+            {
+                jmse = e;
+            }
         }
+        finally
+        {
+            if (jmse != null)
+            {
+                throw jmse;
+            }
+        }
+
     }
 
     /**
@@ -650,7 +677,7 @@
      *
      * @param e the exception that caused this session to be closed. Null 
causes the
      */
-    public void closed(Throwable e)
+    public void closed(Throwable e) throws JMSException
     {
         synchronized (_connection.getFailoverMutex())
         {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=498637&r1=498636&r2=498637
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
 Mon Jan 22 07:05:58 2007
@@ -82,8 +82,6 @@
         // client code which runs in a separate thread.
         synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
         {
-            _logger.info("Starting failover process");
-
             // We switch in a new state manager temporarily so that the 
interaction to get to the "connection open"
             // state works, without us having to terminate any existing "state 
waiters". We could theoretically
             // have a state waiter waiting until the connection is closed for 
some reason. Or in future we may have
@@ -92,6 +90,8 @@
             _amqProtocolHandler.setStateManager(new 
AMQStateManager(_amqProtocolHandler.getProtocolSession()));
             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != 
null))
             {
+                _logger.info("Failover process veto-ed by client");
+
                 _amqProtocolHandler.setStateManager(existingStateManager);
                 if (_host != null)
                 {
@@ -105,6 +105,9 @@
                 _amqProtocolHandler.setFailoverLatch(null);
                 return;
             }
+
+            _logger.info("Starting failover process");
+
             boolean failoverSucceeded;
             // when host is non null we have a specified failover host 
otherwise we all the client to cycle through
             // all specified hosts

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=498637&r1=498636&r2=498637
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 Mon Jan 22 07:05:58 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
@@ -89,6 +90,8 @@
 
     private CountDownLatch _failoverLatch;
 
+    private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+
     public AMQProtocolHandler(AMQConnection con)
     {
         _connection = con;
@@ -280,7 +283,7 @@
     public void propagateExceptionToWaiters(Exception e)
     {
         getStateManager().error(e);
-        if(!_frameListeners.isEmpty())
+        if (!_frameListeners.isEmpty())
         {
             final Iterator it = _frameListeners.iterator();
             while (it.hasNext())
@@ -319,7 +322,7 @@
             {
 
                 boolean wasAnyoneInterested = 
getStateManager().methodReceived(evt);
-                if(!_frameListeners.isEmpty())
+                if (!_frameListeners.isEmpty())
                 {
                     Iterator it = _frameListeners.iterator();
                     while (it.hasNext())
@@ -330,13 +333,13 @@
                 }
                 if (!wasAnyoneInterested)
                 {
-                    throw new AMQException("AMQMethodEvent " + evt + " was not 
processed by any listener.  Listeners:"  + _frameListeners);
+                    throw new AMQException("AMQMethodEvent " + evt + " was not 
processed by any listener.  Listeners:" + _frameListeners);
                 }
             }
             catch (AMQException e)
             {
                 getStateManager().error(e);
-                if(!_frameListeners.isEmpty())
+                if (!_frameListeners.isEmpty())
                 {
                     Iterator it = _frameListeners.iterator();
                     while (it.hasNext())
@@ -383,17 +386,18 @@
             _logger.debug("Sent frame " + message);
         }
     }
-/*
-    public void addFrameListener(AMQMethodListener listener)
-    {
-        _frameListeners.add(listener);
-    }
 
-    public void removeFrameListener(AMQMethodListener listener)
-    {
-        _frameListeners.remove(listener);
-    }
-  */
+    /*
+      public void addFrameListener(AMQMethodListener listener)
+      {
+          _frameListeners.add(listener);
+      }
+
+      public void removeFrameListener(AMQMethodListener listener)
+      {
+          _frameListeners.remove(listener);
+      }
+    */
     public void attainState(AMQState s) throws AMQException
     {
         getStateManager().attainState(s);
@@ -427,12 +431,27 @@
                                                             
BlockingMethodFrameListener listener)
             throws AMQException
     {
+        return writeCommandFrameAndWaitForReply(frame, listener, 
DEFAULT_SYNC_TIMEOUT);
+    }
+
+    /**
+     * Convenience method that writes a frame to the protocol session and 
waits for
+     * a particular response. Equivalent to calling 
getProtocolSession().write() then
+     * waiting for the response.
+     *
+     * @param frame
+     * @param listener the blocking listener. Note the calling thread will 
block.
+     */
+    private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+                                                            
BlockingMethodFrameListener listener, long timeout)
+            throws AMQException
+    {
         try
         {
             _frameListeners.add(listener);
             _protocolSession.writeFrame(frame);
 
-            AMQMethodEvent e = listener.blockForFrame();
+            AMQMethodEvent e = listener.blockForFrame(timeout);
             return e;
             // When control resumes before this line, a reply will have been 
received
             // that matches the criteria defined in the blocking listener
@@ -454,8 +473,16 @@
      */
     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) 
throws AMQException
     {
+        return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
+    }
+
+    /**
+     * More convenient method to write a frame and wait for it's response.
+     */
+    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long 
timeout) throws AMQException
+    {
         return writeCommandFrameAndWaitForReply(frame,
-                                                new 
SpecificMethodFrameListener(frame.channel, responseClass));
+                                                new 
SpecificMethodFrameListener(frame.channel, responseClass), timeout);
     }
 
     /**
@@ -488,20 +515,34 @@
 
     public void closeConnection() throws AMQException
     {
+        closeConnection(-1);
+    }
+
+    public void closeConnection(long timeout) throws AMQException
+    {
         getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
-            (byte)8, (byte)0,  // AMQP version (major, minor)
-            0, // classId
-            0, // methodId
-            AMQConstant.REPLY_SUCCESS.getCode(),       // replyCode
-            new AMQShortString("JMS client is closing the connection."));      
// replyText
-        syncWrite(frame, ConnectionCloseOkBody.class);
+                                                                  (byte) 8, 
(byte) 0,    // AMQP version (major, minor)
+                                                                  0,    // 
classId
+                                                                  0,    // 
methodId
+                                                                  
AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
+                                                                  new 
AMQShortString("JMS client is closing the connection."));    // replyText
+
+        try
+        {
+            syncWrite(frame, ConnectionCloseOkBody.class, timeout);
+            _protocolSession.closeProtocolSession();
+        }
+        catch (AMQTimeoutException e)
+        {
+            _protocolSession.closeProtocolSession(false);
+        }
+
 
-        _protocolSession.closeProtocolSession();
     }
 
     /**
@@ -566,7 +607,7 @@
         _stateManager = stateManager;
         _protocolSession.setStateManager(stateManager);
     }
-    
+
     public AMQProtocolSession getProtocolSession()
     {
         return _protocolSession;

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=498637&r1=498636&r2=498637
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 Mon Jan 22 07:05:58 2007
@@ -43,7 +43,7 @@
 
 /**
  * Wrapper for protocol session that provides type-safe access to session 
attributes.
- *
+ * <p/>
  * The underlying protocol session is still available but clients should not
  * use it to obtain session attributes.
  */
@@ -110,6 +110,8 @@
         _minaProtocolSession = protocolSession;
         // properties of the connection are made available to the event 
handlers
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+        //fixme - real value needed
+        _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
         _stateManager = new AMQStateManager(this);
     }
 
@@ -119,10 +121,11 @@
         _minaProtocolSession = protocolSession;
         // properties of the connection are made available to the event 
handlers
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
-
+        //fixme - real value needed
+        _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
         _stateManager = stateManager;
         _stateManager.setProtocolSession(this);
-                
+
     }
 
     public void init()
@@ -153,12 +156,12 @@
     {
         getAMQConnection().setClientID(clientID);
     }
-    
+
     public AMQStateManager getStateManager()
     {
         return _stateManager;
     }
-    
+
     public void setStateManager(AMQStateManager stateManager)
     {
         _stateManager = stateManager;
@@ -191,8 +194,9 @@
 
     /**
      * Store the SASL client currently being used for the authentication 
handshake
+     *
      * @param client if non-null, stores this in the session. if null clears 
any existing client
-     * being stored
+     *               being stored
      */
     public void setSaslClient(SaslClient client)
     {
@@ -223,6 +227,7 @@
     /**
      * Callback invoked from the BasicDeliverMethodHandler when a message has 
been received.
      * This is invoked on the MINA dispatcher thread.
+     *
      * @param message
      * @throws AMQException if this was not expected
      */
@@ -280,8 +285,9 @@
     /**
      * Deliver a message to the appropriate session, removing the unprocessed 
message
      * from our map
+     *
      * @param channelId the channel id the message should be delivered to
-     * @param msg the message
+     * @param msg       the message
      */
     private void deliverMessageToAMQSession(int channelId, UnprocessedMessage 
msg)
     {
@@ -306,6 +312,7 @@
         WriteFuture f = _minaProtocolSession.write(frame);
         if (wait)
         {
+            //fixme -- time out?
             f.join();
         }
         else
@@ -340,6 +347,7 @@
 
     /**
      * Starts the process of closing a session
+     *
      * @param session the AMQSession being closed
      */
     public void closeSession(AMQSession session)
@@ -361,19 +369,27 @@
      * This method decides whether this is a response or an initiation. The 
latter
      * case causes the AMQSession to be closed and an exception to be thrown if
      * appropriate.
+     *
      * @param channelId the id of the channel (session)
      * @return true if the client must respond to the server, i.e. if the 
server
-     * initiated the channel close, false if the channel close is just the 
server
-     * responding to the client's earlier request to close the channel.
+     *         initiated the channel close, false if the channel close is just 
the server
+     *         responding to the client's earlier request to close the channel.
      */
-    public boolean channelClosed(int channelId, int code, String text)
+    public boolean channelClosed(int channelId, int code, String text) throws 
AMQException
     {
         final Integer chId = channelId;
         // if this is not a response to an earlier request to close the channel
         if (_closingChannels.remove(chId) == null)
         {
             final AMQSession session = (AMQSession) 
_channelId2SessionMap.get(chId);
-            session.closed(new AMQException(_logger, code, text));
+            try
+            {
+                session.closed(new AMQException(_logger, code, text));
+            }
+            catch (JMSException e)
+            {
+                throw new AMQException("JMSException received while closing 
session", e);
+            }
             return true;
         }
         else
@@ -389,15 +405,20 @@
 
     public void closeProtocolSession()
     {
+        closeProtocolSession(true);
+    }
+
+    public void closeProtocolSession(boolean waitLast)
+    {
         _logger.debug("Waiting for last write to join.");
-        if (_lastWriteFuture != null)
+        if (waitLast && _lastWriteFuture != null)
         {
             _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
         }
 
         _logger.debug("Closing protocol session");
         final CloseFuture future = _minaProtocolSession.close();
-        future.join();
+        future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
     }
 
     public void failover(String host, int port)
@@ -408,17 +429,16 @@
     protected AMQShortString generateQueueName()
     {
         int id;
-        synchronized(_queueIdLock)
+        synchronized (_queueIdLock)
         {
             id = _queueId++;
         }
         //get rid of / and : and ; from address for spec conformance
-        String localAddress = 
StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:","");
+        String localAddress = 
StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), 
"/;:", "");
         return new AMQShortString("tmp_" + localAddress + "_" + id);
     }
 
     /**
-     *
      * @param delay delay in seconds (not ms)
      */
     void initHeartbeats(int delay)

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=498637&r1=498636&r2=498637
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
 Mon Jan 22 07:05:58 2007
@@ -21,6 +21,8 @@
 package org.apache.qpid.client.protocol;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -91,7 +93,7 @@
     /**
      * This method is called by the thread that wants to wait for a frame.
      */
-    public AMQMethodEvent blockForFrame() throws AMQException
+    public AMQMethodEvent blockForFrame(long timeout) throws AMQException
     {
         synchronized (_lock)
         {
@@ -99,11 +101,29 @@
             {
                 try
                 {
-                    _lock.wait();
+                    if (timeout == -1)
+                    {
+                        _lock.wait();
+                    }
+                    else
+                    {
+
+                        _lock.wait(timeout);
+                        if (!_ready)
+                        {
+                            _error = new AMQTimeoutException("Server did not 
respond in a timely fashion");
+                            _ready = true;
+                        }
+                    }
                 }
                 catch (InterruptedException e)
                 {
-                    // IGNORE
+                    // IGNORE    -- //fixme this isn't ideal as being 
interrupted isn't equivalent to success
+                    if (!_ready && timeout != -1)
+                    {
+                        _error = new AMQException("Server did not respond 
timely");
+                        _ready = true;
+                    }
                 }
             }
         }
@@ -115,7 +135,8 @@
             }
             else if (_error instanceof FailoverException)
             {
-                throw (FailoverException)_error;  // needed to expose 
FailoverException.
+                // This should ensure that FailoverException is not wrapped 
and can be caught.                
+                throw(FailoverException) _error;  // needed to expose 
FailoverException.
             }
             else
             {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?view=diff&rev=498637&r1=498636&r2=498637
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
 Mon Jan 22 07:05:58 2007
@@ -25,7 +25,6 @@
 
 /**
  * Waits for a particular state to be reached.
- *
  */
 public class StateWaiter implements StateListener
 {
@@ -38,6 +37,7 @@
     private volatile Throwable _throwable;
 
     private final Object _monitor = new Object();
+    private static final long TIME_OUT = 1000 * 60 * 2;
 
     public StateWaiter(AMQState state)
     {
@@ -46,7 +46,7 @@
 
     public void waituntilStateHasChanged() throws AMQException
     {
-        synchronized(_monitor)
+        synchronized (_monitor)
         {
             //
             // The guard is required in case we are woken up by a spurious
@@ -57,7 +57,7 @@
                 try
                 {
                     _logger.debug("State " + _state + " not achieved so 
waiting...");
-                    _monitor.wait();
+                    _monitor.wait(TIME_OUT);
                 }
                 catch (InterruptedException e)
                 {
@@ -82,7 +82,7 @@
 
     public void stateChanged(AMQState oldState, AMQState newState)
     {
-        synchronized(_monitor)
+        synchronized (_monitor)
         {
             if (_logger.isDebugEnabled())
             {
@@ -103,7 +103,7 @@
 
     public void error(Throwable t)
     {
-        synchronized(_monitor)
+        synchronized (_monitor)
         {
             if (_logger.isDebugEnabled())
             {

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java?view=auto&rev=498637
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
 Mon Jan 22 07:05:58 2007
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid;
+
+public class AMQTimeoutException extends AMQException
+{
+    public AMQTimeoutException(String message)
+    {
+        super(message);
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to