Author: dasarath
Date: Sat Mar  4 22:49:06 2006
New Revision: 383288

URL: http://svn.apache.org/viewcvs?rev=383288&view=rev
Log:
applied patch for Hannes 'speed improvements

Modified:
    
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java

Modified: 
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
URL: 
http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java?rev=383288&r1=383287&r2=383288&view=diff
==============================================================================
--- 
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
 (original)
+++ 
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
 Sat Mar  4 22:49:06 2006
@@ -7,6 +7,7 @@
 import java.net.MalformedURLException;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,6 +32,7 @@
 
 /**
  * @author Dasarath Weeratunge
+ * @author Hannes Erven <[EMAIL PROTECTED]>
  *  
  */
 public class ATCoordinatorImpl extends CoordinatorImpl implements 
ATCoordinator {
@@ -49,8 +51,6 @@
 
        public static final int RETRY_DELAY_MILLIS = 20 * 1000;
 
-       public static final int RESPONSE_DELAY_MILLIS = 3 * 1000;
-
        public ATCoordinatorImpl() throws MalformedURIException {
                super(COORDINATION_TYPE_ID);
        }
@@ -93,9 +93,22 @@
                return epr;
        }
 
+       /**
+        * Forget about this particular participant of the transaction. We only 
do
+        * this when that participant has acknowledged the final transaction
+        * outcome.
+        * 
+        * @param participantRef
+        *            The participant reference.
+        */
        public void forget2PC(String participantRef) {
+               /*
+                * Check for which protocols the participants had registered 
and remove
+                */
                if (volatile2PCParticipants.remove(participantRef) == null)
                        durable2PCParticipants.remove(participantRef);
+
+               notifyAll();
        }
 
        public void rollback() {
@@ -207,6 +220,15 @@
                }
        }
 
+       /**
+        * Handles calls to the Coordinator's "prepared" method. The given
+        * participant is marked as being prepared.
+        * 
+        * @param participantRef
+        *            The participant to be marked.
+        * @throws AxisFault
+        *             Some fault, e.g. INVALID_STATE
+        */
        public void prepared(String participantRef) throws AxisFault {
                switch (status) {
                case AT2PCStatus.ACTIVE:
@@ -217,9 +239,15 @@
                        }
                        return;
 
+               /*
+                * If we are currently in the PREPARE phase, mark the 
participant as
+                * prepared and notify the waiting thread. (wait() is called in
+                * prepare(Map) )
+                */
                case AT2PCStatus.PREPARING_VOLATILE:
                case AT2PCStatus.PREPARING_DURABLE:
                        preparedParticipants.add(participantRef);
+                       notifyAll();
                        return;
 
                case AT2PCStatus.COMMITTING:
@@ -293,19 +321,42 @@
                }
        }
 
+       /**
+        * Send a "prepare" message to all participants in the keySet of
+        * "participants". The messages are resent up to "maxRetries" times; 
between
+        * each retry, up to RETRY_DELAY_MILLIS milliseconds are waited.
+        * 
+        * @param participants
+        *            The map which contains the participants in its keySet.
+        * @return true, if all participants are prepared. false, if there are
+        *         unprepared participants remaining after trying maxRetries 
times.
+        */
        private boolean prepare(Map participants) {
-               int iters = 0;
-               int status_old = status;
-
+               /*
+                * Check if there are any participants in this map
+                */
+               if (participants.size() == 0)
+                       /*
+                        * "No participants" means OK
+                        */
+                       return true;
+
+               int iters = 0; // iteration count
+               int status_old = status; // State when beginning to prepare
+
+               /*
+                * Send the "prepare" message to all unprepared participants. 
Retry up
+                * to maxRetries times.
+                */
                while (iters < maxRetries) {
-                       if (iters++ > 0)
-                               pause(RETRY_DELAY_MILLIS - 
RESPONSE_DELAY_MILLIS);
-
                        Iterator iter = participants.keySet().iterator();
                        while (iter.hasNext()) {
                                if (status == AT2PCStatus.ABORTING)
                                        return false;
                                try {
+                                       /*
+                                        * Call the participant's "prepare" 
method
+                                        */
                                        String participantRef = (String) 
iter.next();
                                        getParticipantStub(participantRef,
                                                (EndpointReference) 
participants.get(participantRef)).prepareOperation(
@@ -315,12 +366,44 @@
                                }
                        }
 
-                       pause(RESPONSE_DELAY_MILLIS);
+                       /*
+                        * Wait for arriving messages, max. RETRY_DELAY_MILLIS 
milliseconds
+                        * for each retry.
+                        */
+                       long startTime = (new Date()).getTime();
+                       long curTime;
+
+                       /*
+                        * For each retry: wait for incoming messages. If 
unprepared
+                        * participants remain, wait for the remaining time up 
until
+                        * RETRY_DELAY_MILLIS is reached or another message 
arrives.
+                        */
+                       while ((curTime = (new Date().getTime())) < startTime
+                                       + RETRY_DELAY_MILLIS) {
+                               /*
+                                * Wait for incoming messages. notifyAll() is 
called in
+                                * prepared(String).
+                                */
+                               try {
+                                       wait(startTime - curTime + 
RETRY_DELAY_MILLIS);
+                               } catch (Exception e) {
+                                       // No exception handling needed for 
wait();
+                               }
 
-                       if 
(preparedParticipants.containsAll(participants.keySet()))
-                               return status == status_old;
+                               /*
+                                * Are all participants prepared?
+                                */
+                               if 
(preparedParticipants.containsAll(participants.keySet()))
+                                       // Yes! - Return true, if the 
transaction state did not
+                                       // change in the mean time.
+                                       return status == status_old;
+                       }
                }
 
+               /*
+                * After trying so hard and sending maxRetries messages, there 
is still
+                * at least one unprepared participant.
+                */
                return false;
        }
 
@@ -368,22 +451,39 @@
                                && durable2PCParticipants.isEmpty();
        }
 
