Hi Eugene,
many thanks for the in-depth analysis and the patches. We will look at them in
the coming weeks (we are quite busy right now).
We will also try to find a solution to close the gaps that still remain,
since my experience is that if something is "theoretically possible",
it will certainly happen in real life ;)
Thanks
Michele
Eugene Wu wrote:
> Amendment to my fix:
> TopicHandler: moved destroyTimer check point from "private ArrayList
> timeout()" to "public final void timeout(Object userData)" in order to
> enable the immediate destruction when destroyDelay = 0.
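The following is a minimal Java sketch of why the location of the check point matters. It assumes, without confirmation from the patch, that with destroyDelay = 0 the topic calls the private timeout() directly instead of arming a timer, so timerKey is still null at that point. TopicModel and its methods are hypothetical simplifications, not the real TopicHandler API.

```java
// Hypothetical, simplified model of the two timeout entry points (not the real TopicHandler).
final class TopicModel {
    private final long destroyDelay; // milliseconds; 0 means "destroy immediately" (assumed)
    private Object timerKey;         // non-null while a destroy timer is pending
    private boolean dead;

    TopicModel(long destroyDelay) { this.destroyDelay = destroyDelay; }

    /** Assumed behaviour when the topic turns UNREFERENCED. */
    synchronized void toUnreferenced() {
        if (destroyDelay == 0) {
            timeout();               // immediate path: no timer was armed, timerKey is null
        } else {
            timerKey = new Object(); // stands in for registering a destroy timer
        }
    }

    /** Timer callback: the check point lives here, as in the amended patch. */
    public final void timeout(Object userData) {
        synchronized (this) {
            if (timerKey == null) return; // a publisher unset the timer: ignore the event
            timerKey = null;
            timeout();
        }
    }

    /** Internal destroy: checking timerKey here would block the destroyDelay = 0 path. */
    private void timeout() { dead = true; }

    synchronized void unsetDestroyTimer() { timerKey = null; }

    synchronized boolean isDead() { return dead; }
}
```

With the check in the callback only, the immediate path still destroys the topic, while a timer that a publisher has unset is ignored.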
>
> My approach still leaves a gap, because it avoids a full lock in order
> to prevent deadlock: after one publishing thread has unset the destroyTimer
> and just begun the actual publishing, another publishing thread that has
> just finished its job could set the destroyTimer again.
> This never happened in my testing, but it is theoretically possible,
> so my approach only reduces the chance of the race; it does not solve the
> problem completely.
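The remaining window can be replayed deterministically on a single thread. In this hypothetical model (GapModel and its methods are illustrative names, not xmlBlaster API), beginPublish() stands for the patch's publisher check point and endPublishAndGoUnreferenced() stands for the framework re-arming the destroyTimer when the topic turns UNREFERENCED.

```java
// Hypothetical model of the window the patch leaves open (not xmlBlaster code).
final class GapModel {
    private Object timerKey = new Object(); // topic starts UNREFERENCED with a pending timer
    private int publishersInFlight;         // only observed here, not used to prevent the race

    /** Publisher check point from the patch: unset the destroy timer. */
    synchronized void beginPublish() {
        timerKey = null;
        publishersInFlight++;
    }

    /** A publisher finishes and the topic turns UNREFERENCED: the timer is re-armed. */
    synchronized void endPublishAndGoUnreferenced() {
        publishersInFlight--;
        timerKey = new Object();
    }

    synchronized boolean timerArmed() { return timerKey != null; }

    synchronized int publishersInFlight() { return publishersInFlight; }

    /** Replays: A and B pass the check point, then B finishes first. */
    static GapModel replay() {
        GapModel topic = new GapModel();
        topic.beginPublish();                // thread A passes the check point
        topic.beginPublish();                // thread B passes the check point
        topic.endPublishAndGoUnreferenced(); // thread B finishes and re-arms the timer
        return topic;
    }
}
```

After replay() the timer is armed while thread A is still publishing; if it fires now, the timeout check point sees a non-null timer and destroys the topic under A.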
>
>
> Eugene Wu wrote:
>
>> Hi, there:
>>
>> I figured out one cause of the issue. It can happen when a publishing
>> thread and a timeout thread work on the same TopicHandler instance.
>>
>> For example, after thread A (publishing) has retrieved a topicHandler from
>> topicHandlerMap in line 1695 of RequestBroker:
>> Object obj = topicHandlerMap.get(msgUnit.getKeyOid());
>>
>> the topicHandler's publish method is invoked. Because a synchronized
>> lock on the topicHandler cannot be held for the whole publish()
>> method (deadlock concerns), another thread B (timeout) can get a
>> chance to invoke the timeout(Object userData) method of the topicHandler
>> before thread A sets the topicHandler's state to ALIVE. Therefore, the
>> topicHandler can be publishing a message and, at the same time, be
>> destroyed, which causes unpredictable results.
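A simplified, unpatched model of this check-then-act race, replayed step by step on one thread, may help. RaceDemo, Handler and replay() are hypothetical names; the lookup under the map lock stands for line 1695 of RequestBroker, and nothing ties that lookup to the later use of the handler.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, unpatched model: the map lookup and the later use of the handler
// are not covered by one lock, so a timeout can run in between.
final class RaceDemo {
    static final class Handler {
        private boolean dead;
        private final List<String> messages = new ArrayList<>();

        synchronized void publish(String msg) { messages.add(msg); } // no state check
        synchronized boolean isDead() { return dead; }
        synchronized int messageCount() { return messages.size(); }

        /** Destroy: mark DEAD and remove the topic from the map. */
        synchronized void timeout(Map<String, Handler> topics, String oid) {
            dead = true;
            synchronized (topics) { topics.remove(oid); }
        }
    }

    /** Replays A-lookup, B-timeout, A-publish on one thread. */
    static Handler replay(Map<String, Handler> topics) {
        synchronized (topics) { topics.put("oid", new Handler()); }
        Handler h;
        synchronized (topics) { h = topics.get("oid"); } // thread A: find the topic
        h.timeout(topics, "oid");                         // thread B: destroy timer fires
        h.publish("msg");                                 // thread A: publishes anyway
        return h;
    }
}
```

The message ends up in a DEAD handler that is no longer reachable through the map.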
>>
>> In my fix (attached), both the publishing thread and the timeout thread
>> pass a check point, in which three things are done so that a thread that
>> is slightly ahead can prevent a competing thread that is slightly behind
>> from running:
>>
>> 1) Get the lock on the topicHandler (synchronized lock)
>> 2) Check whether the topicHandler is already claimed by a competing
>> thread. If so, abort the timeout (timeout thread) or stop using the
>> handler (publishing thread). Otherwise, go to step 3.
>> 3) Set a check mark that prevents the competing thread from running. The
>> mark must be cleared by the framework at the proper time later on.
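The three steps above can be sketched on both sides as follows, assuming a single destroy timer represented by timerKey. CheckedTopic, claimForPublish() and onTimeout() are hypothetical names; in the patch the publisher side lives in RequestBroker and the timeout side in TopicHandler.timeout(Object userData).

```java
// Hypothetical, simplified handler with a check point on both sides.
final class CheckedTopic {
    enum State { ALIVE, UNREFERENCED, DEAD }

    private State state = State.UNREFERENCED;
    private Object timerKey = new Object(); // a destroy timer is pending

    /** Publisher check point: returns false if the handler is already DEAD. */
    synchronized boolean claimForPublish() { // step 1: take the lock
        if (state == State.DEAD) return false; // step 2: the timeout thread won
        timerKey = null;                       // step 3: mark that blocks a later timeout
        return true;
    }

    /** Timeout check point, mirroring timeout(Object userData). */
    synchronized boolean onTimeout() {       // step 1: take the lock
        if (timerKey == null) return false;  // step 2: a publisher won
        timerKey = null;
        state = State.DEAD;                  // step 3: mark that blocks later publishers
        return true;
    }

    synchronized State state() { return state; }
}
```

Whichever thread enters its synchronized method first leaves a mark (a null timerKey or the DEAD state) that makes the other thread back off.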
>>
>> Here are more detailed scenarios:
>>
>> a) Publishing thread ahead
>> After thread A has got the topicHandler, its state is checked. If
>> it is DEAD (any destruction in progress has completed by then, because
>> thread A must wait for the synchronized lock held by the timeout method),
>> a new TopicHandler is created. If the retrieved topicHandler is not
>> dead, its destroyTimer is removed. Later on, thread B is stopped at
>> the point where it checks the destroyTimer for null.
>>
>> The destroyTimer will be reset by the framework when the
>> topicHandler is turned to UNREFERENCED.
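The publisher side of scenario a can be sketched as a hypothetical model of the patched lookup in RequestBroker. Broker, Topic and findOrCreate() are illustrative names, and the explicit topics.put() stands in for the real TopicHandler constructor, which registers itself in topicHandlerMap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the patched topic lookup in RequestBroker.
final class Broker {
    static final class Topic {
        private boolean alive, dead;
        private Object timerKey; // pending destroy timer, or null
        synchronized boolean isAlive() { return alive; }
        synchronized boolean isDead() { return dead; }
        synchronized boolean hasTimer() { return timerKey != null; }
        synchronized void goUnreferenced() { alive = false; timerKey = new Object(); } // timer armed
        synchronized void markDead() { dead = true; timerKey = null; }                 // timeout won
        synchronized void unsetDestroyTimer() { timerKey = null; }
    }

    private final Map<String, Topic> topics = new HashMap<>();

    Topic lookup(String oid) { synchronized (topics) { return topics.get(oid); } }

    /** Find or create the topic, then run the publisher-side check point. */
    Topic findOrCreate(String oid) {
        Topic topic;
        boolean newlyCreated = false;
        synchronized (topics) {
            topic = topics.get(oid);
            if (topic == null) {
                topic = new Topic();
                topics.put(oid, topic);
                newlyCreated = true;
            }
        }
        if (!newlyCreated && !topic.isAlive()) {
            Topic old = topic;
            synchronized (old) {             // waits while a timeout is destroying it
                if (old.isDead()) {
                    topic = new Topic();     // replace the obsolete handler
                    synchronized (topics) { topics.put(oid, topic); }
                } else {
                    old.unsetDestroyTimer(); // a later timeout will be ignored
                }
            }
        }
        return topic;
    }
}
```

An UNREFERENCED topic keeps its identity but loses its pending timer, while a DEAD one is replaced in the map.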
>>
>> b) Timeout thread ahead
>> After thread B has got the topicHandler, it checks whether a
>> destroyTimer exists. If not, the timeout event is ignored. Otherwise,
>> the thread continues. Because it holds the synchronized lock, it
>> eventually sets the topicHandler to DEAD, which prevents thread A
>> from using the obsolete handler.
>>
>> A DEAD topicHandler should then be garbage collected (still to be confirmed!).
>>
>>
>> Regards,
>> Eugene
>>
>
> ------------------------------------------------------------------------
>
> Index: org/xmlBlaster/engine/TopicHandler.java
> ===================================================================
> --- org/xmlBlaster/engine/TopicHandler.java (revision 15178)
> +++ org/xmlBlaster/engine/TopicHandler.java (working copy)
> @@ -1946,6 +1946,14 @@
> return notifyList;
> }
>
> + void unsetDestroyTimer() {
> + if (this.timerKey != null) {
> + log.finer("Unset destroyTimer for TopicHandler: "+this.getUniqueKey());
> + this.destroyTimer.removeTimeoutListener(this.timerKey);
> + this.timerKey = null;
> + }
> + }
> +
> /**
> * Merge the message DOM tree into the big xmlBlaster DOM tree
> */
> @@ -2196,8 +2204,16 @@
> * This timeout occurs after a configured delay (destroyDelay) in UNREFERENCED state
> */
> public final void timeout(Object userData) {
> - if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic now ...");
> - ArrayList notifyList = timeout();
> + if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic (oid="+getUniqueKey()+") now ...");
> + ArrayList notifyList = null;
> + synchronized (this) {
> + if (timerKey==null) {
> + log.finer("Ignored timeout event because the timer was unset by a publishing thread");
> + return;
> + }
> + notifyList = timeout();
> + }
> +
> if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize
> }
>
> Index: org/xmlBlaster/engine/RequestBroker.java
> ===================================================================
> --- org/xmlBlaster/engine/RequestBroker.java (revision 15178)
> +++ org/xmlBlaster/engine/RequestBroker.java (working copy)
> @@ -1686,6 +1686,7 @@
> // Handle local message
>
> // Find or create the topic
> + boolean newlycreated = false;
> TopicHandler topicHandler = null;
> synchronized(this.topicHandlerMap) {
> if (!msgKeyData.getOid().equals(msgUnit.getKeyOid())) {
> @@ -1695,11 +1696,25 @@
> Object obj = topicHandlerMap.get(msgUnit.getKeyOid());
> if (obj == null) {
> topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid()); // adds itself to topicHandlerMap
> + newlycreated = true;
> }
> else {
> topicHandler = (TopicHandler)obj;
> }
> }
> +
> + if (!newlycreated && !topicHandler.isAlive()) {
> + log.finer("The topic is not ALIVE, need to check if it is dying or not");
> + synchronized(topicHandler) { // waiting if the topicHandler is dying.
> + if (topicHandler.isDead()) {
> + log.finer("The topic is DEAD, need to create a new one");
> + topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid());
> + }
> + else {
> + topicHandler.unsetDestroyTimer(); // to prevent the topicHandler from being timed out
> + }
> + }
> + }
>
> // Process the message
> publishReturnQos = topicHandler.publish(sessionInfo, msgUnit, publishQos);