Author: rajith
Date: Wed Apr 18 15:07:01 2007
New Revision: 530180

URL: http://svn.apache.org/viewvc?view=rev&rev=530180
Log:
added state support for distributed transactions

Modified:
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
 Wed Apr 18 15:07:01 2007
@@ -130,7 +130,7 @@
 
                        //_channelNotOpend.await(_serverTimeOut, 
TimeUnit.MILLISECONDS);
                        _channelNotOpend.await();
-                       checkIfConnectionClosed();
+                       checkIfChannelClosed();
                        AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, 
"The broker didn't send the ChannelOpenOkBody in time");
                        notifyState(AMQPState.CHANNEL_OPENED);
                        _currentState = AMQPState.CHANNEL_OPENED;
@@ -198,7 +198,7 @@
 
                        //_channelFlowNotResponded.await(_serverTimeOut, 
TimeUnit.MILLISECONDS);
                        _channelFlowNotResponded.await();
-                       checkIfConnectionClosed();
+                       checkIfChannelClosed();
                        AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, 
"The broker didn't send the ChannelFlowOkBody in time");
                        handleChannelFlowState(_channelFlowOkBody.active);
                        return _channelFlowOkBody;
@@ -228,7 +228,7 @@
 
                        //_channelNotResumed.await(_serverTimeOut, 
TimeUnit.MILLISECONDS);
                        _channelNotResumed.await();
-                       checkIfConnectionClosed();
+                       checkIfChannelClosed();
                        AMQPValidator.throwExceptionOnNull(_channelOkBody,
                                        "The broker didn't send the 
ChannelOkBody in response to the ChannelResumeBody in time");
                        notifyState(AMQPState.CHANNEL_OPENED);
@@ -330,7 +330,7 @@
                }
        }
 
-       private void checkIfConnectionClosed() throws AMQPException
+       private void checkIfChannelClosed() throws AMQPException
        {
                if (_channelCloseBody != null)
                {

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
 Wed Apr 18 15:07:01 2007
@@ -1,9 +1,11 @@
 package org.apache.qpid.nclient.amqp.qpid;
 
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.DtxDemarcationEndBody;
 import org.apache.qpid.framing.DtxDemarcationEndOkBody;
 import org.apache.qpid.framing.DtxDemarcationSelectBody;
@@ -11,16 +13,24 @@
 import org.apache.qpid.framing.DtxDemarcationStartBody;
 import org.apache.qpid.framing.DtxDemarcationStartOkBody;
 import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
 import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.util.AMQPValidator;
 
-public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation
+public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements 
AMQPMethodListener, AMQPDtxDemarcation
 {
        private static final Logger _logger = 
Logger.getLogger(QpidAMQPDtxDemarcation.class);
 
-       // the channelId assigned for this channel
+       // the channelId that will be used for transactions
        private int _channelId;
 
        private Phase _phase;
@@ -29,34 +39,88 @@
 
        private AMQPStateManager _stateManager;
 
-       private final AMQPState[] _validCloseStates = new AMQPState[]
-       { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+       private final AMQPState[] _validEndStates = new AMQPState[]
+       { AMQPState.DTX_STARTED };
 
-       private final AMQPState[] _validResumeStates = new AMQPState[]
-       { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+       private final AMQPState[] _validStartStates = new AMQPState[]
+       { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END };
 
        // The wait period until a server sends a respond
        private long _serverTimeOut = 1000;
 
        private final Lock _lock = new ReentrantLock();
        
+       private final Condition _dtxNotSelected = _lock.newCondition();
+
+       private final Condition _channelNotClosed = _lock.newCondition();
        
-       public DtxDemarcationEndOkBody end(DtxDemarcationEndBody 
dtxDemarcationEndBody) throws AMQPException
+       private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
+       
+       protected QpidAMQPDtxDemarcation(int channelId, Phase phase, 
AMQPStateManager stateManager)
        {
-               // TODO Auto-generated method stub
-               return null;
+               _channelId = channelId;
+               _phase = phase;
+               _stateManager = stateManager;
+               _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED;
+               _serverTimeOut = 
ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+       }       
+
+       /**
+        * ------------------------------------------- 
+        * API Methods
+        *  --------------------------------------------
+        */
+       public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody 
dtxDemarcationSelectBody) throws AMQPException
+       {
+               _lock.lock();
+               try
+               {
+                       _dtxDemarcationSelectOkBody = null;
+                       
checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, 
AMQPState.DTX_NOT_STARTED);
+                       AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, 
_dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+                       _phase.messageSent(msg);
+
+                       //_channelNotOpend.await(_serverTimeOut, 
TimeUnit.MILLISECONDS);
+                       _dtxNotSelected.await();
+                       
AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker 
didn't send the DtxDemarcationSelectOkBody in time");
+                       notifyState(AMQPState.CHANNEL_OPENED);
+                       _currentState = AMQPState.CHANNEL_OPENED;
+                       return _dtxDemarcationSelectOkBody;
+               }
+               catch (Exception e)
+               {
+                       throw new AMQPException("Error in dtx.select", e);
+               }
+               finally
+               {
+                       _lock.unlock();
+               }
        }
 
-       public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody 
dtxDemarcationSelectBody) throws AMQPException
+       public DtxDemarcationStartOkBody start(DtxDemarcationStartBody 
dtxDemarcationStartBody) throws AMQPException
        {
                // TODO Auto-generated method stub
                return null;
        }
-
-       public DtxDemarcationStartOkBody start(DtxDemarcationStartBody 
dtxDemarcationStartBody) throws AMQPException
+       
+       public DtxDemarcationEndOkBody end(DtxDemarcationEndBody 
dtxDemarcationEndBody) throws AMQPException
        {
                // TODO Auto-generated method stub
                return null;
        }
+       
+       /**
+        * ------------------------------------------- 
+        * AMQPMethodListener methods
+        * --------------------------------------------
+        */
+       public <B extends AMQMethodBody> boolean 
methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+       {
+               return true;
+       }
 
+       private void notifyState(AMQPState newState) throws AMQPException
+       {
+               _stateManager.notifyStateChanged(new 
AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+       }
 }

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
 Wed Apr 18 15:07:01 2007
@@ -57,4 +57,10 @@
     public static final AMQPState CHANNEL_OPENED = new AMQPState(11, 
"CHANNEL_OPENED");    
     public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, 
"CHANNEL_CLOSED");
     public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, 
"CHANNEL_SUSPEND");
+    
+    // Distributed Transaction state
+    public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, 
"DTX_CHANNEL_NOT_SELECTED");
+    public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, 
"DTX_NOT_STARTED");
+    public static final AMQPState DTX_STARTED = new AMQPState(10, 
"DTX_STARTED");
+    public static final AMQPState DTX_END = new AMQPState(10, "DTX_END");
 }


Reply via email to