Squid's ICAP adaptation implementation does not check the adapted body pipe's
buffer size before reading from an ICAP server.

If the client does not read from the open connection (e.g., the user does not
confirm the browser's download message box in Microsoft's IE), Squid keeps on
reading data from the ICAP server into the body pipe, while no more data
can be delivered to the client.
Thus the adapted body pipe's buffer grows without limit, which causes excessive
RAM growth.

This issue was reported as Bugzilla bug 2619, and the attached patch (which
applies to 3.1 and 3.HEAD) should fix it.

Please review the patch and feel free to comment on it.

-----------

I would like to subscribe to the squid-dev mailing list.
My name is Martin Huter. I'm working as a programmer at phion AG in Austria.
I'm mainly working with C and C++ and, among other things, I am responsible for
our Squid service. Currently I'm updating Squid to version 3.1.

P.S.: Many thanks to Alex Rousskov for his patient help with the attached patch.

cheers
martin huter


-- 
Martin Huter
Unit Manager
Release Manager
phion AG
Eduard-Bodem-Gasse 1
A-6020 Innsbruck

Tel:     +43 (0) 508 100
Fax:     +43 (0) 508 100 20
Mail:    [email protected]
Web:     http://www.phion.com

phion AG
Vorsitzender des Aufsichtsrates: Dr. Karl  Lamprecht
Vorstand: Dr. Wieland Alge, Mag. Günter Klausner
Sitz der Gesellschaft: 6020 Innsbruck, Österreich
Handelsgericht Innsbruck Firmenbuch: 184392s
UID-Nr:: ATU47509003
diff -NaurbB squid-3.1.0.13-20090815.orig/src/Server.cc squid-3.1.0.13-20090815.new/src/Server.cc
--- squid-3.1.0.13-20090815.orig/src/Server.cc	2009-08-18 08:38:44.000000000 +0200
+++ squid-3.1.0.13-20090815.new/src/Server.cc	2009-08-18 09:24:07.000000000 +0200
@@ -44,6 +44,7 @@
 #if USE_ADAPTATION
 #include "adaptation/AccessCheck.h"
 #include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
 #endif
 
 // implemented in client_side_reply.cc until sides have a common parent
@@ -56,6 +57,8 @@
         , adaptationAccessCheckPending(false)
         , startedAdaptation(false)
 #endif
+         , theVirginReply(NULL)
+         , theFinalReply(NULL) 
 {
     fwd = theFwdState;
     entry = fwd->entry;
@@ -274,7 +277,9 @@
         handleMoreAdaptedBodyAvailable();
         return;
     }
+    else
 #endif
+    if(requestBodySource == bp)
     handleMoreRequestBodyAvailable();
 }
 
@@ -287,7 +292,9 @@
         handleAdaptedBodyProductionEnded();
         return;
     }
+    else
 #endif
+    if(requestBodySource == bp)
     handleRequestBodyProductionEnded();
 }
 
@@ -300,7 +307,9 @@
         handleAdaptedBodyProducerAborted();
         return;
     }
+    else
 #endif
+    if(requestBodySource == bp)
     handleRequestBodyProducerAborted();
 }
 
@@ -662,13 +671,56 @@
     handleAdaptationAborted(!final);
 }
 
+void
+ServerStateData::handleKickProducerReads()
+{
+    if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
+        abortTransaction("store entry aborted while kick producer callback");
+        return;
+    }
+    else if(!entry->isAccepting()){
+        return;
+    }
+    if(!adaptedBodySource){
+        return;
+    }
+    handleMoreAdaptedBodyAvailable();
+}
+
+
 // more adapted response body is available
 void
 ServerStateData::handleMoreAdaptedBodyAvailable()
 {
-    const size_t contentSize = adaptedBodySource->buf().contentSize();
+    size_t contentSize = adaptedBodySource->buf().contentSize();
+    bool consumedPartially = false;
 
-    debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
+    if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
+        abortTransaction("store entry aborted while reading adapted body");
+        return;
+     }
+ 
+     if(!contentSize) return;
+ 
+    const size_t contentSizeWanted=entry->bytesWanted(Range<size_t>(0,contentSize));
+     if(!contentSizeWanted && contentSize>1){
+         debugs(11,5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+             "response body at offset " << adaptedBodySource->consumedSize() << ". Not enough buffer avail: " << contentSizeWanted);
+ 
+         typedef NullaryMemFunT<ServerStateData> Dialer;
+         AsyncCall::Pointer call = asyncCall(93,3, "ServerStateData::handleMoreAdaptedBodyAvailableCB",
+                                   Dialer(this, &ServerStateData::handleKickProducerReads));
+         entry->setDeferredProducerCall(call);
+         return;
+     }
+     else if(contentSizeWanted+1<contentSize ){
+         debugs(11,5, HERE << "storing " << contentSizeWanted << " instead of " << contentSize << " bytes of adapted " <<
+                 "response body at offset " << adaptedBodySource->consumedSize() << " due to MemObject buffer size");
+         contentSize=contentSizeWanted+1;
+         consumedPartially=true;;
+     }
+ 
+    debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
            "response body at offset " << adaptedBodySource->consumedSize());
 
     if (abortOnBadEntry("entry refuses adapted body"))
