Author: ruwan
Date: Mon Nov 26 03:02:10 2007
New Revision: 598222

URL: http://svn.apache.org/viewvc?rev=598222&view=rev
Log:
Fine-tuning the caching to work in a clustered environment, with performance improvements

Modified:
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java?rev=598222&r1=598221&r2=598222&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java Mon Nov 26 03:02:10 2007
@@ -46,6 +46,7 @@
     private static final QName ATT_ID = new QName("id");
     private static final QName ATT_COLLECTOR = new QName("collector");
     private static final QName ATT_HASH_GENERATOR = new QName("hashGenerator");
+    private static final QName ATT_MAX_MSG_SIZE = new QName("maxMessageSize");
     private static final QName ATT_TIMEOUT = new QName("timeout");
     private static final QName ATT_SCOPE = new QName("scope");
     private static final QName ATT_SEQUENCE = new QName("sequence");
@@ -74,7 +75,7 @@
 
         OMAttribute scopeAttr = elem.getAttribute(ATT_SCOPE);
         if (scopeAttr != null && scopeAttr.getAttributeValue() != null &&
-            isValidScope(scopeAttr.getAttributeValue())) {
+            isValidScope(scopeAttr.getAttributeValue(), cache.getId())) {
             cache.setScope(scopeAttr.getAttributeValue());
         } else {
             cache.setScope(CachingConstants.SCOPE_PER_HOST);
@@ -117,6 +118,11 @@
                 cache.setTimeout(DEFAULT_TIMEOUT);
             }
 
+            OMAttribute maxMessageSizeAttr = 
elem.getAttribute(ATT_MAX_MSG_SIZE);
+            if (maxMessageSizeAttr != null && 
maxMessageSizeAttr.getAttributeValue() != null) {
+                
cache.setMaxMessageSize(Integer.parseInt(maxMessageSizeAttr.getAttributeValue()));
+            }
+
             OMElement onCacheHitElem = 
elem.getFirstChildWithName(ON_CACHE_HIT_Q);
             if (onCacheHitElem != null) {
                 OMAttribute sequenceAttr = 
onCacheHitElem.getAttribute(ATT_SEQUENCE);
@@ -154,11 +160,16 @@
         return cache;
     }
 
-    private boolean isValidScope(String scope) {
+    private boolean isValidScope(String scope, String id) {
         if (CachingConstants.SCOPE_PER_HOST.equals(scope)) {
             return true;
         } else if (CachingConstants.SCOPE_PER_MEDIATOR.equals(scope)) {
-            return true;
+            if (id != null) {
+                return true;
+            } else {
+                handleException("Id is required for a cache wirth scope : " + 
scope);
+                return false;
+            }
         } else if (CachingConstants.SCOPE_DISTRIBUTED.equals(scope)) {
             handleException("Scope distributed is not supported yet by the 
Cache mediator");
             return false;

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java?rev=598222&r1=598221&r2=598222&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java Mon Nov 26 03:02:10 2007
@@ -69,6 +69,11 @@
                     fac.createOMAttribute("timeout", nullNS, 
Long.toString(mediator.getTimeout())));
             }
 
+            if (mediator.getMaxMessageSize() != 0) {
+                cache.addAttribute(
+                    fac.createOMAttribute("maxMessageSize", nullNS, 
Integer.toString(mediator.getMaxMessageSize())));
+            }
+
             if (mediator.getOnCacheHitRef() != null) {
                 OMElement onCacheHit = fac.createOMElement("onCacheHit", 
synNS);
                 onCacheHit.addAttribute(

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java?rev=598222&r1=598221&r2=598222&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java Mon Nov 26 03:02:10 2007
@@ -25,10 +25,12 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.saaj.util.SAAJUtil;
 import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
 import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.core.axis2.Axis2Sender;
 import org.apache.synapse.mediators.AbstractMediator;
 import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.synapse.util.FixedByteArrayOutputStream;
 import org.apache.synapse.util.MessageHelper;
 import org.wso2.caching.Cache;
 import org.wso2.caching.CachedObject;
@@ -60,6 +62,7 @@
     private long timeout = 0L;
     private SequenceMediator onCacheHitSequence = null;
     private String onCacheHitRef = null;
+    private int maxMessageSize = 0;
     private String cacheObjKey = CachingConstants.CACHE_OBJECT; // default 
per-host
     private static final String CACHE_OBJ_PREFIX = "synapse.cache_obj_";
 
@@ -77,6 +80,23 @@
             }
         }
 
+        // if maxMessageSize is specified check for the message size before 
processing
+        FixedByteArrayOutputStream fbaos = null;
+        if (maxMessageSize > 0) {
+            fbaos = new FixedByteArrayOutputStream(maxMessageSize);
+            try {
+                
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(fbaos);
+            } catch (XMLStreamException e) {
+                handleException("Error in checking the message size", e, 
synCtx);
+            } catch (SynapseException syne) {
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn, "Message size exceeds the upper 
bound for caching, " +
+                            "request will not be cached");
+                    return true;
+                }
+            }
+        }
+
         ConfigurationContext cfgCtx =
             ((Axis2MessageContext) 
synCtx).getAxis2MessageContext().getConfigurationContext();
         if (cfgCtx == null) {
@@ -91,7 +111,7 @@
         }
 
         // look up cache
