Eugene,

thanks for this patch, I have applied it to the current xmlBlaster svn.

It is a brilliant analysis and an even smarter solution approach.

After code inspection I think there is no gap in your approach.

regards
Marcel

Eugene Wu wrote:

Marcel,

I fixed a multithreading race condition which caused message loss. The patch is attached to this mail.

It concerns org.xmlBlaster.engine.TopicHandler.publish() and entryDestroyed(). Let's assume thread A has just called toAlive() in line 537, which sets the topic state to ALIVE. Before thread A pushes a msgUnit into the callback queue(s) through TopicHandler.invokeCallbackAndHandleFailure() in line 645 --> invokeCallback() in line 1337, another thread B finishes its publish on the same topic and may trigger a message destruction, which, in turn, may switch the topic state to UNREFERENCED. When thread A reaches TopicHandler.invokeCallback() in line 1321, it finds the topic in an invalid state and drops the message.

I used a publishCounter to record the number of threads running in the publish method. When entryDestroyed() intends to change the topic state to UNREFERENCED, it first checks the publishCounter. If it is greater than zero, the topic state is not changed.
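
The interleaving and the counter check can be illustrated with a minimal sketch. This is not the TopicHandler code: the class TopicSketch and its methods beginPublish(), finishPublish() and replay() are hypothetical names, and the steps of threads A and B are replayed on a single thread so the outcome is deterministic. Unlike the patch, the sketch also increments the counter under the topic lock, which is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a topic; not the real TopicHandler. */
class TopicSketch {
    enum State { ALIVE, UNREFERENCED }

    private final boolean useCounter;   // true = apply the publishCounter check
    private State state = State.UNREFERENCED;
    private int publishCounter = 0;     // threads currently inside publish
    final List<String> delivered = new ArrayList<>();

    TopicSketch(boolean useCounter) { this.useCounter = useCounter; }

    /** First half of publish: register the publisher and make the topic ALIVE. */
    synchronized void beginPublish() {
        publishCounter++;
        state = State.ALIVE;
    }

    /** Second half of publish: deliver only if the topic is still ALIVE. */
    synchronized boolean finishPublish(String msg) {
        try {
            if (state != State.ALIVE) {
                return false;           // the message loss described above
            }
            delivered.add(msg);
            return true;
        } finally {
            publishCounter--;           // this publisher has left publish
        }
    }

    /** Called when a message entry is destroyed and the topic looks unused. */
    synchronized void entryDestroyed() {
        if (useCounter && publishCounter > 0) {
            return;                     // another thread is still publishing
        }
        state = State.UNREFERENCED;
    }

    /** Replays the A/B interleaving; returns true if A's message is delivered. */
    static boolean replay(boolean useCounter) {
        TopicSketch topic = new TopicSketch(useCounter);
        topic.beginPublish();           // thread A: toAlive()
        topic.entryDestroyed();         // thread B: message destroyed in between
        return topic.finishPublish("msg from A"); // thread A: invokeCallback()
    }

    public static void main(String[] args) {
        System.out.println("without counter, delivered: " + replay(false));
        System.out.println("with counter, delivered:    " + replay(true));
    }
}
```

In this sketch replay(false) returns false because the topic is already UNREFERENCED when A delivers, while replay(true) returns true because entryDestroyed() leaves the state alone while the counter is non-zero.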

I just dealt with one scenario in one possible way, without looking at the whole picture.

Eugene

------------------------------------------------------------------------

--- xmlBlaster_1.1.1_orig/src/java/org/xmlBlaster/engine/TopicHandler.java      2006-01-03 19:01:41.000000000 +0000
+++ xmlBlaster_1.1.1_my/src/java/org/xmlBlaster/engine/TopicHandler.java        2006-04-12 15:16:53.000000000 +0000
@@ -139,6 +139,8 @@

   private boolean isRegisteredInBigXmlDom = false;

+   private int publishCounter = 0; //count the threads running in publish method
+
   /**
    * This topic is destroyed after given timeout
    * The timer is activated on state change to UNREFERENCED
@@ -566,7 +568,12 @@
            }

            msgUnitWrapper = new MsgUnitWrapper(glob, msgUnit, this.msgUnitCache, initialCounter, 0, -1);
-
+
+            publishCounter++;
+            if (!isAlive()) {
+                toAlive();
+            }
+
            // Forcing RAM entry temporary (reset in finally below) to avoid performance critical harddisk IO during initialization, every callback/subject/history queue put()/take() is changing the reference counter of MsgUnitWrapper. For persistent messages this needs to be written to harddisk
            // If the server crashed during this RAM operation it is not critical as the publisher didn't get an ACK yet
            synchronized(this.msgUnitWrapperUnderConstruction) {
@@ -649,6 +656,7 @@
      finally {
         if (msgUnitWrapper != null) {
            synchronized(this) {
+               publishCounter--;
               synchronized(msgUnitWrapper) {
                  synchronized(this.msgUnitCache) {
                     try {
@@ -941,9 +949,12 @@
               if (isSoftErased()) {
                  notifyList = toDead(this.creatorSessionName, false);
               }
-               else {
+               else if (publishCounter==0) {
                  notifyList = toUnreferenced(false);
               }
+               else {
+                  if (log.TRACE) log.trace(ME, "ignored the attempt to set topic unreferenced as other thread in publish");
+               }
            }
            catch (XmlBlasterException e) {
               log.error(ME, "Internal problem with entryDestroyed: " + e.getMessage() + ": " + toXml());
