Hello,

This is a bug 4223 fix.

This change fixes Store to delay writing (and, therefore, sharing) of
re-triable errors (e.g., 504 Gateway Timeout responses) to clients:
once we start sharing the response with a client, we cannot re-try the
transaction. Since ENTRY_FWD_HDR_WAIT flag purpose is to delay response
sharing with Store clients, this patch fixes and clarifies its usage:

1. Removes unconditional clearing from startWriting().
2. Adds a conditional clearing to StoreEntry::write().
3. Sets it only for may-be-rejected responses.

(2) adds ENTRY_FWD_HDR_WAIT clearing to detect responses that filled the
entire read buffer and must be shared now with the clients because
they can no longer be retried with the current re-forwarding mechanisms
(which rely on completing the bad response transaction first) and will
get stuck. Such large re-triable error responses (>32KB with default
read_ahead_gap) should be uncommon. They were getting stuck prior to
trunk r12501.1.48. That revision started clearing ENTRY_FWD_HDR_WAIT in
StoreEntry startWriting() unconditionally, allowing all errors to be
sent to Store clients without a delay, and effectively disabling
retries.

Mgr::Forwarder was always setting ENTRY_FWD_HDR_WAIT, probably mimicking
similarly aggressive FwdState behavior that we are now removing. Since
the forwarder never rewrites the entry content, it should not need to
set that flag. The forwarder and associated Server classes must not
communicate with the mgr client during the client-Squid connection
descriptor handoff to Coordinator, but ENTRY_FWD_HDR_WAIT is not the
right mechanism to block such Squid-client communication. The flag does
not work well for this purpose because those Server classes may (and
do?) substitute the "blocked" StoreEntry with another one to
write an error message to the client.

Also moved ENTRY_FWD_HDR_WAIT clearance from many StoreEntry::complete()
callers to that method itself. StoreEntry::complete() is meant to be the
last call when forming the entry. Any post-complete entry modifications
such as retries are prohibited.


Regards,

Eduard.

Bug 4223: Fixed retries of failed re-forwardable transactions.

This change fixes Store to delay writing (and, therefore, sharing) of
re-triable errors (e.g., 504 Gateway Timeout responses) to clients:
once we start sharing the response with a client, we cannot re-try the
transaction. Since ENTRY_FWD_HDR_WAIT flag purpose is to delay response
sharing with Store clients, this patch fixes and clarifies its usage:

1. Removes unconditional clearing from startWriting().
2. Adds a conditional clearing to StoreEntry::write().
3. Sets it only for may-be-rejected responses.

(2) adds ENTRY_FWD_HDR_WAIT clearing to detect responses that filled the
entire read buffer and must be shared now with the clients because
they can no longer be retried with the current re-forwarding mechanisms
(which rely on completing the bad response transaction first) and will
get stuck. Such large re-triable error responses (>32KB with default
read_ahead_gap) should be uncommon. They were getting stuck prior to
trunk r12501.1.48. That revision started clearing ENTRY_FWD_HDR_WAIT in
StoreEntry startWriting() unconditionally, allowing all errors to be
sent to Store clients without a delay, and effectively disabling
retries.

Mgr::Forwarder was always setting ENTRY_FWD_HDR_WAIT, probably mimicking
similarly aggressive FwdState behavior that we are now removing. Since
the forwarder never rewrites the entry content, it should not need to
set that flag. The forwarder and associated Server classes must not
communicate with the mgr client during the client-Squid connection
descriptor handoff to Coordinator, but ENTRY_FWD_HDR_WAIT is not the
right mechanism to block such Squid-client communication. The flag does
not work well for this purpose because those Server classes may (and
do?) substitute the "blocked" StoreEntry with another one to
write an error message to the client.

Also moved ENTRY_FWD_HDR_WAIT clearance from many StoreEntry::complete()
callers to that method itself. StoreEntry::complete() is meant to be the
last call when forming the entry. Any post-complete entry modifications
such as retries are prohibited.

