Author: dasarath
Date: Sat Mar 4 22:49:06 2006
New Revision: 383288
URL: http://svn.apache.org/viewcvs?rev=383288&view=rev
Log:
applied patch for Hannes' speed improvements
Modified:
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
Modified:
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
URL:
http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java?rev=383288&r1=383287&r2=383288&view=diff
==============================================================================
---
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
(original)
+++
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
Sat Mar 4 22:49:06 2006
@@ -7,6 +7,7 @@
import java.net.MalformedURLException;
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,6 +32,7 @@
/**
* @author Dasarath Weeratunge
+ * @author Hannes Erven <[EMAIL PROTECTED]>
*
*/
public class ATCoordinatorImpl extends CoordinatorImpl implements
ATCoordinator {
@@ -49,8 +51,6 @@
public static final int RETRY_DELAY_MILLIS = 20 * 1000;
- public static final int RESPONSE_DELAY_MILLIS = 3 * 1000;
-
public ATCoordinatorImpl() throws MalformedURIException {
super(COORDINATION_TYPE_ID);
}
@@ -93,9 +93,22 @@
return epr;
}
+ /**
+ * Forget about this particular participant of the transaction. We only
do
+ * this when that participant has acknowledged the final transaction
+ * outcome.
+ *
+ * @param participantRef
+ * The participant reference.
+ */
public void forget2PC(String participantRef) {
+ /*
+ * Check for which protocols the participants had registered
and remove
+ */
if (volatile2PCParticipants.remove(participantRef) == null)
durable2PCParticipants.remove(participantRef);
+
+ notifyAll();
}
public void rollback() {
@@ -207,6 +220,15 @@
}
}
+ /**
+ * Handles calls to the Coordinator's "prepared" method. The given
+ * participant is marked as being prepared.
+ *
+ * @param participantRef
+ * The participant to be marked.
+ * @throws AxisFault
+ * Some fault, e.g. INVALID_STATE
+ */
public void prepared(String participantRef) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
@@ -217,9 +239,15 @@
}
return;
+ /*
+ * If we are currently in the PREPARE phase, mark the
participant as
+ * prepared and notify the waiting thread. (wait() is called in
+ * prepare(Map) )
+ */
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
preparedParticipants.add(participantRef);
+ notifyAll();
return;
case AT2PCStatus.COMMITTING:
@@ -293,19 +321,42 @@
}
}
+ /**
+ * Send a "prepare" message to all participants in the keySet of
+ * "participants". The messages are resent up to "maxRetries" times;
between
+ * each retry, up to RETRY_DELAY_MILLIS milliseconds are waited.
+ *
+ * @param participants
+ * The map which contains the participants in its keySet.
+ * @return true, if all participants are prepared. false, if there are
+ * unprepared participants remaining after trying maxRetries
times.
+ */
private boolean prepare(Map participants) {
- int iters = 0;
- int status_old = status;
-
+ /*
+ * Check if there are any participants in this map
+ */
+ if (participants.size() == 0)
+ /*
+ * "No participants" means OK
+ */
+ return true;
+
+ int iters = 0; // iteration count
+ int status_old = status; // State when beginning to prepare
+
+ /*
+ * Send the "prepare" message to all unprepared participants.
Retry up
+ * to maxRetries times.
+ */
while (iters < maxRetries) {
- if (iters++ > 0)
- pause(RETRY_DELAY_MILLIS -
RESPONSE_DELAY_MILLIS);
-
Iterator iter = participants.keySet().iterator();
while (iter.hasNext()) {
if (status == AT2PCStatus.ABORTING)
return false;
try {
+ /*
+ * Call the participant's "prepare"
method
+ */
String participantRef = (String)
iter.next();
getParticipantStub(participantRef,
(EndpointReference)
participants.get(participantRef)).prepareOperation(
@@ -315,12 +366,44 @@
}
}
- pause(RESPONSE_DELAY_MILLIS);
+ /*
+ * Wait for arriving messages, max. RETRY_DELAY_MILLIS
milliseconds
+ * for each retry.
+ */
+ long startTime = (new Date()).getTime();
+ long curTime;
+
+ /*
+ * For each retry: wait for incoming messages. If
unprepared
+ * participants remain, wait for the remaining time up
until
+ * RETRY_DELAY_MILLIS is reached or another message
arrives.
+ */
+ while ((curTime = (new Date().getTime())) < startTime
+ + RETRY_DELAY_MILLIS) {
+ /*
+ * Wait for incoming messages. notifyAll() is
called in
+ * prepared(String).
+ */
+ try {
+ wait(startTime - curTime +
RETRY_DELAY_MILLIS);
+ } catch (Exception e) {
+ // No exception handling needed for
wait();
+ }
- if
(preparedParticipants.containsAll(participants.keySet()))
- return status == status_old;
+ /*
+ * Are all participants prepared?
+ */
+ if
(preparedParticipants.containsAll(participants.keySet()))
+ // Yes! - Return true, if the
transaction state did not
+ // change in the mean time.
+ return status == status_old;
+ }
}
+ /*
+ * After trying so hard and sending maxRetries messages, there
is still
+ * at least one unprepared participant.
+ */
return false;
}
@@ -368,22 +451,39 @@
&& durable2PCParticipants.isEmpty();
}
+ /**
+ * Handles transaction teardown and communicates the final state to
+ * participants. Each participant is notified up to maxRetries times.
+ * Between each message, we will wait up to RETRY_DELAY_MILLIS
milliseconds.
+ */
private void terminate() {
int iters = 0;
- while (iters < maxRetries && !noParticipantsToTerminate()) {
-
- if (iters++ > 0)
- pause(RETRY_DELAY_MILLIS -
RESPONSE_DELAY_MILLIS);
+ /*
+ * This is the main loop. While there are participants who did
not yet
+ * acknowledge our message, (re)send our message to them.
+ */
+ waitForAllParticipantsToComplete: while (iters < maxRetries
+ && !noParticipantsToTerminate()) {
+ /*
+ * Participant set to operate on. For each retry, send
our message
+ * to volatile peers first and then to durable
participants.
+ */
Map participants = volatile2PCParticipants;
while (true) {
Iterator iter =
participants.keySet().iterator();
- while (iter.hasNext())
+
+ while (iter.hasNext()) {
try {
+ /*
+ * Get the participant's
protocol service and send our
+ * final state message.
+ */
String participantRef =
(String) iter.next();
ParticipantStub p =
getParticipantStub(
participantRef,
(EndpointReference)
participants.get(participantRef));
+
if (status ==
AT2PCStatus.ABORTING)
p.rollbackOperation(null);
else
@@ -391,17 +491,57 @@
} catch (Exception e) {
e.printStackTrace();
}
+ }
+
+ /*
+ * After all volatile participants are
notified, continue with
+ * durable participants. After that, wait for
incoming messages.
+ */
if (participants == volatile2PCParticipants)
participants = durable2PCParticipants;
else
break;
}
- pause(RESPONSE_DELAY_MILLIS);
+ /*
+ * Messages to all remaining participants are out. Wait
up until
+ * RETRY_DELAY_MILLIS milliseconds for replies. On each
incoming
+ * reply, forget2PC() will call notify so we can check
if all
+ * participants have acknowledged the transaction
outcome and can
+ * thus continue. If there are remaining peers after an
incoming
+ * message, return to wait and sleep for the rest of the
+ * RETRY_DELAY_MILLIS period.
+ */
+ try {
+ long startTime = (new Date()).getTime();
+ long curTime;
+
+ while ((curTime = (new Date().getTime())) <
startTime
+ + RETRY_DELAY_MILLIS) {
+ /*
+ * Wait for incoming acknowledgements.
notify() is called in
+ * forget2PC(String).
+ */
+ wait(startTime - curTime +
RETRY_DELAY_MILLIS);
+
+ if (noParticipantsToTerminate())
+ break
waitForAllParticipantsToComplete;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
+ /*
+ * All participants have acknowledged the final transaction
state. Notify
+ * all peers who have subscribed for the completion protocol
and forget
+ * about our little transaction. ;-)
+ */
if (noParticipantsToTerminate()) {
-
+ /*
+ * TODO shouldn't this message also be acknowledged, at
least if
+ * there was an exception (e.g. timeout) caught?
+ */
Iterator iter = completionParticipants.iterator();
while (iter.hasNext())
try {
@@ -415,7 +555,6 @@
e.printStackTrace();
}
}
-
status = AT2PCStatus.NONE;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]