@@ -676,17 +728,26 @@
 
     assert(entry);
     BodyPipeCheckout bpc(*adaptedBodySource);
-    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
-    currentOffset += bpc.buf.size;
+    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+    //currentOffset += bpc.buf.size;
+    currentOffset += contentSize;
     entry->write(ioBuf);
     bpc.buf.consume(contentSize);
     bpc.checkIn();
+
+     if(!consumedPartially && (adaptedBodySource!=NULL) && adaptedBodySource->productionEnded())
+         handleAdaptedBodyProductionEnded();
 }
 
 // the entire adapted response body was produced, successfully
 void
 ServerStateData::handleAdaptedBodyProductionEnded()
 {
+     if(adaptedBodySource->buf().contentSize()){
+         // not yet consumed everything. postpone ending consumption until we've consumed everything
+         return;
+     }
+ 
     stopConsumingFrom(adaptedBodySource);
 
     if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
diff -NaurbB squid-3.1.0.13-20090815.orig/src/Server.h squid-3.1.0.13-20090815.new/src/Server.h
--- squid-3.1.0.13-20090815.orig/src/Server.h	2009-08-18 08:38:44.000000000 +0200
+++ squid-3.1.0.13-20090815.new/src/Server.h	2009-08-18 09:21:09.000000000 +0200
@@ -153,6 +153,8 @@
 
     void handleAdaptationCompleted();
     void handleAdaptationAborted(bool bypassable = false);
+
+	void handleKickProducerReads();
 #endif
 
 protected:
diff -NaurbB squid-3.1.0.13-20090815.orig/src/store.cc squid-3.1.0.13-20090815.new/src/store.cc
--- squid-3.1.0.13-20090815.orig/src/store.cc	2009-08-18 08:38:44.000000000 +0200
+++ squid-3.1.0.13-20090815.new/src/store.cc	2009-08-18 09:21:47.000000000 +0200
@@ -349,6 +349,10 @@
     debugs(20, 3, HERE << "new StoreEntry " << this);
     mem_obj = NULL;
 
+#if USE_ADAPTATION
+	deferredProducerCall=NULL;
+#endif
+
     expires = lastmod = lastref = timestamp = -1;
 
     swap_filen = -1;
@@ -366,6 +370,21 @@
     swap_dirn = -1;
 }
 
+#if USE_ADAPTATION
+void StoreEntry::setDeferredProducerCall(AsyncCall::Pointer call)
+{
+    deferredProducerCall=call;
+}
+
+void StoreEntry::kickProducerReads()
+{
+	if(deferredProducerCall!=NULL){
+		ScheduleCallHere(deferredProducerCall);
+		deferredProducerCall=NULL;
+	}
+}
+#endif
+
 void
 StoreEntry::destroyMemObject()
 {
diff -NaurbB squid-3.1.0.13-20090815.orig/src/store_client.cc squid-3.1.0.13-20090815.new/src/store_client.cc
--- squid-3.1.0.13-20090815.orig/src/store_client.cc	2009-08-18 08:38:44.000000000 +0200
+++ squid-3.1.0.13-20090815.new/src/store_client.cc	2009-08-18 09:21:09.000000000 +0200
@@ -262,6 +262,10 @@
     copying = false;
 
     storeClientCopy2(entry, this);
+
+#if USE_ADAPTATION
+	if(entry) entry->kickProducerReads();
+#endif
 }
 
 /*
@@ -694,6 +698,10 @@
     else
         mem->kickReads();
 
+#if USE_ADAPTATION
+	if(e) e->kickProducerReads();
+#endif
+
     return 1;
 }
 
diff -NaurbB squid-3.1.0.13-20090815.orig/src/Store.h squid-3.1.0.13-20090815.new/src/Store.h
--- squid-3.1.0.13-20090815.orig/src/Store.h	2009-08-18 08:38:44.000000000 +0200
+++ squid-3.1.0.13-20090815.new/src/Store.h	2009-08-18 09:21:09.000000000 +0200
@@ -185,6 +185,14 @@
     virtual void lock();
     virtual void release();
 
+#if USE_ADAPTATION
+	void setDeferredProducerCall(AsyncCall::Pointer);
+	void kickProducerReads();
+	bool isDeferredProducerCallSet(){return deferredProducerCall!=NULL; }
+protected:
+	AsyncCall::Pointer deferredProducerCall;
+#endif
+
 private:
     static MemAllocator *pool;
 
diff -NaurbB squid-3.1.0.13-20090815.orig/src/StoreIOBuffer.h squid-3.1.0.13-20090815.new/src/StoreIOBuffer.h
--- squid-3.1.0.13-20090815.orig/src/StoreIOBuffer.h	2009-08-18 08:38:44.000000000 +0200
+++ squid-3.1.0.13-20090815.new/src/StoreIOBuffer.h	2009-08-18 09:21:09.000000000 +0200
@@ -59,6 +59,13 @@
         flags.error = 0;
     }
 
+    StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t anLength) :
+            length(anLength),
+            offset (anOffset),
+            data(aMemBuf->content()) {
+        flags.error = 0;
+    }
+
     Range<int64_t> range() const {
         return Range<int64_t>(offset, offset + length);
     }

Reply via email to