=== modified file 'src/FwdState.cc'
--- src/FwdState.cc	2017-06-26 14:34:57 +0000
+++ src/FwdState.cc	2017-06-28 09:11:30 +0000
@@ -114,61 +114,60 @@ FwdState::abort(void* d)
     fwd->serverDestinations.clear();
     fwd->stopAndDestroy("store entry aborted");
 }
 
 void
 FwdState::closeServerConnection(const char *reason)
 {
     debugs(17, 3, "because " << reason << "; " << serverConn);
     comm_remove_close_handler(serverConn->fd, closeHandler);
     closeHandler = NULL;
     fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
     serverConn->close();
 }
 
 /**** PUBLIC INTERFACE ********************************************************/
 
 FwdState::FwdState(const Comm::ConnectionPointer &client, StoreEntry * e, HttpRequest * r, const AccessLogEntryPointer &alp):
     entry(e),
     request(r),
     al(alp),
     err(NULL),
     clientConn(client),
     start_t(squid_curtime),
     n_tries(0),
     pconnRace(raceImpossible)
 {
     debugs(17, 2, "Forwarding client request " << client << ", url=" << e->url());
     HTTPMSGLOCK(request);
     serverDestinations.reserve(Config.forward_max_tries);
     e->lock("FwdState");
-    EBIT_SET(e->flags, ENTRY_FWD_HDR_WAIT);
     flags.connected_okay = false;
     flags.dont_retry = false;
     flags.forward_completed = false;
     debugs(17, 3, "FwdState constructed, this=" << this);
 }
 
 // Called once, right after object creation, when it is safe to set self
 void FwdState::start(Pointer aSelf)
 {
     // Protect ourselves from being destroyed when the only Server pointing
     // to us is gone (while we expect to talk to more Servers later).
     // Once we set self, we are responsible for clearing it when we do not
     // expect to talk to any servers.
     self = aSelf; // refcounted
 
     // We hope that either the store entry aborts or peer is selected.
     // Otherwise we are going to leak our object.
 
     // Ftp::Relay needs to preserve control connection on data aborts
     // so it registers its own abort handler that calls ours when needed.
     if (!request->flags.ftpNative)
         entry->registerAbort(FwdState::abort, this);
 
 #if STRICT_ORIGINAL_DST
     // Bug 3243: CVE 2009-0801
     // Bypass of browser same-origin access control in intercepted communication
     // To resolve this we must force DIRECT and only to the original client destination.
     const bool isIntercepted = request && !request->flags.redirected && (request->flags.intercepted || request->flags.interceptTproxy);
     const bool useOriginalDst = Config.onoff.client_dst_passthru || (request && !request->flags.hostVerified);
     if (isIntercepted && useOriginalDst) {
@@ -239,61 +238,60 @@ FwdState::completed()
     }
 
     flags.forward_completed = true;
 
     request->hier.stopPeerClock(false);
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         debugs(17, 3, HERE << "entry aborted");
         return ;
     }
 
 #if URL_CHECKSUM_DEBUG
 
     entry->mem_obj->checkUrlChecksum();
 #endif
 
     if (entry->store_status == STORE_PENDING) {
         if (entry->isEmpty()) {
             if (!err) // we quit (e.g., fd closed) before an error or content
                 fail(new ErrorState(ERR_READ_ERROR, Http::scBadGateway, request));
             assert(err);
             errorAppendEntry(entry, err);
             err = NULL;
 #if USE_OPENSSL
             if (request->flags.sslPeek && request->clientConnectionManager.valid()) {
                 CallJobHere1(17, 4, request->clientConnectionManager, ConnStateData,
                              ConnStateData::httpsPeeked, ConnStateData::PinnedIdleContext(Comm::ConnectionPointer(nullptr), request));
             }
 #endif
         } else {
-            EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
             entry->complete();
             entry->releaseRequest();
         }
     }
 
     if (storePendingNClients(entry) > 0)
         assert(!EBIT_TEST(entry->flags, ENTRY_FWD_HDR_WAIT));
 
 }
 
 FwdState::~FwdState()
 {
     debugs(17, 3, "FwdState destructor start");
 
     if (! flags.forward_completed)
         completed();
 
     doneWithRetries();
 
     HTTPMSGUNLOCK(request);
 
     delete err;
 
     entry->unregisterAbort();
 
     entry->unlock("FwdState");
 
     entry = NULL;
 
     if (calls.connector != NULL) {
@@ -511,61 +509,60 @@ FwdState::unregister(int fd)
  */
 void
 FwdState::complete()
 {
     debugs(17, 3, HERE << entry->url() << "\n\tstatus " << entry->getReply()->sline.status());
 #if URL_CHECKSUM_DEBUG
 
     entry->mem_obj->checkUrlChecksum();
 #endif
 
     logReplyStatus(n_tries, entry->getReply()->sline.status());
 
     if (reforward()) {
         debugs(17, 3, HERE << "re-forwarding " << entry->getReply()->sline.status() << " " << entry->url());
 
         if (Comm::IsConnOpen(serverConn))
             unregister(serverConn);
 
         entry->reset();
 
         // drop the last path off the selection list. try the next one.
         if (!serverDestinations.empty()) // paranoid
             serverDestinations.erase(serverDestinations.begin());
         startConnectionOrFail();
 
     } else {
         if (Comm::IsConnOpen(serverConn))
             debugs(17, 3, HERE << "server FD " << serverConnection()->fd << " not re-forwarding status " << entry->getReply()->sline.status());
         else
             debugs(17, 3, HERE << "server (FD closed) not re-forwarding status " << entry->getReply()->sline.status());
-        EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
         entry->complete();
 
         if (!Comm::IsConnOpen(serverConn))
             completed();
 
         stopAndDestroy("forwarding completed");
     }
 }
 
 void
 FwdState::noteDestination(Comm::ConnectionPointer path)
 {
     const bool wasBlocked = serverDestinations.empty();
     serverDestinations.push_back(path);
     if (wasBlocked)
         startConnectionOrFail();
     // else continue to use one of the previously noted destinations;
     // if all of them fail, we may try this path
 }
 
 void
 FwdState::noteDestinationsEnd(ErrorState *selectionError)
 {
     PeerSelectionInitiator::subscribed = false;
     if (serverDestinations.empty()) { // was blocked, waiting for more paths
 
         if (selectionError) {
             debugs(17, 3, "Will abort forwarding because path selection has failed.");
             Must(!err); // if we tried to connect, then path selection succeeded
             fail(selectionError);

=== modified file 'src/MemStore.cc'
--- src/MemStore.cc	2017-06-01 23:34:40 +0000
+++ src/MemStore.cc	2017-06-28 09:14:13 +0000
@@ -550,61 +550,60 @@ MemStore::copyFromShm(StoreEntry &e, con
     e.store_status = STORE_OK;
     e.setMemStatus(IN_MEMORY);
 
     assert(e.mem_obj->object_sz >= 0);
     assert(static_cast<uint64_t>(e.mem_obj->object_sz) == anchor.basics.swap_file_sz);
     // would be nice to call validLength() here, but it needs e.key
 
     // we read the entire response into the local memory; no more need to lock
     disconnect(e);
     return true;
 }
 
 /// imports one shared memory slice into local memory
 bool
 MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
 {
     debugs(20, 7, "buf: " << buf.offset << " + " << buf.length);
 
     // from store_client::readBody()
     // parse headers if needed; they might span multiple slices!
     HttpReply *rep = (HttpReply *)e.getReply();
     if (rep->pstate < Http::Message::psParsed) {
         // XXX: have to copy because httpMsgParseStep() requires 0-termination
         MemBuf mb;
         mb.init(buf.length+1, buf.length+1);
         mb.append(buf.data, buf.length);
         mb.terminate();
         const int result = rep->httpMsgParseStep(mb.buf, buf.length, eof);
         if (result > 0) {
             assert(rep->pstate == Http::Message::psParsed);
-            EBIT_CLR(e.flags, ENTRY_FWD_HDR_WAIT);
         } else if (result < 0) {
             debugs(20, DBG_IMPORTANT, "Corrupted mem-cached headers: " << e);
             return false;
         } else { // more slices are needed
             assert(!eof);
         }
     }
     debugs(20, 7, "rep pstate: " << rep->pstate);
 
     // local memory stores both headers and body so copy regardless of pstate
     const int64_t offBefore = e.mem_obj->endOffset();
     assert(e.mem_obj->data_hdr.write(buf)); // from MemObject::write()
     const int64_t offAfter = e.mem_obj->endOffset();
     // expect to write the entire buf because StoreEntry::write() never fails
     assert(offAfter >= 0 && offBefore <= offAfter &&
            static_cast<size_t>(offAfter - offBefore) == buf.length);
     return true;
 }
 
 /// whether we should cache the entry
 bool
 MemStore::shouldCache(StoreEntry &e) const
 {
     if (e.mem_status == IN_MEMORY) {
         debugs(20, 5, "already loaded from mem-cache: " << e);
         return false;
     }
 
     if (e.mem_obj && e.mem_obj->memCache.offset > 0) {
         debugs(20, 5, "already written to mem-cache: " << e);
@@ -651,69 +650,63 @@ MemStore::shouldCache(StoreEntry &e) con
     return true;
 }
 
 /// locks map anchor and preps to store the entry in shared memory
 bool
 MemStore::startCaching(StoreEntry &e)
 {
     sfileno index = 0;
     Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e.key), index);
     if (!slot) {
         debugs(20, 5, HERE << "No room in mem-cache map to index " << e);
         return false;
     }
 
     assert(e.mem_obj);
     e.mem_obj->memCache.index = index;
     e.mem_obj->memCache.io = MemObject::ioWriting;
     slot->set(e);
     // Do not allow others to feed off an unknown-size entry because we will
     // stop swapping it out if it grows too large.
     if (e.mem_obj->expectedReplySize() >= 0)
         map->startAppending(index);
     e.memOutDecision(true);
     return true;
 }
 
 /// copies all local data to shared memory
 void
 MemStore::copyToShm(StoreEntry &e)
 {
-    // prevents remote readers from getting ENTRY_FWD_HDR_WAIT entries and
-    // not knowing when the wait is over
-    if (EBIT_TEST(e.flags, ENTRY_FWD_HDR_WAIT)) {
-        debugs(20, 5, "postponing copying " << e << " for ENTRY_FWD_HDR_WAIT");
-        return;
-    }
-
     assert(map);
     assert(e.mem_obj);
+    Must(!EBIT_TEST(e.flags, ENTRY_FWD_HDR_WAIT));
 
     const int64_t eSize = e.mem_obj->endOffset();
     if (e.mem_obj->memCache.offset >= eSize) {
         debugs(20, 5, "postponing copying " << e << " for lack of news: " <<
                e.mem_obj->memCache.offset << " >= " << eSize);
         return; // nothing to do (yet)
     }
 
     // throw if an accepted unknown-size entry grew too big or max-size changed
     Must(eSize <= maxObjectSize());
 
     const int32_t index = e.mem_obj->memCache.index;
     assert(index >= 0);
     Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
     lastWritingSlice = anchor.start;
 
     // fill, skip slices that are already full
     // Optimize: remember lastWritingSlice in e.mem_obj
     while (e.mem_obj->memCache.offset < eSize) {
         Ipc::StoreMap::Slice &slice = nextAppendableSlice(
                                           e.mem_obj->memCache.index, lastWritingSlice);
         if (anchor.start < 0)
             anchor.start = lastWritingSlice;
         copyToShmSlice(e, anchor, slice);
     }
 
     debugs(20, 7, "mem-cached available " << eSize << " bytes of " << e);
 }
 
 /// copies at most one slice worth of local memory to shared memory

=== modified file 'src/client_side_request.cc'
--- src/client_side_request.cc	2017-06-26 02:14:42 +0000
+++ src/client_side_request.cc	2017-06-28 09:14:56 +0000
@@ -1919,61 +1919,60 @@ ClientHttpRequest::handleAdaptedHeader(H
         HTTPMSGUNLOCK(request);
         request = new_req;
         HTTPMSGLOCK(request);
 
         // update the new message to flag whether URL re-writing was done on it
         if (request->effectiveRequestUri().cmp(uri) != 0)
             request->flags.redirected = 1;
 
         /*
          * Store the new URI for logging
          */
         xfree(uri);
         uri = SBufToCstring(request->effectiveRequestUri());
         setLogUri(this, urlCanonicalClean(request));
         assert(request->method.id());
     } else if (HttpReply *new_rep = dynamic_cast<HttpReply*>(msg)) {
         debugs(85,3,HERE << "REQMOD reply is HTTP reply");
 
         // subscribe to receive reply body
         if (new_rep->body_pipe != NULL) {
             adaptedBodySource = new_rep->body_pipe;
             int consumer_ok = adaptedBodySource->setConsumerIfNotLate(this);
             assert(consumer_ok);
         }
 
         clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data;
         clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
         assert(repContext);
         repContext->createStoreEntry(request->method, request->flags);
 
-        EBIT_CLR(storeEntry()->flags, ENTRY_FWD_HDR_WAIT);
         request_satisfaction_mode = true;
         request_satisfaction_offset = 0;
         storeEntry()->replaceHttpReply(new_rep);
         storeEntry()->timestampsSet();
 
         if (!adaptedBodySource) // no body
             storeEntry()->complete();
         clientGetMoreData(node, this);
     }
 
     // we are done with getting headers (but may be receiving body)
     clearAdaptation(virginHeadSource);
 
     if (!request_satisfaction_mode)
         doCallouts();
 }
 
 void
 ClientHttpRequest::handleAdaptationBlock(const Adaptation::Answer &answer)
 {
     request->detailError(ERR_ACCESS_DENIED, ERR_DETAIL_REQMOD_BLOCK);
     AclMatchedName = answer.ruleId.termedBuf();
     assert(calloutContext);
     calloutContext->clientAccessCheckDone(ACCESS_DENIED);
     AclMatchedName = NULL;
 }
 
 void
 ClientHttpRequest::resumeBodyStorage()
 {

=== modified file 'src/clients/FtpGateway.cc'
--- src/clients/FtpGateway.cc	2017-05-16 06:48:07 +0000
+++ src/clients/FtpGateway.cc	2017-06-28 09:15:49 +0000
@@ -2262,61 +2262,60 @@ ftpReadRetr(Ftp::Gateway * ftpState)
     } else if (code == 150) {
         /* Accept data channel */
         ftpState->listenForDataChannel(ftpState->data.conn);
     } else if (code >= 300) {
         if (!ftpState->flags.try_slash_hack) {
             /* Try this as a directory missing trailing slash... */
             ftpState->hackShortcut(ftpSendCwd);
         } else {
             ftpFail(ftpState);
         }
     } else {
         ftpFail(ftpState);
     }
 }
 
 /**
  * Generate the HTTP headers and template fluff around an FTP
  * directory listing display.
  */
 void
 Ftp::Gateway::completedListing()
 {
     assert(entry);
     entry->lock("Ftp::Gateway");
     ErrorState ferr(ERR_DIR_LISTING, Http::scOkay, request.getRaw());
     ferr.ftp.listing = &listing;
     ferr.ftp.cwd_msg = xstrdup(cwd_message.size()? cwd_message.termedBuf() : "");
     ferr.ftp.server_msg = ctrl.message;
     ctrl.message = NULL;
     entry->replaceHttpReply(ferr.BuildHttpReply());
-    EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
     entry->flush();
     entry->unlock("Ftp::Gateway");
 }
 
 static void
 ftpReadTransferDone(Ftp::Gateway * ftpState)
 {
     int code = ftpState->ctrl.replycode;
     debugs(9, 3, HERE);
 
     if (code == 226 || code == 250) {
         /* Connection closed; retrieval done. */
         if (ftpState->flags.listing) {
             ftpState->completedListing();
             /* QUIT operation handles sending the reply to client */
         }
         ftpSendQuit(ftpState);
     } else {            /* != 226 */
         debugs(9, DBG_IMPORTANT, HERE << "Got code " << code << " after reading data");
         ftpState->failed(ERR_FTP_FAILURE, 0);
         /* failed closes ctrl.conn and frees ftpState */
         return;
     }
 }
 
 // premature end of the request body
 void
 Ftp::Gateway::handleRequestBodyProducerAborted()
 {
     Client::handleRequestBodyProducerAborted();
@@ -2535,62 +2534,60 @@ ftpSendReply(Ftp::Gateway * ftpState)
 
     if (ftpState->old_reply)
         err.ftp.reply = xstrdup(ftpState->old_reply);
     else if (ftpState->ctrl.last_reply)
         err.ftp.reply = xstrdup(ftpState->ctrl.last_reply);
     else
         err.ftp.reply = xstrdup("");
 
     // TODO: interpret as FTP-specific error code
     err.detailError(code);
 
     ftpState->entry->replaceHttpReply(err.BuildHttpReply());
 
     ftpSendQuit(ftpState);
 }
 
 void
 Ftp::Gateway::appendSuccessHeader()
 {
     debugs(9, 3, HERE);
 
     if (flags.http_header_sent)
         return;
 
     HttpReply *reply = new HttpReply;
 
     flags.http_header_sent = 1;
 
     assert(entry->isEmpty());
 
-    EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
-
     entry->buffer();    /* released when done processing current data payload */
 
     SBuf urlPath = request->url.path();
     auto t = urlPath.rfind('/');
     SBuf filename = urlPath.substr(t != SBuf::npos ? t : 0);
 
     const char *mime_type = NULL;
     const char *mime_enc = NULL;
 
     if (flags.isdir) {
         mime_type = "text/html";
     } else {
         switch (typecode) {
 
         case 'I':
             mime_type = "application/octet-stream";
             // XXX: performance regression, c_str() may reallocate
             mime_enc = mimeGetContentEncoding(filename.c_str());
             break;
 
         case 'A':
             mime_type = "text/plain";
             break;
 
         default:
             // XXX: performance regression, c_str() may reallocate
             mime_type = mimeGetContentType(filename.c_str());
             mime_enc = mimeGetContentEncoding(filename.c_str());
             break;
         }

=== modified file 'src/clients/FtpRelay.cc'
--- src/clients/FtpRelay.cc	2017-06-26 14:34:57 +0000
+++ src/clients/FtpRelay.cc	2017-06-28 09:16:39 +0000
@@ -265,61 +265,60 @@ Ftp::Relay::serverState(const Ftp::Serve
  */
 void
 Ftp::Relay::completeForwarding()
 {
     debugs(9, 5, forwardingCompleted);
     if (forwardingCompleted)
         return;
     forwardingCompleted = true;
     Ftp::Client::completeForwarding();
 }
 
 void
 Ftp::Relay::failed(err_type error, int xerrno, ErrorState *ftpErr)
 {
     if (!doneWithServer())
         serverState(fssError);
 
     // TODO: we need to customize ErrorState instead
     if (entry->isEmpty())
         failedErrorMessage(error, xerrno); // as a reply
 
     Ftp::Client::failed(error, xerrno, ftpErr);
 }
 
 void
 Ftp::Relay::failedErrorMessage(err_type error, int xerrno)
 {
     const Http::StatusCode httpStatus = failedHttpStatus(error);
     HttpReply *const reply = createHttpReply(httpStatus);
     entry->replaceHttpReply(reply);
-    EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
     fwd->request->detailError(error, xerrno);
 }
 
 void
 Ftp::Relay::processReplyBody()
 {
     debugs(9, 3, status());
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         /*
          * probably was aborted because content length exceeds one
          * of the maximum size limits.
          */
         abortOnData("entry aborted after calling appendSuccessHeader()");
         return;
     }
 
     if (master().userDataDone) {
         // Squid-to-client data transfer done. Abort data transfer on our
         // side to allow new commands from ftp client
         abortOnData("Squid-to-client data connection is closed");
         return;
     }
 
 #if USE_ADAPTATION
 
     if (adaptationAccessCheckPending) {
         debugs(9, 3, "returning due to adaptationAccessCheckPending");
         return;
     }
@@ -348,61 +347,60 @@ Ftp::Relay::handleControlReply()
     }
 
     Ftp::Client::handleControlReply();
     if (ctrl.message == NULL)
         return; // didn't get complete reply yet
 
     assert(state < END);
     assert(this->SM_FUNCS[state] != NULL);
     (this->*SM_FUNCS[state])();
 }
 
 void
 Ftp::Relay::handleRequestBodyProducerAborted()
 {
     ::Client::handleRequestBodyProducerAborted();
 
     failed(ERR_READ_ERROR);
 }
 
 bool
 Ftp::Relay::mayReadVirginReplyBody() const
 {
     // TODO: move this method to the regular FTP server?
     return Comm::IsConnOpen(data.conn);
 }
 
 void
 Ftp::Relay::forwardReply()
 {
     assert(entry->isEmpty());
-    EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
 
     HttpReply *const reply = createHttpReply(Http::scNoContent);
     reply->sources |= Http::Message::srcFtp;
 
     setVirginReply(reply);
     adaptOrFinalizeReply();
 
     serverComplete();
 }
 
 void
 Ftp::Relay::forwardPreliminaryReply(const PreliminaryCb cb)
 {
     debugs(9, 5, "forwarding preliminary reply to client");
 
     // we must prevent concurrent ConnStateData::sendControlMsg() calls
     Must(thePreliminaryCb == NULL);
     thePreliminaryCb = cb;
 
     const HttpReply::Pointer reply = createHttpReply(Http::scContinue);
 
     // the Sink will use this to call us back after writing 1xx to the client
     typedef NullaryMemFunT<Relay> CbDialer;
     const AsyncCall::Pointer call = JobCallback(11, 3, CbDialer, this,
                                     Ftp::Relay::proceedAfterPreliminaryReply);
 
     CallJobHere1(9, 4, request->clientConnectionManager, ConnStateData,
                  ConnStateData::sendControlMsg, HttpControlMsg(reply, call));
 }
 
@@ -426,61 +424,60 @@ Ftp::Relay::forwardError(err_type error,
 HttpReply *
 Ftp::Relay::createHttpReply(const Http::StatusCode httpStatus, const int64_t clen)
 {
     HttpReply *const reply = Ftp::HttpReplyWrapper(ctrl.replycode, ctrl.last_reply, httpStatus, clen);
     if (ctrl.message) {
         for (wordlist *W = ctrl.message; W && W->next; W = W->next)
             reply->header.putStr(Http::HdrType::FTP_PRE, httpHeaderQuoteString(W->key).c_str());
         // no hdrCacheInit() is needed for after Http::HdrType::FTP_PRE addition
     }
     return reply;
 }
 
 void
 Ftp::Relay::handleDataRequest()
 {
     data.addr(master().clientDataAddr);
     connectDataChannel();
 }
 
 void
 Ftp::Relay::startDataDownload()
 {
     assert(Comm::IsConnOpen(data.conn));
 
     debugs(9, 3, "begin data transfer from " << data.conn->remote <<
            " (" << data.conn->local << ")");
 
     HttpReply *const reply = createHttpReply(Http::scOkay, -1);
     reply->sources |= Http::Message::srcFtp;
 
-    EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
     setVirginReply(reply);
     adaptOrFinalizeReply();
 
     maybeReadVirginBody();
     state = READING_DATA;
 }
 
 void
 Ftp::Relay::startDataUpload()
 {
     assert(Comm::IsConnOpen(data.conn));
 
     debugs(9, 3, "begin data transfer to " << data.conn->remote <<
            " (" << data.conn->local << ")");
 
     if (!startRequestBodyFlow()) { // register to receive body data
         failed();
         return;
     }
 
     state = WRITING_DATA;
 }
 
 void
 Ftp::Relay::readGreeting()
 {
     assert(!master().clientReadGreeting);
 
     switch (ctrl.replycode) {
     case 220:

=== modified file 'src/enums.h'
--- src/enums.h	2017-02-20 04:56:00 +0000
+++ src/enums.h	2017-06-28 09:17:27 +0000
@@ -41,67 +41,87 @@ typedef enum {
     PING_WAITING,
     PING_DONE
 } ping_status_t;
 
 typedef enum {
     STORE_OK,
     STORE_PENDING
 } store_status_t;
 
 typedef enum {
     SWAPOUT_NONE,
     SWAPOUT_WRITING,
     SWAPOUT_DONE
 } swap_status_t;
 
 typedef enum {
     STORE_NON_CLIENT,
     STORE_MEM_CLIENT,
     STORE_DISK_CLIENT
 } store_client_t;
 
 /*
  * These are for StoreEntry->flag, which is defined as a SHORT
  *
  * NOTE: These flags are written to swap.state, so think very carefully
  * about deleting or re-assigning!
  */
 enum {
     ENTRY_SPECIAL,
     ENTRY_REVALIDATE_ALWAYS,
+
+    /// Tiny Store writes are likely. The writes should be aggregated together
+    /// before Squid announces the new content availability to the store
+    /// clients. For example, forming a cached HTTP response header may result
+    /// in dozens of StoreEntry::write() calls, many of which adding as little
+    /// as two bytes. Sharing those small writes with the store clients
+    /// increases overhead, especially because the client code can do nothing
+    /// useful with the written content until the whole response header is
+    /// stored. Might be combined with ENTRY_FWD_HDR_WAIT. TODO: Rename to
+    /// ENTRY_DELAY_WHILE_COALESCING to emphasize the difference from and
+    /// similarity with ENTRY_FWD_HDR_WAIT.
     DELAY_SENDING,
     RELEASE_REQUEST,
     REFRESH_REQUEST,
     ENTRY_REVALIDATE_STALE,
     ENTRY_DISPATCHED,
     KEY_PRIVATE,
+
+    /// The current entry response may change. The contents of an entry in this
+    /// state must not be shared with its store clients. For example, Squid
+    /// receives (and buffers) an HTTP/504 response but may decide to retry that
+    /// transaction to receive a successful response from another server
+    /// instead. Might be combined with DELAY_SENDING. TODO: Rename to
+    /// ENTRY_DELAY_WHILE_WOBBLING to emphasize the difference from and
+    /// similarity with DELAY_SENDING.
     ENTRY_FWD_HDR_WAIT,
+
     ENTRY_NEGCACHED,
     ENTRY_VALIDATED,
     ENTRY_BAD_LENGTH,
     ENTRY_ABORTED
 };
 
 /*
  * These are for client Streams. Each node in the stream can be queried for
  * its status
  */
 typedef enum {
     STREAM_NONE,        /* No particular status */
     STREAM_COMPLETE,        /* All data has been flushed, no more reads allowed */
     /* an unpredicted end has occured, no more
      * reads occured, but no need to tell
      * downstream that an error occured
      */
     STREAM_UNPLANNED_COMPLETE,
     /* An error has occured in this node or an above one,
      * and the node is not generating an error body / it's
      * midstream
      */
     STREAM_FAILED
 } clientStream_status_t;
 
 /* stateful helper callback response codes */
 typedef enum {
     S_HELPER_UNKNOWN,
     S_HELPER_RESERVE,
     S_HELPER_RELEASE

=== modified file 'src/gopher.cc'
--- src/gopher.cc	2017-03-03 11:36:02 +0000
+++ src/gopher.cc	2017-06-28 09:18:25 +0000
@@ -214,61 +214,60 @@ gopherMimeCreate(GopherStateData * gophe
 
     case GOPHER_PLUS_MOVIE:
         mime_type = "video/mpeg";
         break;
 
     case GOPHER_MACBINHEX:
 
     case GOPHER_DOSBIN:
 
     case GOPHER_UUENCODED:
 
     case GOPHER_BIN:
         /* Rightnow We have no idea what it is. */
         mime_enc = mimeGetContentEncoding(gopherState->request);
         mime_type = mimeGetContentType(gopherState->request);
         if (!mime_type)
             mime_type = def_gopher_bin;
         break;
 
     case GOPHER_FILE:
 
     default:
         mime_enc = mimeGetContentEncoding(gopherState->request);
         mime_type = mimeGetContentType(gopherState->request);
         if (!mime_type)
             mime_type = def_gopher_text;
         break;
     }
 
     assert(entry->isEmpty());
-    EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
 
     HttpReply *reply = new HttpReply;
     entry->buffer();
     reply->setHeaders(Http::scOkay, "Gatewaying", mime_type, -1, -1, -2);
     if (mime_enc)
         reply->header.putStr(Http::HdrType::CONTENT_ENCODING, mime_enc);
 
     entry->replaceHttpReply(reply);
     gopherState->reply_ = reply;
 }
 
 /**
  * Parse a gopher request into components.  By Anawat.
  */
 static void
 gopher_request_parse(const HttpRequest * req, char *type_id, char *request)
 {
     ::Parser::Tokenizer tok(req->url.path());
 
     if (request)
         *request = 0;
 
     tok.skip('/'); // ignore failures? path could be ab-empty
 
     if (tok.atEnd()) {
         *type_id = GOPHER_DIRECTORY;
         return;
     }
 
     static const CharacterSet anyByte("UTF-8",0x00, 0xFF);

=== modified file 'src/http.cc'
--- src/http.cc	2017-06-26 14:34:57 +0000
+++ src/http.cc	2017-06-28 09:19:24 +0000
@@ -900,81 +900,81 @@ HttpStateData::haveParsedReplyHeaders()
 {
     Client::haveParsedReplyHeaders();
 
     Ctx ctx = ctx_enter(entry->mem_obj->urlXXX());
     HttpReply *rep = finalReply();
     const Http::StatusCode statusCode = rep->sline.status();
 
     entry->timestampsSet();
 
     /* Check if object is cacheable or not based on reply code */
     debugs(11, 3, "HTTP CODE: " << statusCode);
 
     if (const StoreEntry *oldEntry = findPreviouslyCachedEntry(entry))
         sawDateGoBack = rep->olderThan(oldEntry->getReply());
 
     if (neighbors_do_private_keys && !sawDateGoBack)
         httpMaybeRemovePublic(entry, rep->sline.status());
 
     bool varyFailure = false;
     if (rep->header.has(Http::HdrType::VARY)
 #if X_ACCELERATOR_VARY
             || rep->header.has(Http::HdrType::HDR_X_ACCELERATOR_VARY)
 #endif
        ) {
         const SBuf vary(httpMakeVaryMark(request.getRaw(), rep));
 
         if (vary.isEmpty()) {
             // TODO: check whether such responses are shareable.
             // Do not share for now.
             entry->makePrivate(false);
-            if (!fwd->reforwardableStatus(rep->sline.status()))
-                EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
+            if (fwd->reforwardableStatus(rep->sline.status()))
+                EBIT_SET(entry->flags, ENTRY_FWD_HDR_WAIT);
             varyFailure = true;
         } else {
             entry->mem_obj->vary_headers = vary;
 
             // RFC 7231 section 7.1.4
             // Vary:* can be cached, but has mandatory revalidation
             static const SBuf asterisk("*");
             if (vary == asterisk)
                 EBIT_SET(entry->flags, ENTRY_REVALIDATE_ALWAYS);
         }
     }
 
     if (!varyFailure) {
         /*
          * If its not a reply that we will re-forward, then
          * allow the client to get it.
          */
-        if (!fwd->reforwardableStatus(rep->sline.status()))
-            EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
+        if (fwd->reforwardableStatus(rep->sline.status()))
+            EBIT_SET(entry->flags, ENTRY_FWD_HDR_WAIT);
 
         ReuseDecision decision(entry, statusCode);
 
         switch (reusableReply(decision)) {
 
         case ReuseDecision::reuseNot:
             entry->makePrivate(false);
             break;
 
         case ReuseDecision::cachePositively:
             entry->makePublic();
             break;
 
         case ReuseDecision::cacheNegatively:
             entry->cacheNegatively();
             break;
 
         case ReuseDecision::doNotCacheButShare:
             entry->makePrivate(true);
             break;
 
         default:
             assert(0);
             break;
         }
         debugs(11, 3, "decided: " << decision);
     }
 
     if (!ignoreCacheControl) {
         if (rep->cache_control) {

=== modified file 'src/ipc/Forwarder.cc'
--- src/ipc/Forwarder.cc	2017-06-23 22:15:30 +0000
+++ src/ipc/Forwarder.cc	2017-06-28 09:19:49 +0000
@@ -68,62 +68,64 @@ Ipc::Forwarder::start()
     SendMessage(Ipc::Port::CoordinatorAddr(), message);
     eventAdd("Ipc::Forwarder::requestTimedOut", &Forwarder::RequestTimedOut,
              this, timeout, 0, false);
 }
 
 void
 Ipc::Forwarder::swanSong()
 {
     debugs(54, 5, HERE);
     removeTimeoutEvent();
     if (request->requestId > 0) {
         DequeueRequest(request->requestId);
         request->requestId = 0;
     }
     cleanup();
 }
 
 bool
 Ipc::Forwarder::doneAll() const
 {
     debugs(54, 5, HERE);
     return request->requestId == 0;
 }
 
 /// called when Coordinator starts processing the request
 void
 Ipc::Forwarder::handleRemoteAck()
 {
     debugs(54, 3, HERE);
     request->requestId = 0;
-    // Do not clear ENTRY_FWD_HDR_WAIT or do entry->complete() because
-    // it will trigger our client side processing. Let job cleanup close.
+    // Do not do entry->complete() because it will trigger our client side
+    // processing when we no longer own the client-Squid connection.
+    // Let job cleanup close the client-Squid connection that Coordinator
+    // now owns.
 }
 
 /// Ipc::Forwarder::requestTimedOut wrapper
 void
 Ipc::Forwarder::RequestTimedOut(void* param)
 {
     debugs(54, 3, HERE);
     Must(param != NULL);
     Forwarder* fwdr = static_cast<Forwarder*>(param);
     // use async call to enable job call protection that time events lack
     CallJobHere(54, 5, fwdr, Forwarder, requestTimedOut);
 }
 
 /// called when Coordinator fails to start processing the request [in time]
 void
 Ipc::Forwarder::requestTimedOut()
 {
     debugs(54, 3, HERE);
     handleTimeout();
 }
 
 void
 Ipc::Forwarder::handleError()
 {
     mustStop("error");
 }
 
 void
 Ipc::Forwarder::handleTimeout()
 {

=== modified file 'src/mgr/Forwarder.cc'
--- src/mgr/Forwarder.cc	2017-06-24 00:01:51 +0000
+++ src/mgr/Forwarder.cc	2017-06-28 09:20:35 +0000
@@ -10,61 +10,60 @@
 
 #include "squid.h"
 #include "base/AsyncJobCalls.h"
 #include "base/TextException.h"
 #include "comm/Connection.h"
 #include "CommCalls.h"
 #include "errorpage.h"
 #include "globals.h"
 #include "HttpReply.h"
 #include "HttpRequest.h"
 #include "ipc/Port.h"
 #include "mgr/Forwarder.h"
 #include "mgr/Request.h"
 #include "SquidTime.h"
 #include "Store.h"
 
 CBDATA_NAMESPACED_CLASS_INIT(Mgr, Forwarder);
 
 Mgr::Forwarder::Forwarder(const Comm::ConnectionPointer &aConn, const ActionParams &aParams,
                           HttpRequest* aRequest, StoreEntry* anEntry):
     Ipc::Forwarder(new Request(KidIdentifier, 0, aConn, aParams), 10),
     httpRequest(aRequest), entry(anEntry), conn(aConn)
 {
     debugs(16, 5, HERE << conn);
     Must(Comm::IsConnOpen(conn));
     Must(httpRequest != NULL);
     Must(entry != NULL);
 
     HTTPMSGLOCK(httpRequest);
     entry->lock("Mgr::Forwarder");
-    EBIT_SET(entry->flags, ENTRY_FWD_HDR_WAIT);
 
     closer = asyncCall(16, 5, "Mgr::Forwarder::noteCommClosed",
                        CommCbMemFunT<Forwarder, CommCloseCbParams>(this, &Forwarder::noteCommClosed));
     comm_add_close_handler(conn->fd, closer);
 }
 
 Mgr::Forwarder::~Forwarder()
 {
     debugs(16, 5, HERE);
     Must(httpRequest != NULL);
     Must(entry != NULL);
 
     HTTPMSGUNLOCK(httpRequest);
     entry->unregisterAbort();
     entry->unlock("Mgr::Forwarder");
     cleanup();
 }
 
 /// closes our copy of the client HTTP connection socket
 void
 Mgr::Forwarder::cleanup()
 {
     if (Comm::IsConnOpen(conn)) {
         if (closer != NULL) {
             comm_remove_close_handler(conn->fd, closer);
             closer = NULL;
         }
         conn->close();
     }
     conn = NULL;
@@ -84,39 +83,38 @@ Mgr::Forwarder::handleTimeout()
     sendError(new ErrorState(ERR_LIFETIME_EXP, Http::scRequestTimeout, httpRequest));
     Ipc::Forwarder::handleTimeout();
 }
 
 void
 Mgr::Forwarder::handleException(const std::exception &e)
 {
     if (entry != NULL && httpRequest != NULL && Comm::IsConnOpen(conn))
         sendError(new ErrorState(ERR_INVALID_RESP, Http::scInternalServerError, httpRequest));
     Ipc::Forwarder::handleException(e);
 }
 
 /// called when the client socket gets closed by some external force
 void
 Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &)
 {
     debugs(16, 5, HERE);
     conn = NULL; // needed?
     mustStop("commClosed");
 }
 
 /// send error page
 void
 Mgr::Forwarder::sendError(ErrorState *error)
 {
     debugs(16, 3, HERE);
     Must(error != NULL);
     Must(entry != NULL);
     Must(httpRequest != NULL);
 
-    EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
     entry->buffer();
     entry->replaceHttpReply(error->BuildHttpReply());
     entry->expires = squid_curtime;
     delete error;
     entry->flush();
     entry->complete();
 }
 

=== modified file 'src/store.cc'
--- src/store.cc	2017-06-01 23:34:40 +0000
+++ src/store.cc	2017-06-28 09:23:04 +0000
@@ -809,62 +809,66 @@ storeCreateEntry(const char *url, const
         e->setPublicKey();
 
     return e;
 }
 
 /* Mark object as expired */
 void
 StoreEntry::expireNow()
 {
     debugs(20, 3, "StoreEntry::expireNow: '" << getMD5Text() << "'");
     expires = squid_curtime;
 }
 
 void
 StoreEntry::write (StoreIOBuffer writeBuffer)
 {
     assert(mem_obj != NULL);
     /* This assert will change when we teach the store to update */
     PROF_start(StoreEntry_write);
     assert(store_status == STORE_PENDING);
 
     // XXX: caller uses content offset, but we also store headers
     if (const HttpReplyPointer reply = mem_obj->getReply())
         writeBuffer.offset += reply->hdr_sz;
 
     debugs(20, 5, "storeWrite: writing " << writeBuffer.length << " bytes for '" << getMD5Text() << "'");
     PROF_stop(StoreEntry_write);
     storeGetMemSpace(writeBuffer.length);
     mem_obj->write(writeBuffer);
 
-    if (!EBIT_TEST(flags, DELAY_SENDING))
-        invokeHandlers();
+    if (EBIT_TEST(flags, ENTRY_FWD_HDR_WAIT) && !mem_obj->readAheadPolicyCanRead()) {
+    	debugs(20, 3, "allow Store clients to get entry content after buffering too much for " << *this);
+        EBIT_CLR(flags, ENTRY_FWD_HDR_WAIT);
+    }
+
+    invokeHandlers();
 }
 
 /* Append incoming data from a primary server to an entry. */
 void
 StoreEntry::append(char const *buf, int len)
 {
     assert(mem_obj != NULL);
     assert(len >= 0);
     assert(store_status == STORE_PENDING);
 
     StoreIOBuffer tempBuffer;
     tempBuffer.data = (char *)buf;
     tempBuffer.length = len;
     /*
      * XXX sigh, offset might be < 0 here, but it gets "corrected"
      * later.  This offset crap is such a mess.
      */
     tempBuffer.offset = mem_obj->endOffset() - (getReply() ? getReply()->hdr_sz : 0);
     write(tempBuffer);
 }
 
 void
 StoreEntry::vappendf(const char *fmt, va_list vargs)
 {
     LOCAL_ARRAY(char, buf, 4096);
     *buf = 0;
     int x;
 
 #ifdef VA_COPY
     va_args ap;
@@ -1063,116 +1067,122 @@ storeCheckCachableStats(StoreEntry *sent
                       store_check_cachable_hist.no.negative_cached);
     storeAppendPrintf(sentry, "no.missing_parts\t%d\n",
                       store_check_cachable_hist.no.missing_parts);
     storeAppendPrintf(sentry, "no.too_big\t%d\n",
                       store_check_cachable_hist.no.too_big);
     storeAppendPrintf(sentry, "no.too_small\t%d\n",
                       store_check_cachable_hist.no.too_small);
     storeAppendPrintf(sentry, "no.private_key\t%d\n",
                       store_check_cachable_hist.no.private_key);
     storeAppendPrintf(sentry, "no.too_many_open_files\t%d\n",
                       store_check_cachable_hist.no.too_many_open_files);
     storeAppendPrintf(sentry, "no.too_many_open_fds\t%d\n",
                       store_check_cachable_hist.no.too_many_open_fds);
     storeAppendPrintf(sentry, "yes.default\t%d\n",
                       store_check_cachable_hist.yes.Default);
 }
 
 void
 StoreEntry::lengthWentBad(const char *reason)
 {
     debugs(20, 3, "because " << reason << ": " << *this);
     EBIT_SET(flags, ENTRY_BAD_LENGTH);
     releaseRequest();
 }
 
 void
 StoreEntry::complete()
 {
     debugs(20, 3, "storeComplete: '" << getMD5Text() << "'");
 
+    // To preserve forwarding retries, call FwdState::complete() instead.
+    EBIT_CLR(flags, ENTRY_FWD_HDR_WAIT);
+
     if (store_status != STORE_PENDING) {
         /*
          * if we're not STORE_PENDING, then probably we got aborted
          * and there should be NO clients on this entry
          */
         assert(EBIT_TEST(flags, ENTRY_ABORTED));
         assert(mem_obj->nclients == 0);
         return;
     }
 
     /* This is suspect: mem obj offsets include the headers. do we adjust for that
      * in use of object_sz?
      */
     mem_obj->object_sz = mem_obj->endOffset();
 
     store_status = STORE_OK;
 
     assert(mem_status == NOT_IN_MEMORY);
 
     if (!EBIT_TEST(flags, ENTRY_BAD_LENGTH) && !validLength())
         lengthWentBad("!validLength() in complete()");
 
 #if USE_CACHE_DIGESTS
     if (mem_obj->request)
         mem_obj->request->hier.store_complete_stop = current_time;
 
 #endif
     /*
      * We used to call invokeHandlers, then storeSwapOut.  However,
      * Madhukar Reddy <[email protected]> reported that
      * responses without content length would sometimes get released
      * in client_side, thinking that the response is incomplete.
      */
     invokeHandlers();
 }
 
 /*
  * Someone wants to abort this transfer.  Set the reason in the
  * request structure, call the callback and mark the
  * entry for releasing
  */
 void
 StoreEntry::abort()
 {
     ++statCounter.aborted_requests;
     assert(store_status == STORE_PENDING);
     assert(mem_obj != NULL);
     debugs(20, 6, "storeAbort: " << getMD5Text());
 
     lock("StoreEntry::abort");         /* lock while aborting */
     negativeCache();
 
     releaseRequest();
 
     EBIT_SET(flags, ENTRY_ABORTED);
 
+    // allow the Store clients to be told about the problem
+    EBIT_CLR(flags, ENTRY_FWD_HDR_WAIT);
+
     setMemStatus(NOT_IN_MEMORY);
 
     store_status = STORE_OK;
 
     /* Notify the server side */
 
     /*
      * DPW 2007-05-07
      * Should we check abort.data for validity?
      */
     if (mem_obj->abort.callback) {
         if (!cbdataReferenceValid(mem_obj->abort.data))
             debugs(20, DBG_IMPORTANT,HERE << "queueing event when abort.data is not valid");
         eventAdd("mem_obj->abort.callback",
                  mem_obj->abort.callback,
                  mem_obj->abort.data,
                  0.0,
                  true);
         unregisterAbort();
     }
 
     /* XXX Should we reverse these two, so that there is no
      * unneeded disk swapping triggered?
      */
     /* Notify the client side */
     invokeHandlers();
 
     // abort swap out, invalidating what was created so far (release follows)
     swapOutFileClose(StoreIOState::writerGone);
 
@@ -1849,61 +1859,60 @@ StoreEntry::replaceHttpReply(HttpReply *
 {
     debugs(20, 3, "StoreEntry::replaceHttpReply: " << url());
 
     if (!mem_obj) {
         debugs(20, DBG_CRITICAL, "Attempt to replace object with no in-memory representation");
         return;
     }
 
     mem_obj->replaceReply(HttpReplyPointer(rep));
 
     if (andStartWriting)
         startWriting();
 }
 
 void
 StoreEntry::startWriting()
 {
     /* TODO: when we store headers separately remove the header portion */
     /* TODO: mark the length of the headers ? */
     /* We ONLY want the headers */
 
     assert (isEmpty());
     assert(mem_obj);
 
     const HttpReply *rep = getReply();
     assert(rep);
 
     buffer();
     rep->packHeadersInto(this);
     mem_obj->markEndOfReplyHeaders();
-    EBIT_CLR(flags, ENTRY_FWD_HDR_WAIT);
 
     rep->body.packInto(this);
     flush();
 }
 
 char const *
 StoreEntry::getSerialisedMetaData()
 {
     StoreMeta *tlv_list = storeSwapMetaBuild(this);
     int swap_hdr_sz;
     char *result = storeSwapMetaPack(tlv_list, &swap_hdr_sz);
     storeSwapTLVFree(tlv_list);
     assert (swap_hdr_sz >= 0);
     mem_obj->swap_hdr_sz = (size_t) swap_hdr_sz;
     return result;
 }
 
 /**
  * Abandon the transient entry our worker has created if neither the shared
  * memory cache nor the disk cache wants to store it. Collapsed requests, if
  * any, should notice and use Plan B instead of getting stuck waiting for us
  * to start swapping the entry out.
  */
 void
 StoreEntry::transientsAbandonmentCheck()
 {
     if (mem_obj && !mem_obj->smpCollapsed && // this worker is responsible
             mem_obj->xitTable.index >= 0 && // other workers may be interested
             mem_obj->memCache.index < 0 && // rejected by the shared memory cache
             mem_obj->swapout.decision == MemObject::SwapOut::swImpossible) {

=== modified file 'src/store_client.cc'
--- src/store_client.cc	2017-06-18 15:17:48 +0000
+++ src/store_client.cc	2017-06-28 09:24:31 +0000
@@ -252,65 +252,60 @@ store_client::moreToSend() const
 
     // If we do not know the entry length, then we have to open the swap file.
     const bool canSwapIn = entry->swap_filen >= 0;
     if (len < 0)
         return canSwapIn;
 
     if (copyInto.offset >= len)
         return false; // sent everything there is
 
     if (canSwapIn)
         return true; // if we lack prefix, we can swap it in
 
     // If we cannot swap in, make sure we have what we want in RAM. Otherwise,
     // scheduleRead calls scheduleDiskRead which asserts without a swap file.
     const MemObject *mem = entry->mem_obj;
     return mem &&
            mem->inmem_lo <= copyInto.offset && copyInto.offset < mem->endOffset();
 }
 
 static void
 storeClientCopy2(StoreEntry * e, store_client * sc)
 {
     /* reentrancy not allowed  - note this could lead to
      * dropped events
      */
 
     if (sc->flags.copy_event_pending) {
         return;
     }
 
-    if (EBIT_TEST(e->flags, ENTRY_FWD_HDR_WAIT)) {
-        debugs(90, 5, "storeClientCopy2: returning because ENTRY_FWD_HDR_WAIT set");
-        return;
-    }
-
     if (sc->flags.store_copying) {
         sc->flags.copy_event_pending = true;
         debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()");
         eventAdd("storeClientCopyEvent", storeClientCopyEvent, sc, 0.0, 0);
         return;
     }
 
     debugs(90, 3, "storeClientCopy2: " << e->getMD5Text());
     assert(sc->_callback.pending());
     /*
      * We used to check for ENTRY_ABORTED here.  But there were some
      * problems.  For example, we might have a slow client (or two) and
      * the peer server is reading far ahead and swapping to disk.  Even
      * if the peer aborts, we want to give the client(s)
      * everything we got before the abort condition occurred.
      */
     /* Warning: doCopy may indirectly free itself in callbacks,
      * hence the lock to keep it active for the duration of
      * this function
      * XXX: Locking does not prevent calling sc destructor (it only prevents
      * freeing sc memory) so sc may become invalid from C++ p.o.v.
      */
     CbcPointer<store_client> tmpLock = sc;
     assert (!sc->flags.store_copying);
     sc->doCopy(e);
     assert(!sc->flags.store_copying);
 }
 
 void
 store_client::doCopy(StoreEntry *anEntry)
@@ -684,60 +679,69 @@ storeUnregister(store_client * sc, Store
 
 #if STORE_CLIENT_LIST_DEBUG
     cbdataReferenceDone(sc->owner);
 
 #endif
 
     delete sc;
 
     assert(e->locked());
     // An entry locked by others may be unlocked (and destructed) by others, so
     // we must lock again to safely dereference e after CheckQuickAbortIsReasonable().
     e->lock("storeUnregister");
 
     if (CheckQuickAbortIsReasonable(e))
         e->abort();
     else
         mem->kickReads();
 
 #if USE_ADAPTATION
     e->kickProducer();
 #endif
 
     e->unlock("storeUnregister");
     return 1;
 }
 
 /* Call handlers waiting for  data to be appended to E. */
 void
 StoreEntry::invokeHandlers()
 {
+    if (EBIT_TEST(flags, DELAY_SENDING)) {
+        debugs(90, 3, "DELAY_SENDING is on, exiting " << *this);
+        return;
+    }
+    if (EBIT_TEST(flags, ENTRY_FWD_HDR_WAIT)) {
+        debugs(90, 3, "ENTRY_FWD_HDR_WAIT is on, exiting " << *this);
+        return;
+    }
+
     /* Commit what we can to disk, if appropriate */
     swapOut();
     int i = 0;
     store_client *sc;
     dlink_node *nx = NULL;
     dlink_node *node;
 
     PROF_start(InvokeHandlers);
 
     debugs(90, 3, "InvokeHandlers: " << getMD5Text()  );
     /* walk the entire list looking for valid callbacks */
 
     for (node = mem_obj->clients.head; node; node = nx) {
         sc = (store_client *)node->data;
         nx = node->next;
         debugs(90, 3, "StoreEntry::InvokeHandlers: checking client #" << i  );
         ++i;
 
         if (!sc->_callback.pending())
             continue;
 
         if (sc->flags.disk_io_pending)
             continue;
 
         storeClientCopy2(this, sc);
     }
     PROF_stop(InvokeHandlers);
 }
 
 // Does not account for remote readers/clients.

_______________________________________________
squid-dev mailing list
[email protected]
http://lists.squid-cache.org/listinfo/squid-dev

Reply via email to