Author: ruwan
Date: Tue Nov 13 04:53:35 2007
New Revision: 594519

URL: http://svn.apache.org/viewvc?rev=594519&view=rev
Log:
Fixed caching to work in the clustering environment

Modified:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java?rev=594519&r1=594518&r2=594519&view=diff
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
 Tue Nov 13 04:53:35 2007
@@ -20,8 +20,10 @@
 package org.apache.synapse.mediators.builtin;
 
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.engine.AxisConfiguration;
-import org.apache.axis2.description.Parameter;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.context.Replicator;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.saaj.util.SAAJUtil;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.core.axis2.Axis2Sender;
@@ -33,6 +35,15 @@
 import org.wso2.caching.CachingConstants;
 import org.wso2.caching.digest.DigestGenerator;
 
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.MimeHeaders;
+import javax.xml.soap.SOAPException;
+import javax.xml.soap.SOAPMessage;
+import javax.xml.stream.XMLStreamException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
 /**
  *
  */
@@ -65,41 +76,38 @@
             }
         }
 
-        AxisConfiguration axisCfg = 
synCtx.getConfiguration().getAxisConfiguration();
-        if (axisCfg == null) {
-            handleException("Unable to perform caching, "
-                + " AxisConfiguration cannot be found", synCtx);
+        ConfigurationContext cfgCtx =
+            ((Axis2MessageContext) 
synCtx).getAxis2MessageContext().getConfigurationContext();
+        if (cfgCtx == null) {
+            handleException("Unable to perform caching, " + " 
ConfigurationContext cannot be found",
+                synCtx);
             return false; // never executes.. but keeps IDE happy
         }
 
         if (traceOrDebugOn) {
-            traceOrDebug(traceOn, "Looking up cache at scope : " +
-                scope + " with ID : " + cacheObjKey);
+            traceOrDebug(traceOn,
+                "Looking up cache at scope : " + scope + " with ID : " + 
cacheObjKey);
         }
 
         // look up cache
-        Parameter param = axisCfg.getParameter(cacheObjKey);
-        Cache cache = null;
-        if (param != null && param.getValue() instanceof Cache) {
-            cache = (Cache) param.getValue();
+        Object prop = cfgCtx.getProperty(cacheObjKey);
+        Cache cache;
+        if (prop != null && prop instanceof Cache) {
+            cache = (Cache) prop;
 
         } else {
-            synchronized (axisCfg) {
+            synchronized (cfgCtx) {
                 // check again after taking the lock to make sure no one else 
did it before us
-                param = axisCfg.getParameter(cacheObjKey);
-                if (param != null && param.getValue() instanceof Cache) {
-                    cache = (Cache) param.getValue();
+                prop = cfgCtx.getProperty(cacheObjKey);
+                if (prop != null && prop instanceof Cache) {
+                    cache = (Cache) prop;
 
                 } else {
                     if (traceOrDebugOn) {
                         traceOrDebug(traceOn, "Creating/recreating the cache 
object");
                     }
                     cache = new Cache();
-                    try {
-                        axisCfg.addParameter(cacheObjKey, cache);
-                    } catch (AxisFault af) {
-                        auditWarn("Unable to create a cache with ID : " + 
cacheObjKey, synCtx);
-                    }
+                    cfgCtx.setProperty(cacheObjKey, cache);
                 }
             }
         }
@@ -112,6 +120,14 @@
             result = processRequestMessage(synCtx, traceOrDebugOn, traceOn, 
cache);
         }
 
+        try {
+            Replicator.replicate(cfgCtx);
+        } catch (ClusteringFault clusteringFault) {
+            if (traceOrDebugOn) {
+                traceOrDebug(traceOn, "Unable to replicate Cache mediator 
state among the cluster");
+            }
+        }
+
         if (traceOrDebugOn) {
             traceOrDebug(traceOn, "End : Cache mediator");
         }
@@ -121,20 +137,21 @@
     /**
      * Process a response message through this cache mediator. This finds the 
Cache used, and
      * updates it for the corresponding request hash
+     *
      * @param traceOrDebugOn is trace or debug logging on?
-     * @param traceOn is tracing on?
-     * @param synCtx the current message (response)
-     * @param cache the cache
+     * @param traceOn        is tracing on?
+     * @param synCtx         the current message (response)
+     * @param cache          the cache
      */
     private void processResponseMessage(boolean traceOrDebugOn, boolean 
traceOn,
         MessageContext synCtx, Cache cache) {
 
-        Object requestHash = 
synCtx.getProperty(CachingConstants.REQUEST_HASH_KEY);
+        String requestHash = (String) 
synCtx.getProperty(CachingConstants.REQUEST_HASH_KEY);
 
         if (requestHash != null) {
             if (traceOrDebugOn) {
-                traceOrDebug(traceOn, "Storing the response message into the 
cache at scope : "
-                    + scope + " with ID : " + cacheObjKey + " for request hash 
: " + requestHash);
+                traceOrDebug(traceOn, "Storing the response message into the 
cache at scope : " +
+                    scope + " with ID : " + cacheObjKey + " for request hash : 
" + requestHash);
             }
 
             Object obj = cache.getResponseForKey(requestHash);
@@ -143,13 +160,18 @@
 
                 CachedObject cachedObj = (CachedObject) obj;
                 if (traceOrDebugOn) {
-                    traceOrDebug(traceOn, "Storing the response for the 
message with ID : "
-                        + synCtx.getMessageID() + " with request hash ID : " +
+                    traceOrDebug(traceOn, "Storing the response for the 
message with ID : " +
+                        synCtx.getMessageID() + " with request hash ID : " +
                         cachedObj.getRequestHash() + " in the cache : " + 
cacheObjKey);
                 }
 
-                cachedObj.setResponseEnvelope(
-                    MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()));
+                ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+                try {
+                    
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(outStream);
+                    cachedObj.setResponseEnvelope(outStream.toByteArray());
+                } catch (XMLStreamException e) {
+                    handleException("Unable to set the response to the Cache", 
e, synCtx);
+                }
 
                 // this is not required yet can commented this for perf 
improvements
                 // in the future there can be a situation where user sends the 
request with the
@@ -158,8 +180,7 @@
                 cachedObj.setResponseHash(digestGenerator.getDigest(
                     ((Axis2MessageContext) synCtx).getAxis2MessageContext()));
 
-                cachedObj.setExpireTime(
-                    System.currentTimeMillis() + cachedObj.getTimeout());
+                cachedObj.setExpireTime(System.currentTimeMillis() + 
cachedObj.getTimeout());
 
             } else {
                 auditWarn("A response message without a valid mapping to the " 
+
@@ -173,18 +194,20 @@
     }
 
     /**
-     * Processes a request message through the cache mediator. Generates the 
request hash and
-     * looks up for a hit, if found; then the specified named or anonymous 
sequence is executed
-     * or marks this message as a response and sends back directly to client.
-     * @param synCtx incoming request message
+     * Processes a request message through the cache mediator. Generates the 
request hash and looks
+     * up for a hit, if found; then the specified named or anonymous sequence 
is executed or marks
+     * this message as a response and sends back directly to client.
+     *
+     * @param synCtx         incoming request message
      * @param traceOrDebugOn is tracing or debug logging on?
-     * @param traceOn is tracing on?
-     * @param cache the cache
+     * @param traceOn        is tracing on?
+     * @param cache          the cache
      * @return should this mediator terminate further processing?
      */
-    private boolean processRequestMessage(MessageContext synCtx, boolean 
traceOrDebugOn, boolean traceOn, Cache cache) {
+    private boolean processRequestMessage(MessageContext synCtx, boolean 
traceOrDebugOn,
+        boolean traceOn, Cache cache) {
 
-        Object requestHash = digestGenerator
+        String requestHash = digestGenerator
             .getDigest(((Axis2MessageContext) 
synCtx).getAxis2MessageContext());
         synCtx.setProperty(CachingConstants.REQUEST_HASH_KEY, requestHash);
 
@@ -202,25 +225,37 @@
             if (!cachedObj.isExpired() && cachedObj.getResponseEnvelope() != 
null) {
 
                 if (traceOrDebugOn) {
-                    traceOrDebug(traceOn,
-                        "Cache-hit for message ID : " + synCtx.getMessageID());
+                    traceOrDebug(traceOn, "Cache-hit for message ID : " + 
synCtx.getMessageID());
                 }
 
                 // mark as a response and replace envelope from cache
                 synCtx.setResponse(true);
                 try {
-                    synCtx.setEnvelope(cachedObj.getResponseEnvelope());
+                    MessageFactory mf = MessageFactory.newInstance();
+                    SOAPMessage smsg = mf.createMessage(new MimeHeaders(),
+                        new 
ByteArrayInputStream(cachedObj.getResponseEnvelope()));
+
+                    org.apache.axiom.soap.SOAPEnvelope omSOAPEnv =
+                        
SAAJUtil.toOMSOAPEnvelope(smsg.getSOAPPart().getDocumentElement());
+
+                    synCtx.setEnvelope(omSOAPEnv);
                 } catch (AxisFault axisFault) {
-                    handleException(
-                        "Error setting response envelope from cache : " + 
cacheObjKey, synCtx);
+                    handleException("Error setting response envelope from 
cache : " + cacheObjKey,
+                        synCtx);
+                } catch (IOException ioe) {
+                    handleException("Error setting response envelope from 
cache : " + cacheObjKey,
+                        ioe, synCtx);
+                } catch (SOAPException soape) {
+                    handleException("Error setting response envelope from 
cache : " + cacheObjKey,
+                        soape, synCtx);
                 }
 
                 // take specified action on cache hit
                 if (onCacheHitSequence != null) {
                     // if there is an onCacheHit use that for the mediation
                     if (traceOrDebugOn) {
-                        traceOrDebug(traceOn, "Delegating message to the 
onCachingHit " +
-                            "Anonymous sequence");
+                        traceOrDebug(traceOn,
+                            "Delegating message to the onCachingHit " + 
"Anonymous sequence");
                     }
                     onCacheHitSequence.mediate(synCtx);
 
@@ -249,7 +284,8 @@
                 // cache exists, but has expired...
                 cachedObj.clearCache();
                 if (traceOrDebugOn) {
-                    traceOrDebug(traceOn, "Existing cached response has 
expired. Reset cache element");
+                    traceOrDebug(traceOn,
+                        "Existing cached response has expired. Reset cache 
element");
                 }
             }
 
@@ -274,13 +310,20 @@
 
     /**
      * Store request message to the cache
-     * @param synCtx the request message
+     *
+     * @param synCtx      the request message
      * @param requestHash the request hash that has already been computed
-     * @param cache the cache
+     * @param cache       the cache
      */
-    private void storeRequestToCache(MessageContext synCtx, Object 
requestHash, Cache cache) {
+    private void storeRequestToCache(MessageContext synCtx, String 
requestHash, Cache cache) {
         CachedObject cachedObj = new CachedObject();
-        
cachedObj.setRequestEnvelope(MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()));
+        ByteArrayOutputStream requestStream = new ByteArrayOutputStream();
+        try {
+            
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(requestStream);
+            cachedObj.setRequestEnvelope(requestStream.toByteArray());
+        } catch (XMLStreamException e) {
+            handleException("Unable to store the request in to the cache", e, 
synCtx);
+        }
         cachedObj.setRequestHash(requestHash);
         cachedObj.setTimeout(timeout);
         cache.addResponseWithKey(requestHash, cachedObj);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to