Author: rajith
Date: Thu Apr 19 16:08:19 2007
New Revision: 530586

URL: http://svn.apache.org/viewvc?view=rev&rev=530586
Log:
added cordination support

Added:
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
Modified:
    incubator/qpid/branches/client_restructure/java/client/pom.xml
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java

Modified: incubator/qpid/branches/client_restructure/java/client/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/client/pom.xml?view=diff&rev=530586&r1=530585&r2=530586
==============================================================================
--- incubator/qpid/branches/client_restructure/java/client/pom.xml (original)
+++ incubator/qpid/branches/client_restructure/java/client/pom.xml Thu Apr 19 
16:08:19 2007
@@ -53,6 +53,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jta_1.1_spec</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
         </dependency>

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java?view=diff&rev=530586&r1=530585&r2=530586
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
 Thu Apr 19 16:08:19 2007
@@ -21,32 +21,27 @@
 package org.apache.qpid.nclient.amqp;
 
 import org.apache.qpid.framing.DtxCoordinationCommitBody;
-import org.apache.qpid.framing.DtxCoordinationCommitOkBody;
 import org.apache.qpid.framing.DtxCoordinationForgetBody;
-import org.apache.qpid.framing.DtxCoordinationForgetOkBody;
 import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
-import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody;
 import org.apache.qpid.framing.DtxCoordinationPrepareBody;
-import org.apache.qpid.framing.DtxCoordinationPrepareOkBody;
 import org.apache.qpid.framing.DtxCoordinationRecoverBody;
-import org.apache.qpid.framing.DtxCoordinationRecoverOkBody;
 import org.apache.qpid.framing.DtxCoordinationRollbackBody;