-        Object prop = cfgCtx.getProperty(cacheObjKey);
+        Object prop = cfgCtx.getPropertyNonReplicable(cacheObjKey);
         Cache cache;
         if (prop != null && prop instanceof Cache) {
             cache = (Cache) prop;
@@ -99,7 +119,7 @@
         } else {
             synchronized (cfgCtx) {
                 // check again after taking the lock to make sure no one else 
did it before us
-                prop = cfgCtx.getProperty(cacheObjKey);
+                prop = cfgCtx.getPropertyNonReplicable(cacheObjKey);
                 if (prop != null && prop instanceof Cache) {
                     cache = (Cache) prop;
 
@@ -115,10 +135,10 @@
 
         boolean result = true;
         if (synCtx.isResponse()) {
-            processResponseMessage(synCtx, traceOrDebugOn, traceOn, cache);
+            processResponseMessage(synCtx, cfgCtx, traceOrDebugOn, traceOn, 
cache);
 
         } else {
-            result = processRequestMessage(synCtx, traceOrDebugOn, traceOn, 
cache);
+            result = processRequestMessage(synCtx, cfgCtx, traceOrDebugOn, 
traceOn, cache, fbaos);
         }
 
         try {
@@ -142,9 +162,10 @@
      * @param traceOrDebugOn is trace or debug logging on?
      * @param traceOn        is tracing on?
      * @param synCtx         the current message (response)
+     * @param cfgCtx         the abstract context in which the cache will be 
kept
      * @param cache          the cache
      */
-    private void processResponseMessage(MessageContext synCtx, boolean 
traceOrDebugOn,
+    private void processResponseMessage(MessageContext synCtx, 
ConfigurationContext cfgCtx, boolean traceOrDebugOn,
         boolean traceOn, Cache cache) {
 
         if (!collector) {
@@ -178,15 +199,17 @@
                     handleException("Unable to set the response to the Cache", 
e, synCtx);
                 }
 
-                // this is not required yet can commented this for perf 
improvements
-                // in the future there can be a situation where user sends the 
request with the
-                // response hash (if client side caching is on) in which case 
we can compare that
-                // response hash with the given response hash and respond with 
not-modified http header
-                cachedObj.setResponseHash(cache.getGenerator().getDigest(
-                    ((Axis2MessageContext) synCtx).getAxis2MessageContext()));
+                /* this is not required yet, can commented this for perf 
improvements
+                   in the future there can be a situation where user sends the 
request with the
+                   response hash (if client side caching is on) in which case 
we can compare that
+                   response hash with the given response hash and respond with 
not-modified http header */
+                // cachedObj.setResponseHash(cache.getGenerator().getDigest(
+                //     ((Axis2MessageContext) 
synCtx).getAxis2MessageContext()));
 
                 cachedObj.setExpireTime(System.currentTimeMillis() + 
cachedObj.getTimeout());
 
+                cfgCtx.setProperty(cacheObjKey, cache);
+
             } else {
                 auditWarn("A response message without a valid mapping to the " 
+
                     "request hash found. Unable to store the response in 
cache", synCtx);
@@ -204,13 +227,15 @@
      * this message as a response and sends back directly to client.
      *
      * @param synCtx         incoming request message
+     * @param cfgCtx         the AbstractContext in which the cache will be 
kept
      * @param traceOrDebugOn is tracing or debug logging on?
      * @param traceOn        is tracing on?
      * @param cache          the cache
+     * @param fbaos          the serialized request envelope
      * @return should this mediator terminate further processing?
      */
-    private boolean processRequestMessage(MessageContext synCtx, boolean 
traceOrDebugOn,
-        boolean traceOn, Cache cache) {
+    private boolean processRequestMessage(MessageContext synCtx, 
ConfigurationContext cfgCtx, boolean traceOrDebugOn,
+        boolean traceOn, Cache cache, FixedByteArrayOutputStream fbaos) {
 
         if (collector) {
             handleException("Request messages cannot be handled in a collector 
cache", synCtx);
@@ -296,6 +321,8 @@
                     traceOrDebug(traceOn,
                         "Existing cached response has expired. Reset cache 
element");
                 }
+
+                cfgCtx.setProperty(cacheObjKey, cache);
             }
 
         } else {
@@ -308,10 +335,10 @@
                         traceOrDebug(traceOn, "In-memory cache is full. Unable 
to cache");
                     }
                 } else {
-                    storeRequestToCache(synCtx, requestHash, cache);
+                    storeRequestToCache(synCtx, cfgCtx, requestHash, cache, 
fbaos);
                 }
             } else {
-                storeRequestToCache(synCtx, requestHash, cache);
+                storeRequestToCache(synCtx, cfgCtx, requestHash, cache, fbaos);
             }
         }
         return true;
@@ -321,21 +348,33 @@
      * Store request message to the cache
      *
      * @param synCtx      the request message
+     * @param cfgCtx      the Abstract context in which the cache will be kept
      * @param requestHash the request hash that has already been computed
      * @param cache       the cache
+     * @param fbaos       the serialized request envelope
      */
-    private void storeRequestToCache(MessageContext synCtx, String 
requestHash, Cache cache) {
+    private void storeRequestToCache(MessageContext synCtx, 
ConfigurationContext cfgCtx, String requestHash, Cache cache,
+        FixedByteArrayOutputStream fbaos) {
+        
         CachedObject cachedObj = new CachedObject();
-        ByteArrayOutputStream requestStream = new ByteArrayOutputStream();
-        try {
-            
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(requestStream);
-            cachedObj.setRequestEnvelope(requestStream.toByteArray());
-        } catch (XMLStreamException e) {
-            handleException("Unable to store the request in to the cache", e, 
synCtx);
+        if (fbaos != null) {
+            cachedObj.setRequestEnvelope(fbaos.toByteArray());
+        } else {
+            // this else block can be commented out for the perf improvements, 
because we are not using
+            // this for the moment
+            ByteArrayOutputStream requestStream = new ByteArrayOutputStream();
+            try {
+                
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(requestStream);
+                cachedObj.setRequestEnvelope(requestStream.toByteArray());
+            } catch (XMLStreamException e) {
+                handleException("Unable to store the request in to the cache", 
e, synCtx);
+            }
         }
         cachedObj.setRequestHash(requestHash);
         cachedObj.setTimeout(timeout);
         cache.addResponseWithKey(requestHash, cachedObj);
+
+        cfgCtx.setProperty(cacheObjKey, cache);
     }
 
     public String getId() {
@@ -344,7 +383,6 @@
 
     public void setId(String id) {
         this.id = id;
-        this.cacheObjKey = CACHE_OBJ_PREFIX + id;
     }
 
     public String getScope() {
@@ -353,6 +391,9 @@
 
     public void setScope(String scope) {
         this.scope = scope;
+        if (CachingConstants.SCOPE_PER_MEDIATOR.equals(scope)) {
+            cacheObjKey = CACHE_OBJ_PREFIX + id;
+        }
     }
 
     public boolean isCollector() {
@@ -409,5 +450,13 @@
 
     public void setOnCacheHitRef(String onCacheHitRef) {
         this.onCacheHitRef = onCacheHitRef;
+    }
+
+    public int getMaxMessageSize() {
+        return maxMessageSize;
+    }
+
+    public void setMaxMessageSize(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to