+       /**
+        * Handles transaction teardown and communicate the final state to
+        * participants. Each participant is notified up to maxRetries times.
+        * Between each message, we will wait up to RETRY_DELAY_MILLIS 
milliseconds.
+        */
        private void terminate() {
                int iters = 0;
-               while (iters < maxRetries && !noParticipantsToTerminate()) {
-
-                       if (iters++ > 0)
-                               pause(RETRY_DELAY_MILLIS - 
RESPONSE_DELAY_MILLIS);
 
+               /*
+                * This is the main loop. While there are participants who did 
not yet
+                * acknowledge our message, (re)send our message to them.
+                */
+               waitForAllParticipantsToComplete: while (iters < maxRetries
+                               && !noParticipantsToTerminate()) {
+                       /*
+                        * Participant set to operate on. For each retry, send 
our message
+                        * to volatile peers first and then to durable 
participants.
+                        */
                        Map participants = volatile2PCParticipants;
                        while (true) {
                                Iterator iter = 
participants.keySet().iterator();
-                               while (iter.hasNext())
+
+                               while (iter.hasNext()) {
                                        try {
+                                               /*
+                                                * Get the participant's 
protocol service and send our
+                                                * final state message.
+                                                */
                                                String participantRef = 
(String) iter.next();
                                                ParticipantStub p = 
getParticipantStub(
                                                        participantRef,
                                                        (EndpointReference) 
participants.get(participantRef));
+
                                                if (status == 
AT2PCStatus.ABORTING)
                                                        
p.rollbackOperation(null);
                                                else
@@ -391,17 +491,57 @@
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
+                               }
+
+                               /*
+                                * After all volatile participants are 
notified, continue with
+                                * durable participants. After that, wait for 
incoming messages.
+                                */
                                if (participants == volatile2PCParticipants)
                                        participants = durable2PCParticipants;
                                else
                                        break;
                        }
 
-                       pause(RESPONSE_DELAY_MILLIS);
+                       /*
+                        * Messages to all remaining participants are out. Wait 
up until
+                        * RETRY_DELAY_MILLIS milliseconds for replies. On each 
incoming
+                        * reply, forget2PC() will call notify so we can check 
if all
+                        * participants have acknownledged the transaction 
outcome and can
+                        * thus continue. If there are remaining peers after an 
incoming
+                        * message, return to wait and sleep for the rest of the
+                        * RETRY_DELAY_MILLIS period.
+                        */
+                       try {
+                               long startTime = (new Date()).getTime();
+                               long curTime;
+
+                               while ((curTime = (new Date().getTime())) < 
startTime
+                                               + RETRY_DELAY_MILLIS) {
+                                       /*
+                                        * Wait for incoming acknowledgements. 
notify() is called in
+                                        * forget2PC(String).
+                                        */
+                                       wait(startTime - curTime + 
RETRY_DELAY_MILLIS);
+
+                                       if (noParticipantsToTerminate())
+                                               break 
waitForAllParticipantsToComplete;
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
                }
 
+               /*
+                * All particpants have acknowledged the final transaction 
state. Notify
+                * all peers who have subscribed for the completion protocol 
and forget
+                * about our litte transaction. ;-)
+                */
                if (noParticipantsToTerminate()) {
-
+                       /*
+                        * TODO shouldn't this message also be acknowledged, at 
least if
+                        * there was an exception (e.g. timeout) caught?
+                        */
                        Iterator iter = completionParticipants.iterator();
                        while (iter.hasNext())
                                try {
@@ -415,7 +555,6 @@
                                        e.printStackTrace();
                                }
                }
-
                status = AT2PCStatus.NONE;
        }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to