Hi, Michele,
Tryed another approach without affecting your current design. This
approach seems no gaps, but is not a pretty solution, as it exposes a
TopicHandler's thread counter to public and requires caller to invoke
three related methods in order.
How about using topic handler pooling and make each handler single
thread access?
Regards,
Eugene
Michele Laghi wrote:
Hi Eugene,
many thanks for the deep analysis and patches. We will look at them in
the next weeks (quite busy right now).
We will also try to find a solution to fill the gaps which still remain
since my experience is that if something is "theoretically possible"
then it will occur for sure in real life ;)
Thanks
Michele
Index: org/xmlBlaster/engine/TopicHandler.java
===================================================================
--- org/xmlBlaster/engine/TopicHandler.java (revision 15178)
+++ org/xmlBlaster/engine/TopicHandler.java (working copy)
@@ -142,7 +142,8 @@
private boolean isRegisteredInBigXmlDom = false;
- private int publishCounter = 0; //count the threads running in publish
method
+ private int publishCounter = 0; // count the threads running in publish
method (for internal use)
+ private int publishCounter2 = 0; // count the threads invoking publish
method (exposed to outside)
/**
* This topic is destroyed after given timeout
@@ -482,6 +483,8 @@
* <br />
* Publish filter plugin checks are done already<br />
* Cluster forwards are done already.
+ *
+ * @see incrementPublishCounter()
*
* @param publisherSessionInfo The publisher
* @param msgUnit The new message
@@ -1947,6 +1950,21 @@
}
/**
+ * Must call the method just befor calling publish(), and must guarantee
+ * decrementPublishCounter() be called after called publish().
+ */
+ public synchronized void incrementPublishCounter() {
+ publishCounter2 ++;
+ }
+
+ /**
+ * @see incrementPublishCounter()
+ */
+ public synchronized void decrementPublishCounter() {
+ publishCounter2 --;
+ }
+
+ /**
* Merge the message DOM tree into the big xmlBlaster DOM tree
*/
private void addToBigDom() throws XmlBlasterException {
@@ -2196,7 +2214,7 @@
* This timeout occurs after a configured delay (destroyDelay) in
UNREFERENCED state
*/
public final void timeout(Object userData) {
- if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay
occurred - destroying topic now ...");
+ if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay
occurred - destroying topic (oid="+getUniqueKey()+") now ...");
ArrayList notifyList = timeout();
if (notifyList != null) notifySubscribersAboutErase(notifyList); // must
be outside the synchronize
}
@@ -2207,6 +2225,10 @@
synchronized (this) {
if (isAlive()) // interim message arrived?
return null;
+ if (publishCounter2 != 0) {
+ log.finer("Ignored timeout event for publishing thread running");
+ return null;
+ }
return toDead(this.creatorSessionName, null, null);
}
}
Index: org/xmlBlaster/engine/RequestBroker.java
===================================================================
--- org/xmlBlaster/engine/RequestBroker.java (revision 15178)
+++ org/xmlBlaster/engine/RequestBroker.java (working copy)
@@ -1686,6 +1686,7 @@
// Handle local message
// Find or create the topic
+ boolean newlycreated = false;
TopicHandler topicHandler = null;
synchronized(this.topicHandlerMap) {
if (!msgKeyData.getOid().equals(msgUnit.getKeyOid())) {
@@ -1695,14 +1696,37 @@
Object obj = topicHandlerMap.get(msgUnit.getKeyOid());
if (obj == null) {
topicHandler = new TopicHandler(this, sessionInfo,
msgUnit.getKeyOid()); // adds itself to topicHandlerMap
+ newlycreated = true;
}
else {
topicHandler = (TopicHandler)obj;
}
}
+
+ boolean counterIncreased = false;
+ if (!newlycreated && !topicHandler.isAlive()) {
+ log.finer("The topic is not ALIVE, need to check if it is dying or
not");
+ synchronized(topicHandler) { // wiating if the topicHandler is
dying.
+ if (topicHandler.isDead()) {
+ log.finer("The topic is DEAD, need to create a new one");
+ topicHandler = new TopicHandler(this, sessionInfo,
msgUnit.getKeyOid());
+ }
+ else {
+ topicHandler.incrementPublishCounter(); // to provent the
topicHandler from being timed out
+ counterIncreased = true; // do NOT insert any code that
might throws any exception between this line and the line of
topicHandler.publish() below.
+ }
+ }
+ }
// Process the message
- publishReturnQos = topicHandler.publish(sessionInfo, msgUnit,
publishQos);
+ try {
+ publishReturnQos = topicHandler.publish(sessionInfo, msgUnit,
publishQos);
+ }
+ finally {
+ if (counterIncreased) {
+ topicHandler.decrementPublishCounter(); // balance counter
+ }
+ }
if (publishReturnQos == null) { // assert only
StatusQosData qos = new StatusQosData(glob, MethodName.PUBLISH);