Hi Dasarath,
hi folks,
attached are two patches from today's coding session with Georg.
First, the promised patch against ATCoordinatorImpl to shorten the waiting
delays when preparing and terminating 2PC transactions. We managed to
get the total transaction time from ~12 seconds down to about 1 second.
In the process, we removed the RESPONSE_DELAY_MILLIS parameter, as we
felt it is no longer needed.
The second patch is against ParticipantImpl and adds exceptions that are
thrown when addressing headers are missing. This helps when debugging
clients that have an incomplete axis client configuration ;-)
BTW, can we use Java 1.5 (generics, @Override, ...) or should we rather
stick to 1.3/1.4?
BTW, did you look at our recent patch which adds a second configuration
option to specify the kandula services context URL to use?
Best regards & have a nice weekday,
-hannes
Index: ATCoordinatorImpl.java
===================================================================
--- ATCoordinatorImpl.java (revision 374212)
+++ ATCoordinatorImpl.java (working copy)
@@ -7,6 +7,7 @@
import java.net.MalformedURLException;
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -29,6 +30,7 @@
import org.apache.kandula.coordinator.TimedOutException;
import org.apache.kandula.wsat.Notification;
+
/**
* @author Dasarath Weeratunge
*
@@ -49,8 +51,6 @@
public static final int RETRY_DELAY_MILLIS = 20 * 1000;
- public static final int RESPONSE_DELAY_MILLIS = 3 * 1000;
-
public ATCoordinatorImpl() throws MalformedURIException {
super(COORDINATION_TYPE_ID);
}
@@ -93,9 +93,20 @@
return epr;
}
+ /**
+ * Forget about this particular participant of the transaction. We only
do this
+ * when that participant has acknowledged the final transaction
outcome.
+ * @param participantRef The participant reference.
+ */
public void forget2PC(String participantRef) {
+ /*
+ * Check for which protocols the participants had registered
+ * and remove
+ */
if (volatile2PCParticipants.remove(participantRef) == null)
durable2PCParticipants.remove(participantRef);
+
+ notifyAll();
}
public void rollback() {
@@ -207,6 +218,12 @@
}
}
+ /**
+ * Handles calls to the Coordinator's "prepared" method. The given
participant
+ * is marked as being prepared.
+ * @param participantRef The participant to be marked.
+ * @throws AxisFault Some fault, e.g. INVALID_STATE
+ */
public void prepared(String participantRef) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
@@ -217,17 +234,26 @@
}
return;
+ /*
+ * If we are currently in the PREPARE phase,
+ * mark the participant as prepared and notify
+ * the waiting thread.
+ * (wait() is called in prepare(Map) )
+ */
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
preparedParticipants.add(participantRef);
+ notifyAll();
return;
case AT2PCStatus.COMMITTING:
EndpointReference epr = getEprToRespond(participantRef);
if (epr != null)
try {
- getParticipantStub(participantRef,
epr).commitOperation(
- null);
+ getParticipantStub(
+ participantRef,
+ epr
+ ).commitOperation(null);
} catch (Exception e) {
e.printStackTrace();
}
@@ -293,37 +319,88 @@
}
}
+ /**
+ * Send a "prepare" message to all participants in the keySet of
"participants". The messages are
+ * resent up to "maxRetries" times; between each retry, up to
RETRY_DELAY_MILLIS milliseconds are
+ * waited.
+ * @param participants The map which contains the participants in its
keySet.
+ * @return true, if all participants are prepared.
+ * false, if there are unprepared participants
remaining after trying maxRetries times.
+ */
private boolean prepare(Map participants) {
- int iters = 0;
- int status_old = status;
+ /*
+ * Check if there are any participants in this map
+ */
+ if (participants.size() == 0)
+ /*
+ * "No participants" means OK
+ */
+ return true;
+
+
+ int iters = 0; // iteration count
+ int status_old = status; // State when beginning to
prepare
+ /*
+ * Send the "prepare" message to all unprepared participants.
Retry up to
+ * maxRetries times.
+ */
while (iters < maxRetries) {
- if (iters++ > 0)
- pause(RETRY_DELAY_MILLIS -
RESPONSE_DELAY_MILLIS);
-
Iterator iter = participants.keySet().iterator();
while (iter.hasNext()) {
if (status == AT2PCStatus.ABORTING)
return false;
try {
+ /*
+ * Call the participant's "prepare"
method
+ */
String participantRef = (String)
iter.next();
- getParticipantStub(participantRef,
- (EndpointReference)
participants.get(participantRef)).prepareOperation(
- null);
+ getParticipantStub(
+ participantRef,
+ (EndpointReference)
participants.get(participantRef)
+ ).prepareOperation(null);
} catch (Exception e) {
e.printStackTrace();
}
}
-
- pause(RESPONSE_DELAY_MILLIS);
-
- if
(preparedParticipants.containsAll(participants.keySet()))
- return status == status_old;
+
+ /*
+ * Wait for arriving messages, max. RETRY_DELAY_MILLIS
milliseconds for each retry.
+ */
+ long startTime = (new Date()).getTime();
+ long curTime;
+
+ /*
+ * For each retry: wait for incoming messages. If
unprepared participants remain,
+ * wait for the remaining time up until
RETRY_DELAY_MILLIS is reached or another
+ * message arrives.
+ */
+ while((curTime = (new Date().getTime())) < startTime +
RETRY_DELAY_MILLIS){
+ /*
+ * Wait for incoming messages. notifyAll() is
called in prepared(String).
+ */
+ try{
+ wait(startTime - curTime +
RETRY_DELAY_MILLIS);
+ }catch(Exception e){
+ // No exception handling needed for
wait();
+ }
+
+ /*
+ * Are all participants prepared?
+ */
+ if
(preparedParticipants.containsAll(participants.keySet()))
+ // Yes! - Return true, if the
transaction state did not change in the mean time.
+ return status == status_old;
+ }
}
+ /*
+ * After trying so hard and sending maxRetries messages, there
is still at least
+ * one unprepared participant.
+ */
return false;
}
-
+
private boolean prepare() {
status = AT2PCStatus.PREPARING_VOLATILE;
if (!prepare(volatile2PCParticipants))
@@ -368,22 +445,41 @@
&& durable2PCParticipants.isEmpty();
}
+ /**
+ * Handles transaction teardown and communicate the final state to
+ * participants.
+ * Each participant is notified up to maxRetries times. Between each
message, we will
+ * wait up to RETRY_DELAY_MILLIS milliseconds.
+ */
private void terminate() {
int iters = 0;
+
+ /*
+ * This is the main loop. While there are participants who did
not
+ * yet acknowledge our message, (re)send our message to them.
+ */
+ waitForAllParticipantsToComplete:
while (iters < maxRetries && !noParticipantsToTerminate()) {
-
- if (iters++ > 0)
- pause(RETRY_DELAY_MILLIS -
RESPONSE_DELAY_MILLIS);
-
+ /*
+ * Participant set to operate on. For each retry, send
+ * our message to volatile peers first and then to
durable participants.
+ */
Map participants = volatile2PCParticipants;
while (true) {
Iterator iter =
participants.keySet().iterator();
- while (iter.hasNext())
+
+ while (iter.hasNext()){
try {
+ /*
+ * Get the participant's
protocol service
+ * and send our final state
message.
+ */
String participantRef =
(String) iter.next();
ParticipantStub p =
getParticipantStub(
participantRef,
- (EndpointReference)
participants.get(participantRef));
+ (EndpointReference)
participants.get(participantRef)
+ );
+
if (status ==
AT2PCStatus.ABORTING)
p.rollbackOperation(null);
else
@@ -391,17 +487,55 @@
} catch (Exception e) {
e.printStackTrace();
}
+ }
+
+ /*
+ * After all volatile participants are notified,
+ * continue with durable participants. After
that,
+ * wait for incoming messages.
+ */
if (participants == volatile2PCParticipants)
participants = durable2PCParticipants;
else
break;
}
- pause(RESPONSE_DELAY_MILLIS);
+ /*
+ * Messages to all remaining participants are out. Wait
up until RETRY_DELAY_MILLIS
+ * milliseconds for replies.
+ * On each incoming reply, forget2PC() will call notifyAll()
so we can check if all
+ * participants have acknowledged the transaction
outcome and can thus continue.
+ * If there are remaining peers after an incoming
message, return to wait and sleep for the
+ * rest of the RETRY_DELAY_MILLIS period.
+ */
+ try{
+ long startTime = (new Date()).getTime();
+ long curTime;
+
+ while((curTime = (new Date().getTime())) <
startTime + RETRY_DELAY_MILLIS){
+ /*
+ * Wait for incoming acknowledgements.
notifyAll() is called in forget2PC(String).
+ */
+ wait(startTime - curTime +
RETRY_DELAY_MILLIS);
+
+ if (noParticipantsToTerminate())
+ break
waitForAllParticipantsToComplete;
+ }
+ }catch(Exception e){
+ e.printStackTrace();
+ }
}
+ /*
+ * All participants have acknowledged the final transaction
state. Notify
+ * all peers who have subscribed for the completion protocol
and forget
+ * about our little transaction. ;-)
+ */
if (noParticipantsToTerminate()) {
-
+ /*
+ * TODO shouldn't this message also be acknowledged, at
least if
+ * there was an exception (e.g. timeout) caught?
+ */
Iterator iter = completionParticipants.iterator();
while (iter.hasNext())
try {
@@ -415,7 +549,6 @@
e.printStackTrace();
}
}
-
status = AT2PCStatus.NONE;
}
Index: ParticipantImpl.java
===================================================================
--- ParticipantImpl.java (revision 374212)
+++ ParticipantImpl.java (working copy)
@@ -15,13 +15,16 @@
*
*/
public class ParticipantImpl implements ParticipantPortType {
-
+ public static final String ERRORMESSAGE_MISSING_EPR = "Could not
correlate your message to an existing transaction! (are WS-Adressing headers
missing?)";
+
public void prepareOperation(Notification parameters)
throws RemoteException {
ParticipantPortType callback = (ParticipantPortType)
CallbackRegistry.getInstance().correlateMessage(
CallbackRegistry.CALLBACK_REF, false);
if (callback != null)
callback.prepareOperation(parameters);
+ else
+ throw new RemoteException(ERRORMESSAGE_MISSING_EPR);
}
public void commitOperation(Notification parameters) throws
RemoteException {
@@ -29,6 +32,8 @@
CallbackRegistry.CALLBACK_REF, false);
if (callback != null)
callback.commitOperation(parameters);
+ else
+ throw new RemoteException(ERRORMESSAGE_MISSING_EPR);
}
public void rollbackOperation(Notification parameters)
@@ -37,6 +42,9 @@
CallbackRegistry.CALLBACK_REF, false);
if (callback != null)
callback.rollbackOperation(parameters);
+ else
+ throw new RemoteException(ERRORMESSAGE_MISSING_EPR);
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]