-import org.apache.qpid.framing.DtxCoordinationRollbackOkBody;
+import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
 import org.apache.qpid.nclient.core.AMQPException;
 
 public interface AMQPDtxCoordination
 {
-       public DtxCoordinationCommitOkBody commit(DtxCoordinationCommitBody 
dtxCoordinationCommitBody) throws AMQPException;
+       public void commit(DtxCoordinationCommitBody 
dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException;
        
-       public DtxCoordinationForgetOkBody forget(DtxCoordinationForgetBody 
dtxCoordinationForgetBody) throws AMQPException;
+       public void forget(DtxCoordinationForgetBody 
dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException;
        
-       public DtxCoordinationGetTimeoutOkBody 
getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody) throws 
AMQPException;
+       public void getTimeOut(DtxCoordinationGetTimeoutBody 
dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException;
        
-       public DtxCoordinationPrepareOkBody prepare(DtxCoordinationPrepareBody 
dtxCoordinationPrepareBody) throws AMQPException;
+       public void prepare(DtxCoordinationPrepareBody 
dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException;
        
-       public DtxCoordinationRecoverOkBody recover(DtxCoordinationRecoverBody 
dtxCoordinationRecoverBody) throws AMQPException;
+       public void recover(DtxCoordinationRecoverBody 
dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException;
        
-       public DtxCoordinationRollbackOkBody 
getTimeOut(DtxCoordinationRollbackBody dtxCoordinationRollbackBody) throws 
AMQPException;
+       public void rollback(DtxCoordinationRollbackBody 
dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException;
        
-       //public DtxCoordinationSetTimeoutOkBody 
getTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody) throws 
AMQPException;
+       public void setTimeOut(DtxCoordinationSetTimeoutBody 
dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException;
 }

Added: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java?view=auto&rev=530586
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
 (added)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
 Thu Apr 19 16:08:19 2007
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.DtxCoordinationCommitBody;
+import org.apache.qpid.framing.DtxCoordinationCommitOkBody;
+import org.apache.qpid.framing.DtxCoordinationForgetBody;
+import org.apache.qpid.framing.DtxCoordinationForgetOkBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareOkBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverOkBody;
+import org.apache.qpid.framing.DtxCoordinationRollbackBody;
+import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPDtxCoordination;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+public class QpidAMQPDtxCoordination extends AMQPCallBackSupport implements 
AMQPMethodListener, AMQPDtxCoordination
+{
+       private Phase _phase;
+
+       protected QpidAMQPDtxCoordination(int channelId,Phase phase)
+       {
+               super(channelId);
+               _phase = phase;
+       }
+
+       public void commit(DtxCoordinationCommitBody 
dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException
+       {
+               AMQPMethodEvent msg = 
handleAsynchronousCall(dtxCoordinationCommitBody,cb);
+               _phase.messageSent(msg);
+       }
+
+       public void forget(DtxCoordinationForgetBody 
dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException
+       {
+               AMQPMethodEvent msg = 
handleAsynchronousCall(dtxCoordinationForgetBody,cb);
+               _phase.messageSent(msg);
+       }
+
+       public void getTimeOut(DtxCoordinationGetTimeoutBody 
dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException
+       {
+               AMQPMethodEvent msg = 
handleAsynchronousCall(dtxCoordinationGetTimeoutBody,cb);
+               _phase.messageSent(msg);
+       }
+
+       public void rollback(DtxCoordinationRollbackBody 
dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException
+       {
+               AMQPMethodEvent msg = 
handleAsynchronousCall(dtxCoordinationRollbackBody,cb);
+               _phase.messageSent(msg);
+       }
+
+       public void prepare(DtxCoordinationPrepareBody 
dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException
+       {
+               AMQPMethodEvent msg = 
handleAsynchronousCall(dtxCoordinationPrepareBody,cb);
+               _phase.messageSent(msg);
+       }
+
+       public void recover(DtxCoordinationRecoverBody 
dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException
+       {
+               AMQPMethodEvent msg = 
handleAsynchronousCall(dtxCoordinationRecoverBody,cb);
+               _phase.messageSent(msg);
+       }
+       
+       public void setTimeOut(DtxCoordinationSetTimeoutBody 
dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException
+       {
+               AMQPMethodEvent msg = 
handleAsynchronousCall(dtxCoordinationSetTimeoutBody,cb);
+               _phase.messageSent(msg);
+       }
+       
+       /**-------------------------------------------
+     * AMQPMethodListener methods
+     *--------------------------------------------
+     */
+       public <B extends AMQMethodBody> boolean 
methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+    {
+       long localCorrelationId = evt.getLocalCorrelationId();
+       AMQMethodBody methodBody = evt.getMethod(); 
+       if ( methodBody instanceof DtxCoordinationCommitOkBody ||
+                methodBody instanceof DtxCoordinationForgetOkBody        ||
+                methodBody instanceof DtxCoordinationGetTimeoutOkBody  ||
+                methodBody instanceof DtxCoordinationPrepareOkBody       ||
+                methodBody instanceof DtxCoordinationRecoverOkBody       
+           )
+       {
+               invokeCallBack(localCorrelationId,methodBody);
+               return true;
+       }       
+       else
+       {
+               return false;
+       }
+    }  
+
+}

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java?view=diff&rev=530586&r1=530585&r2=530586
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
 Thu Apr 19 16:08:19 2007
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.nclient.amqp.qpid;
 
 import java.util.concurrent.locks.Condition;
@@ -52,10 +72,17 @@
        
        private final Condition _dtxNotSelected = _lock.newCondition();
 
-       private final Condition _channelNotClosed = _lock.newCondition();
+       private final Condition _dtxNotStarted = _lock.newCondition();
+       
+       // maybe it needs a better name
+       private final Condition _dtxNotEnd = _lock.newCondition();
        
        private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
        
+       private DtxDemarcationStartOkBody _dtxDemarcationStartOkBody;
+       
+       private DtxDemarcationEndOkBody _dtxDemarcationEndOkBody;
+       
        protected QpidAMQPDtxDemarcation(int channelId, Phase phase, 
AMQPStateManager stateManager)
        {
                _channelId = channelId;
@@ -77,14 +104,14 @@
                {
                        _dtxDemarcationSelectOkBody = null;
                        
checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, 
AMQPState.DTX_NOT_STARTED);
-                       AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, 
_dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+                       AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, 
dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID);
                        _phase.messageSent(msg);
 
-                       //_channelNotOpend.await(_serverTimeOut, 
TimeUnit.MILLISECONDS);
+                       //_dtxNotSelected.await(_serverTimeOut, 
TimeUnit.MILLISECONDS);
                        _dtxNotSelected.await();
                        
AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker 
didn't send the DtxDemarcationSelectOkBody in time");
-                       notifyState(AMQPState.CHANNEL_OPENED);
-                       _currentState = AMQPState.CHANNEL_OPENED;
+                       notifyState(AMQPState.DTX_NOT_STARTED);
+                       _currentState = AMQPState.DTX_NOT_STARTED;
                        return _dtxDemarcationSelectOkBody;
                }
                catch (Exception e)
@@ -99,14 +126,56 @@
 
        public DtxDemarcationStartOkBody start(DtxDemarcationStartBody 
dtxDemarcationStartBody) throws AMQPException
        {
-               // TODO Auto-generated method stub
-               return null;
+               _lock.lock();
+               try
+               {
+                       _dtxDemarcationStartOkBody = null;
+                       checkIfValidStateTransition(_validStartStates, 
_currentState, AMQPState.DTX_STARTED);
+                       AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, 
_dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+                       _phase.messageSent(msg);
+
+                       //_dtxNotStarted.await(_serverTimeOut, 
TimeUnit.MILLISECONDS);
+                       _dtxNotStarted.await();
+                       
AMQPValidator.throwExceptionOnNull(_dtxDemarcationStartOkBody, "The broker 
didn't send the DtxDemarcationStartOkBody in time");
+                       notifyState(AMQPState.DTX_STARTED);
+                       _currentState = AMQPState.DTX_STARTED;
+                       return _dtxDemarcationStartOkBody;
+               }
+               catch (Exception e)
+               {
+                       throw new AMQPException("Error in dtx.start", e);
+               }
+               finally
+               {
+                       _lock.unlock();
+               }
        }
        
        public DtxDemarcationEndOkBody end(DtxDemarcationEndBody 
dtxDemarcationEndBody) throws AMQPException
        {
-               // TODO Auto-generated method stub
-               return null;
+               _lock.lock();
+               try
+               {
+                       _dtxDemarcationEndOkBody = null;
+                       checkIfValidStateTransition(_validEndStates, 
_currentState, AMQPState.DTX_END);
+                       AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, 
_dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+                       _phase.messageSent(msg);
+
+                       //_dtxNotEnd.await(_serverTimeOut, 
TimeUnit.MILLISECONDS);
+                       _dtxNotEnd.await();
+                       
AMQPValidator.throwExceptionOnNull(_dtxDemarcationEndOkBody, "The broker didn't 
send the DtxDemarcationEndOkBody in time");
+                       notifyState(AMQPState.DTX_END);
+                       _currentState = AMQPState.DTX_END;
+                       return _dtxDemarcationEndOkBody;
+               }
+               catch (Exception e)
+               {
+                       throw new AMQPException("Error in dtx.start", e);
+               }
+               finally
+               {
+                       _lock.unlock();
+               }
        }
        
        /**
@@ -116,7 +185,36 @@
         */
        public <B extends AMQMethodBody> boolean 
methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
        {
-               return true;
+               _lock.lock();
+               try
+               {
+                       if (evt.getMethod() instanceof 
DtxDemarcationSelectOkBody)
+                       {
+                               _dtxDemarcationEndOkBody = 
(DtxDemarcationEndOkBody) evt.getMethod();
+                               _dtxNotSelected.signal();
+                               return true;
+                       }
+                       else if (evt.getMethod() instanceof 
DtxDemarcationStartOkBody)
+                       {
+                               _dtxDemarcationStartOkBody = 
(DtxDemarcationStartOkBody) evt.getMethod();
+                               _dtxNotStarted.signal();
+                               return true;
+                       }
+                       else if (evt.getMethod() instanceof 
DtxDemarcationEndOkBody)
+                       {
+                               _dtxDemarcationEndOkBody = 
(DtxDemarcationEndOkBody) evt.getMethod();
+                               _dtxNotEnd.signal();
+                               return true;
+                       }
+                       else
+                       {
+                               return false;
+                       }
+               }
+               finally
+               {
+                       _lock.unlock();
+               }
        }
 
        private void notifyState(AMQPState newState) throws AMQPException


Reply via email to