Author: dasarath
Date: Mon Jan  2 11:07:59 2006
New Revision: 365395

URL: http://svn.apache.org/viewcvs?rev=365395&view=rev
Log: (empty)

Modified:
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java

Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java?rev=365395&r1=365394&r2=365395&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java Mon Jan  2 11:07:59 2006
@@ -6,7 +6,6 @@
 
 import java.rmi.RemoteException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -14,8 +13,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import javax.xml.namespace.QName;
-
 import org.apache.axis.AxisFault;
 import org.apache.axis.MessageContext;
 import org.apache.axis.components.uuid.UUIDGen;
@@ -27,7 +24,6 @@
 import org.apache.axis.types.URI.MalformedURIException;
 import org.apache.ws.transaction.coordinator.CoordinationService;
 import org.apache.ws.transaction.coordinator.CoordinatorImpl;
-import org.apache.ws.transaction.coordinator.InvalidCoordinationProtocolException;
 import org.apache.ws.transaction.coordinator.TimedOutException;
 import org.apache.ws.transaction.wsat.Notification;
 
@@ -37,19 +33,17 @@
  */
 public class ATCoordinatorImpl extends CoordinatorImpl implements ATCoordinator {
 
-       int status = AT2PCStatus.NONE;
-
-       private static final int VOLATILE = 0;
+       int status = AT2PCStatus.ACTIVE;
 
-       private static final int DURABLE = 1;
+       Set preparedParticipants = new HashSet();
 
-       Map participants2PC[] = new Map[2];
+       List completionParticipants = new ArrayList();
 
-       Set prepared = Collections.synchronizedSet(new HashSet());
+       Map volatile2PCParticipants = new HashMap();
 
-       List participantsComp = Collections.synchronizedList(new ArrayList());
+       Map durable2PCParticipants = new HashMap();
 
-       public static final int MAX_RETRIES = 10;
+       public static int maxRetries = 10;
 
        public static final int RETRY_DELAY_MILLIS = 20 * 1000;
 
@@ -57,13 +51,10 @@
 
        public ATCoordinatorImpl() throws MalformedURIException {
                super(COORDINATION_TYPE_ID);
-               status = AT2PCStatus.ACTIVE;
-               for (int i = 0; i < 2; i++)
-                       participants2PC[i] = Collections.synchronizedMap(new 
HashMap());
        }
 
        public EndpointReference register(String prot, EndpointReference pps)
-                       throws InvalidCoordinationProtocolException {
+                       throws AxisFault {
                if (!(status == AT2PCStatus.ACTIVE || status == AT2PCStatus.PREPARING_VOLATILE))
                        throw new IllegalStateException();
                CoordinationService cs = CoordinationService.getInstance();
@@ -71,7 +62,7 @@
                EndpointReference epr = null;
                if (prot.equals(PROTOCOL_ID_COMPLETION)) {
                        if (pps != null)
-                               participantsComp.add(pps);
+                               completionParticipants.add(pps);
                        epr = cs.getCompletionCoordinatorService(this);
                } else {
                        if (pps == null)
@@ -79,19 +70,19 @@
                        UUIDGen gen = UUIDGenFactory.getUUIDGen();
                        ref = "uuid:" + gen.nextUUID();
                        if (prot.equals(PROTOCOL_ID_VOLATILE_2PC))
-                               participants2PC[VOLATILE].put(ref, pps);
+                               volatile2PCParticipants.put(ref, pps);
                        else if (prot.equals(PROTOCOL_ID_DURABLE_2PC))
-                               participants2PC[DURABLE].put(ref, pps);
+                               durable2PCParticipants.put(ref, pps);
                        else
-                               throw new InvalidCoordinationProtocolException();
+                               throw INVALID_PROTOCOL_SOAP_FAULT;
                        epr = cs.getCoordinatorService(this, ref);
                }
                return epr;
        }
 
        public void forget2PC(String ref) {
-               if (participants2PC[VOLATILE].remove(ref) == null)
-                       participants2PC[DURABLE].remove(ref);
+               if (volatile2PCParticipants.remove(ref) == null)
+                       durable2PCParticipants.remove(ref);
        }
 
        public void rollback() {
@@ -109,7 +100,7 @@
                }
        }
 
-       public void aborted(String ref) {
+       public void aborted(String ref) throws AxisFault {
                switch (status) {
                case AT2PCStatus.ACTIVE:
                case AT2PCStatus.PREPARING_VOLATILE:
@@ -119,7 +110,7 @@
                        return;
 
                case AT2PCStatus.COMMITTING:
-                       throw new IllegalStateException();
+                       throw INVALID_STATE_SOAP_FAULT;
 
                case AT2PCStatus.ABORTING:
                        forget2PC(ref);
@@ -129,7 +120,7 @@
                }
        }
 
-       public void readOnly(String ref) {
+       public void readOnly(String ref) throws AxisFault {
                switch (status) {
                case AT2PCStatus.ACTIVE:
                case AT2PCStatus.PREPARING_VOLATILE:
@@ -138,7 +129,7 @@
                        return;
 
                case AT2PCStatus.COMMITTING:
-                       throw new IllegalStateException();
+                       throw INVALID_STATE_SOAP_FAULT;
 
                case AT2PCStatus.ABORTING:
                        forget2PC(ref);
@@ -148,7 +139,7 @@
                }
        }
 
-       public void replay(String ref) {
+       public void replay(String ref) throws AxisFault {
                switch (status) {
                case AT2PCStatus.ACTIVE:
                case AT2PCStatus.PREPARING_VOLATILE:
@@ -157,110 +148,120 @@
                        return;
 
                case AT2PCStatus.COMMITTING:
-                       EndpointReference epr = getEpr(ref);
-                       try {
-                               new ParticipantStub(epr).commitOperation(null);
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
+                       EndpointReference epr = getEprOf2PCParticipant(ref);
+                       if (epr != null)
+                               try {
+                                       new 
ParticipantStub(epr).commitOperation(null);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
                        return;
 
                case AT2PCStatus.ABORTING:
-                       epr = getEpr(ref);
-                       try {
-                               new 
ParticipantStub(epr).rollbackOperation(null);
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
+                       epr = getEprOf2PCParticipant(ref);
+                       if (epr != null)
+                               try {
+                                       new 
ParticipantStub(epr).rollbackOperation(null);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
                        return;
 
                case AT2PCStatus.NONE:
-                       epr = (EndpointReference) 
participants2PC[VOLATILE].get(ref);
+                       if (volatile2PCParticipants.containsKey(ref))
+                               throw INVALID_STATE_SOAP_FAULT;
+                       epr = (EndpointReference) 
durable2PCParticipants.get(ref);
                        if (epr != null)
-                               throw new IllegalStateException();
-                       epr = (EndpointReference) 
participants2PC[DURABLE].get(ref);
-                       try {
-                               new 
ParticipantStub(epr).rollbackOperation(null);
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
+                               try {
+                                       new 
ParticipantStub(epr).rollbackOperation(null);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
                }
        }
 
-       public void prepared(String ref) {
+       public void prepared(String ref) throws AxisFault {
                switch (status) {
                case AT2PCStatus.ACTIVE:
                        rollback();
-                       throw new IllegalStateException();
+                       throw INVALID_STATE_SOAP_FAULT;
 
                case AT2PCStatus.PREPARING_VOLATILE:
                case AT2PCStatus.PREPARING_DURABLE:
-                       prepared.add(ref);
+                       preparedParticipants.add(ref);
                        return;
 
                case AT2PCStatus.COMMITTING:
-                       EndpointReference epr = getEpr(ref);
-                       try {
-                               new ParticipantStub(epr).commitOperation(null);
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
+                       EndpointReference epr = getEprOf2PCParticipant(ref);
+                       if (epr != null)
+                               try {
+                                       new 
ParticipantStub(epr).commitOperation(null);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
                        return;
 
                case AT2PCStatus.ABORTING:
-               case AT2PCStatus.NONE:
-                       epr = (EndpointReference) 
participants2PC[VOLATILE].get(ref);
+                       if (volatile2PCParticipants.remove(ref) != null) 
+                               throw INVALID_STATE_SOAP_FAULT;
+                       epr = (EndpointReference) 
durable2PCParticipants.remove(ref);
                        if (epr != null) {
-                               if (status == AT2PCStatus.ABORTING)
-                                       forget2PC(ref);
-                               throw new IllegalStateException();
-                       }
-                       epr = (EndpointReference) 
participants2PC[DURABLE].get(ref);
-                       if (status == AT2PCStatus.ABORTING)
-                               forget2PC(ref);
-                       try {
-                               new 
ParticipantStub(epr).rollbackOperation(null);
-                       } catch (Exception e) {
-                               e.printStackTrace();
+                               try {
+                                       new 
ParticipantStub(epr).rollbackOperation(null);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
                        }
+                       return;
+
+               case AT2PCStatus.NONE:
+                       if (volatile2PCParticipants.containsKey(ref))
+                               throw INVALID_STATE_SOAP_FAULT;
+                       epr = (EndpointReference) 
durable2PCParticipants.get(ref);
+                       if (epr != null)
+                               try {
+                                       new 
ParticipantStub(epr).rollbackOperation(null);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
                }
        }
 
-       public void committed(String ref) {
+       public void committed(String ref) throws AxisFault {
                switch (status) {
                case AT2PCStatus.ACTIVE:
                case AT2PCStatus.PREPARING_VOLATILE:
                case AT2PCStatus.PREPARING_DURABLE:
                        rollback();
-                       throw new IllegalStateException();
+                       throw INVALID_STATE_SOAP_FAULT;
 
                case AT2PCStatus.COMMITTING:
                        forget2PC(ref);
                        return;
 
                case AT2PCStatus.ABORTING:
-                       throw new IllegalStateException();
+                       throw INVALID_STATE_SOAP_FAULT;
 
                case AT2PCStatus.NONE:
                }
        }
 
-       private EndpointReference getEpr(String ref) {
-               EndpointReference epr = (EndpointReference) 
participants2PC[VOLATILE].get(ref);
-               if (epr == null)
-                       epr = (EndpointReference) 
participants2PC[DURABLE].get(ref);
-               return epr;
+       private EndpointReference getEprOf2PCParticipant(String ref) {
+               EndpointReference epr = (EndpointReference) 
volatile2PCParticipants.get(ref);
+               if (epr != null)
+                       return epr;
+               return (EndpointReference) durable2PCParticipants.get(ref);
        }
 
-       private boolean prepare(int prot) {
+       private boolean prepare(Map participants) {
                int iters = 0;
                int status_old = status;
 
-               while (iters < MAX_RETRIES) {
+               while (iters < maxRetries) {
                        if (iters++ > 0)
                                pause(RETRY_DELAY_MILLIS - 
RESPONSE_DELAY_MILLIS);
 
-                       Iterator iter = 
participants2PC[prot].values().iterator();
+                       Iterator iter = participants.values().iterator();
                        while (iter.hasNext()) {
                                if (status == AT2PCStatus.ABORTING)
                                        return false;
@@ -273,7 +274,7 @@
 
                        pause(RESPONSE_DELAY_MILLIS);
 
-                       if 
(prepared.containsAll(participants2PC[prot].keySet()))
+                       if 
(preparedParticipants.containsAll(participants.keySet()))
                                return status == status_old;
                }
 
@@ -282,11 +283,11 @@
 
        private boolean prepare() {
                status = AT2PCStatus.PREPARING_VOLATILE;
-               if (!prepare(VOLATILE))
+               if (!prepare(volatile2PCParticipants))
                        return false;
 
                status = AT2PCStatus.PREPARING_DURABLE;
-               return prepare(DURABLE);
+               return prepare(durable2PCParticipants);
        }
 
        public void commit() {
@@ -319,16 +320,21 @@
                }
        }
 
+       private boolean noParticipantsToTerminate() {
+               return volatile2PCParticipants.isEmpty()
+                               && durable2PCParticipants.isEmpty();
+       }
+
        private void terminate() {
                int iters = 0;
-               while (iters < MAX_RETRIES
-                               && !(participants2PC[VOLATILE].isEmpty() && 
participants2PC[DURABLE].isEmpty())) {
+               while (iters < maxRetries && !noParticipantsToTerminate()) {
+
                        if (iters++ > 0)
                                pause(RETRY_DELAY_MILLIS - 
RESPONSE_DELAY_MILLIS);
 
-                       for (int prot = VOLATILE; prot == VOLATILE || prot == 
DURABLE; prot = prot == VOLATILE ? DURABLE
-                                       : -1) {
-                               Iterator iter = 
participants2PC[prot].values().iterator();
+                       Map participants = volatile2PCParticipants;
+                       while (true) {
+                               Iterator iter = 
participants.values().iterator();
                                while (iter.hasNext())
                                        try {
                                                ParticipantStub p = new 
ParticipantStub(
@@ -340,14 +346,18 @@
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
+                               if (participants == volatile2PCParticipants)
+                                       participants = durable2PCParticipants;
+                               else
+                                       break;
                        }
 
                        pause(RESPONSE_DELAY_MILLIS);
                }
 
-               if (participants2PC[VOLATILE].isEmpty()
-                               && participants2PC[DURABLE].isEmpty()) {
-                       Iterator iter = participantsComp.iterator();
+               if (noParticipantsToTerminate()) {
+
+                       Iterator iter = completionParticipants.iterator();
                        while (iter.hasNext())
                                try {
                                        CompletionInitiatorStub p = new 
CompletionInitiatorStub(
@@ -364,20 +374,9 @@
                status = AT2PCStatus.NONE;
        }
 
-       private AxisFault getInvalidStateSoapFault() {
-               QName subcode = new QName(
-                               "http://schemas.xmlsoap.org/ws/2004/10/wscoor", "InvalidState");
-               String faultString = "The message was invalid for the current state of the activity.";
-               return new AxisFault(subcode, faultString, null, null);
-       }
-
        public synchronized void preparedOperation(Notification parameters)
                        throws RemoteException {
-               try {
-                       prepared(getParticipantRef());
-               } catch (IllegalStateException e) {
-                       throw getInvalidStateSoapFault();
-               }
+               prepared(getParticipantRef());
        }
 
        public synchronized void abortedOperation(Notification parameters)
@@ -400,10 +399,14 @@
                replay(getParticipantRef());
        }
 
-       private synchronized String getParticipantRef() {
-               AddressingHeaders header = (AddressingHeaders) 
MessageContext.getCurrentContext().getProperty(
+       private AddressingHeaders getAddressingHeaders() {
+               return (AddressingHeaders) 
MessageContext.getCurrentContext().getProperty(
                        Constants.ENV_ADDRESSING_REQUEST_HEADERS);
-               MessageElement e = 
header.getReferenceProperties().get(PARTICIPANT_REF);
+       }
+
+       private String getParticipantRef() {
+               AddressingHeaders headers = getAddressingHeaders();
+               MessageElement e = 
headers.getReferenceProperties().get(PARTICIPANT_REF);
                return e.getValue();
        }
 
@@ -417,12 +420,13 @@
                rollback();
        }
 
-       public synchronized void timeout() {
+       public synchronized void timeout() throws TimedOutException {
                System.out.println("[ATCoordinatorImpl] timeout "
                                + AT2PCStatus.getStatusName(status));
-               if (status == AT2PCStatus.NONE)
-                       return;
-               rollback();
-               throw new TimedOutException();
+               if (status != AT2PCStatus.NONE) {
+                       maxRetries = 3;
+                       rollback();
+                       throw new TimedOutException();
+               }
        }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to