Hi Dasarath,
hi folks,

Attached are two patches from today's coding session with Georg.

First, the promised patch against ATCoordinatorImpl to cut the waiting delays when preparing and terminating 2PC transactions. Instead of pausing for a fixed interval, the coordinator now waits only until the participants have responded or RETRY_DELAY_MILLIS has elapsed. We managed to get the total transaction time from ~12 seconds down to about 1 second. In the process, we removed the RESPONSE_DELAY_MILLIS parameter, as we felt it is no longer needed.
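
For readers who want the idea without reading the whole diff: the first patch replaces fixed pauses with a timed wait()/notifyAll() loop. Below is a minimal sketch of that pattern, not the Kandula code itself. The class PreparedTracker and its method names are hypothetical, and it uses generics, so it assumes Java 5. Note that wait() and notifyAll() may only be called while holding the object's monitor, which is why both methods in the sketch are synchronized.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the coordinator's bookkeeping; not part of Kandula. */
class PreparedTracker {
    private final Set<String> prepared = new HashSet<String>();

    /** Called when a participant reports "prepared"; wakes up any waiting thread. */
    public synchronized void markPrepared(String participantRef) {
        prepared.add(participantRef);
        notifyAll(); // legal only because this method holds the monitor
    }

    /**
     * Blocks until every expected participant is prepared or the timeout expires.
     *
     * @return true if all expected participants were prepared in time
     */
    public synchronized boolean awaitAll(Collection<String> expected, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!prepared.containsAll(expected)) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return false; // timed out; the caller may resend and try again
            try {
                // Releases the monitor while waiting. The surrounding loop
                // re-checks the condition, which also covers spurious wakeups.
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
        return true;
    }
}
```

A caller would send its "prepare" messages, then call awaitAll(participantRefs, RETRY_DELAY_MILLIS) and resend only if it returns false.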


The second patch is against ParticipantImpl and throws a RemoteException when an incoming message cannot be correlated to a transaction, typically because the addressing headers are missing. This helps when debugging clients that have an incomplete Axis client configuration ;-)
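
The fail-fast idea behind the second patch, as a minimal sketch: if no callback can be found for a message, raise an error instead of silently doing nothing. The class CallbackDispatchSketch, its map-based registry and the shortened message text are hypothetical simplifications for illustration; the real lookup goes through CallbackRegistry, as shown in the diff.

```java
import java.rmi.RemoteException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified dispatcher; the real code uses CallbackRegistry. */
class CallbackDispatchSketch {
    static final String ERRORMESSAGE_MISSING_EPR =
            "Could not correlate your message to an existing transaction!";

    private final Map<String, Runnable> callbacks = new HashMap<String, Runnable>();

    void register(String callbackRef, Runnable callback) {
        callbacks.put(callbackRef, callback);
    }

    /** Runs the callback registered for callbackRef, or fails loudly if there is none. */
    void dispatch(String callbackRef) throws RemoteException {
        Runnable callback = callbacks.get(callbackRef);
        if (callback != null)
            callback.run();
        else
            // Previously a missing callback was ignored; now the client is told.
            throw new RemoteException(ERRORMESSAGE_MISSING_EPR);
    }
}
```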


BTW, can we use Java 1.5 (generics, @Override, ...), or should we stick to 1.3/1.4?


Also, did you look at our recent patch that adds a second configuration option for specifying the Kandula services context URL to use?


Best regards & have a nice weekday,

        -hannes
Index: ATCoordinatorImpl.java
===================================================================
--- ATCoordinatorImpl.java      (revision 374212)
+++ ATCoordinatorImpl.java      (working copy)
@@ -7,6 +7,7 @@
 import java.net.MalformedURLException;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -29,6 +30,7 @@
 import org.apache.kandula.coordinator.TimedOutException;
 import org.apache.kandula.wsat.Notification;
 
+
 /**
  * @author Dasarath Weeratunge
  *  
@@ -49,8 +51,6 @@
 
        public static final int RETRY_DELAY_MILLIS = 20 * 1000;
 
-       public static final int RESPONSE_DELAY_MILLIS = 3 * 1000;
-
        public ATCoordinatorImpl() throws MalformedURIException {
                super(COORDINATION_TYPE_ID);
        }
@@ -93,9 +93,20 @@
                return epr;
        }
 
+       /**
+        * Forget about this particular participant of the transaction. We only do this
+        * when that participant has acknowledged the final transaction outcome.
+        * @param participantRef The participant reference.
+        */
        public void forget2PC(String participantRef) {
+               /*
+                * Check for which protocols the participants had registered
+                * and remove
+                */
                if (volatile2PCParticipants.remove(participantRef) == null)
                        durable2PCParticipants.remove(participantRef);
+               
+               notifyAll();
        }
 
        public void rollback() {
@@ -207,6 +218,12 @@
                }
        }
 
+       /**
+        * Handles calls to the Coordinator's "prepared" method. The given participant
+        * is marked as being prepared. 
+        * @param participantRef The participant to be marked.
+        * @throws AxisFault Some fault, e.g. INVALID_STATE
+        */
        public void prepared(String participantRef) throws AxisFault {
                switch (status) {
                case AT2PCStatus.ACTIVE:
@@ -217,17 +234,26 @@
                        }
                        return;
 
+                       /*
+                        * If we are currently in the PREPARE phase,
+                        * mark the participant as prepared and notify
+                        * the waiting thread.
+                        * (wait() is called in prepare(Map) )
+                        */
                case AT2PCStatus.PREPARING_VOLATILE:
                case AT2PCStatus.PREPARING_DURABLE:
                        preparedParticipants.add(participantRef);
+                       notifyAll();
                        return;
 
                case AT2PCStatus.COMMITTING:
                        EndpointReference epr = getEprToRespond(participantRef);
                        if (epr != null)
                                try {
-                                       getParticipantStub(participantRef, epr).commitOperation(
-                                               null);
+                                       getParticipantStub(
+                                                       participantRef,
+                                                       epr
+                                       ).commitOperation(null);
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
@@ -293,37 +319,88 @@
                }
        }
 
+       /**
+        * Send a "prepare" message to all participants in the keySet of "participants". The messages are
+        * resent up to "maxRetries" times; between each retry, up to RETRY_DELAY_MILLIS milliseconds are
+        * waited.
+        * @param participants The map which contains the participants in its keySet.
+        * @return true, if all participants are prepared.
+        *                 false, if there are unprepared participants remaining after trying maxRetries times.
+        */
        private boolean prepare(Map participants) {
-               int iters = 0;
-               int status_old = status;
+               /*
+                * Check if there are any participants in this map
+                */
+               if (participants.size() == 0)
+                       /*
+                        * "No participants" means OK
+                        */ 
+                       return true;
+               
+               
+               int iters = 0;                          // iteration count
+               int status_old = status;        // State when beginning to prepare
 
+               /*
+                * Send the "prepare" message to all unprepared participants. Retry up to
+                * maxRetries times.
+                */
                while (iters < maxRetries) {
-                       if (iters++ > 0)
-                               pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
-
                        Iterator iter = participants.keySet().iterator();
                        while (iter.hasNext()) {
                                if (status == AT2PCStatus.ABORTING)
                                        return false;
                                try {
+                                       /*
+                                        * Call the participant's "prepare" method
+                                        */
                                        String participantRef = (String) iter.next();
-                                       getParticipantStub(participantRef,
-                                               (EndpointReference) participants.get(participantRef)).prepareOperation(
-                                               null);
+                                       getParticipantStub(
+                                                       participantRef,
+                                                       (EndpointReference) participants.get(participantRef)
+                                       ).prepareOperation(null);
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }
-
-                       pause(RESPONSE_DELAY_MILLIS);
-
-                       if (preparedParticipants.containsAll(participants.keySet()))
-                               return status == status_old;
+                       
+                       /*
+                        * Wait for arriving messages, max. RETRY_DELAY_MILLIS milliseconds for each retry.
+                        */
+                       long startTime = (new Date()).getTime();
+                       long curTime;
+
+                       /*
+                        * For each retry: wait for incoming messages. If unprepared participants remain,
+                        * wait for the remaining time up until RETRY_DELAY_MILLIS is reached or another
+                        * message arrives.
+                        */
+                       while((curTime = (new Date().getTime())) < startTime + RETRY_DELAY_MILLIS){
+                               /*
+                                * Wait for incoming messages. notifyAll() is called in prepared(String).
+                                */
+                               try{
+                                       wait(startTime - curTime + RETRY_DELAY_MILLIS);
+                               }catch(Exception e){
+                                       // No exception handling needed for wait();
+                               }
+
+                               /*
+                                * Are all participants prepared?
+                                */
+                               if (preparedParticipants.containsAll(participants.keySet()))
+                                       // Yes! - Return true, if the transaction state did not change in the mean time.
+                                       return status == status_old;
+                       }               
                }
 
+               /*
+                * After trying so hard and sending maxRetries messages, there is still at least
+                * one unprepared participant.
+                */
                return false;
        }
-
+       
        private boolean prepare() {
                status = AT2PCStatus.PREPARING_VOLATILE;
                if (!prepare(volatile2PCParticipants))
@@ -368,22 +445,41 @@
                                && durable2PCParticipants.isEmpty();
        }
 
+       /**
+        * Handles transaction teardown and communicates the final state to
+        * participants.
+        * Each participant is notified up to maxRetries times. Between each message, we will
+        * wait up to RETRY_DELAY_MILLIS milliseconds.
+        */
        private void terminate() {
                int iters = 0;
+
+               /*
+                * This is the main loop. While there are participants who did not
+                * yet acknowledge our message, (re)send our message to them.
+                */
+               waitForAllParticipantsToComplete:
                while (iters < maxRetries && !noParticipantsToTerminate()) {
-
-                       if (iters++ > 0)
-                               pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
-
+                       /*
+                        * Participant set to operate on. For each retry, send
+                        * our message to volatile peers first and then to durable participants.
+                        */
                        Map participants = volatile2PCParticipants;
                        while (true) {
                                Iterator iter = participants.keySet().iterator();
-                               while (iter.hasNext())
+                               
+                               while (iter.hasNext()){
                                        try {
+                                               /*
+                                                * Get the participant's protocol service
+                                                * and send our final state message.
+                                                */
                                                String participantRef = (String) iter.next();
                                                ParticipantStub p = getParticipantStub(
                                                        participantRef,
-                                                       (EndpointReference) participants.get(participantRef));
+                                                       (EndpointReference) participants.get(participantRef)
+                                               );
+
                                                if (status == AT2PCStatus.ABORTING)
                                                        p.rollbackOperation(null);
                                                else
@@ -391,17 +487,55 @@
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
+                               }
+                       
+                               /*
+                                * After all volatile participants are notified,
+                                * continue with durable participants. After that,
+                                * wait for incoming messages.
+                                */
                                if (participants == volatile2PCParticipants)
                                        participants = durable2PCParticipants;
                                else
                                        break;
                        }
 
-                       pause(RESPONSE_DELAY_MILLIS);
+                       /*
+                        * Messages to all remaining participants are out. Wait up until RETRY_DELAY_MILLIS
+                        * milliseconds for replies.
+                        * On each incoming reply, forget2PC() will call notifyAll() so we can check if all
+                        * participants have acknowledged the transaction outcome and can thus continue.
+                        * If there are remaining peers after an incoming message, return to wait and sleep for the
+                        * rest of the RETRY_DELAY_MILLIS period.
+                        */
+                       try{
+                               long startTime = (new Date()).getTime();
+                               long curTime;
+
+                               while((curTime = (new Date().getTime())) < startTime + RETRY_DELAY_MILLIS){
+                                       /*
+                                        * Wait for incoming acknowledgements. notifyAll() is called in forget2PC(String).
+                                        */
+                                       wait(startTime - curTime + RETRY_DELAY_MILLIS);
+
+                                       if (noParticipantsToTerminate())
+                                               break waitForAllParticipantsToComplete;
+                               }
+                       }catch(Exception e){
+                               e.printStackTrace();
+                       }
                }
 
+               /*
+                * All participants have acknowledged the final transaction state. Notify
+                * all peers who have subscribed for the completion protocol and forget
+                * about our little transaction. ;-)
+                */
                if (noParticipantsToTerminate()) {
-
+                       /* 
+                        * TODO shouldn't this message also be acknowledged, at least if
+                        * there was an exception (e.g. timeout) caught?
+                        */
                        Iterator iter = completionParticipants.iterator();
                        while (iter.hasNext())
                                try {
@@ -415,7 +549,6 @@
                                        e.printStackTrace();
                                }
                }
-
                status = AT2PCStatus.NONE;
        }
 
Index: ParticipantImpl.java
===================================================================
--- ParticipantImpl.java        (revision 374212)
+++ ParticipantImpl.java        (working copy)
@@ -15,13 +15,16 @@
  *
  */
 public class ParticipantImpl implements ParticipantPortType {
-
+       public static final String ERRORMESSAGE_MISSING_EPR = "Could not correlate your message to an existing transaction! (are WS-Addressing headers missing?)";
+       
        public void prepareOperation(Notification parameters)
                        throws RemoteException {
                ParticipantPortType callback = (ParticipantPortType) CallbackRegistry.getInstance().correlateMessage(
                        CallbackRegistry.CALLBACK_REF, false);
                if (callback != null)
                        callback.prepareOperation(parameters);
+               else
+                       throw new RemoteException(ERRORMESSAGE_MISSING_EPR);
        }
 
        public void commitOperation(Notification parameters) throws RemoteException {
@@ -29,6 +32,8 @@
                        CallbackRegistry.CALLBACK_REF, false);
                if (callback != null)
                        callback.commitOperation(parameters);
+               else
+                       throw new RemoteException(ERRORMESSAGE_MISSING_EPR);
        }
 
        public void rollbackOperation(Notification parameters)
@@ -37,6 +42,9 @@
                        CallbackRegistry.CALLBACK_REF, false);
                if (callback != null)
                        callback.rollbackOperation(parameters);
+               else
+                       throw new RemoteException(ERRORMESSAGE_MISSING_EPR);
+
        }
 
 }
