Hi all,
I am attaching two patches for this bug: a simple one for squid-3.5 (the t1 patch) and a more complex one (the t2 patch). The simple patch solves the bug for now, but may leave other similar bugs in Squid.

Bug description:
   - The client side and the server side have both finished
   - On the server side, Ftp::Relay::finalizeDataDownload() is called and
     schedules Ftp::Server::originDataCompletionCheckpoint
   - On the client side, Ftp::Server::userDataCompletionCheckpoint is
     called. This schedules a write to the control connection and closes
     the data connection.
   - Ftp::Server::originDataCompletionCheckpoint is then called. It
     tries to write to the control connection and the assertion is triggered.
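
The sequence above can be replayed with a small toy model. This is only a sketch, not Squid code: ToyControlChannel, ToyServer and replayBuggySequence are hypothetical names, and the "one pending write at a time" rule is an assumption standing in for the "!ccb->active()" check in Write.cc (modelled here with an exception instead of an assertion).

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for the control connection: only one write may be
// pending at a time (an assumption that mimics the Write.cc assertion).
struct ToyControlChannel {
    bool writeActive = false;
    std::vector<std::string> sent;

    void write(const std::string &msg) {
        if (writeActive)
            throw std::logic_error("write already active");
        writeActive = true;
        sent.push_back(msg);
    }

    void writeDone() {
        assert(writeActive); // nothing to complete otherwise
        writeActive = false;
    }
};

// Hypothetical stand-in for the pre-patch Ftp::Server completion logic.
struct ToyServer {
    ToyControlChannel ctrl;
    int userDataDone = 0;
    bool waitForOriginData = true;

    void completeDataExchange() {
        ctrl.write(std::to_string(userDataDone) + " final reply");
    }

    void originDataCompletionCheckpoint() {
        if (!userDataDone)
            return; // client side not finished yet
        completeDataExchange();
    }

    void userDataCompletionCheckpoint(int status) {
        userDataDone = status;
        if (waitForOriginData)
            return; // server side not finished yet
        completeDataExchange();
    }
};

// Replays the order of events from the bug description.
// Returns true if a second write is attempted while the first is pending.
inline bool replayBuggySequence() {
    ToyServer srv;
    // finalizeDataDownload(): clears the flag now, but the origin
    // checkpoint call is only scheduled and runs later.
    srv.waitForOriginData = false;
    srv.userDataCompletionCheckpoint(226); // schedules the first write
    try {
        srv.originDataCompletionCheckpoint(); // the scheduled call fires
    } catch (const std::logic_error &) {
        return true; // double write detected
    }
    return false;
}
```

In this model the flag is cleared before the scheduled checkpoint runs, so both checkpoints believe they are the last one to finish and both try to write the final reply.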

This bug is a corner case in which the client side (Ftp::Server) should wait for the server side (Ftp::Client/Ftp::Relay) to finish its job before responding to the FTP client. Here the existing mechanism, designed to handle such problems, did not work correctly and resulted in a double write to the client.

To understand why such a mechanism is important, imagine the case where an FTP client downloads a huge file and the ICAP server blocks this file. In this case Squid needs to delay its response to the FTP client until the Squid server side closes its data connection too and gets the response from the FTP server. This is required because we cannot accept more commands from the FTP client if we cannot forward them to the FTP server.

Moreover, Squid can fall into such cases when a download or upload is aborted abnormally by either the client or the server. The problem can become more complicated if ICAP/adaptation is used and Squid responds to a download request without contacting the FTP server.

This patch tries to fix the existing mechanism as follows:
- When Ftp::Server receives a "startWaitingForOrigin" callback, it postpones writing any responses to the client and keeps waiting for the stopWaitingForOrigin callback.

- When Ftp::Server receives a "stopWaitingForOrigin" callback, it resumes any postponed response.

- When Ftp::Client starts working on a DATA-related transaction, it calls the Ftp::Server::startWaitingForOrigin callback.

- When Ftp::Client finishes its job, or when it aborts abnormally, it checks whether it needs to call the Ftp::Server::stopWaitingForOrigin callback.

- This patch also tries to fix the status code returned to the FTP client by taking into account the status code returned by the FTP server. "Ftp::Server::stopWaitingForOrigin" is used to pass the returned status code to the client side.
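
The steps above can be sketched with a simplified model of the client-side state. This is an illustration only: ToyFtpServer, its downloadRequest flag (standing in for serverState == fssHandleDataRequest) and the use of plain strings for replies are assumptions made for the sketch, not the code in the attached patch.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified model of the waiting logic described above.
struct ToyFtpServer {
    bool waitingForOrigin = false;
    bool downloadRequest = false;      // stands in for fssHandleDataRequest
    int userDataDone = 0;              // final status for the FTP client
    bool originAbortedOnError = false; // server side reported an error
    bool haveDelayedReply = false;
    std::string delayedReply;
    std::vector<std::string> written;  // what reached the FTP client

    void startWaitingForOrigin() { waitingForOrigin = true; }

    void writeForwardedReply(const std::string &reply) {
        if (waitingForOrigin) { // postpone until the server side is done
            delayedReply = reply;
            haveDelayedReply = true;
            return;
        }
        written.push_back(reply);
    }

    void userDataCompletionCheckpoint(int status) {
        userDataDone = status;
        if (waitingForOrigin)
            return; // stopWaitingForOrigin() will finish the exchange
        completeDataExchange();
    }

    void stopWaitingForOrigin(int originStatus) {
        assert(waitingForOrigin);
        waitingForOrigin = false;

        if (downloadRequest) {
            if (!userDataDone) {
                // Client side still busy: remember a server-side error.
                originAbortedOnError = (originStatus > 400);
                return;
            }
            completeDataExchange();
        } else if (haveDelayedReply) {
            haveDelayedReply = false;
            writeForwardedReply(delayedReply); // resume postponed reply
        }
    }

    void completeDataExchange() {
        // Adjust our reply if the server side ended with an error.
        if (userDataDone == 226 && originAbortedOnError)
            userDataDone = 451;
        written.push_back(std::to_string(userDataDone));
    }
};
```

Whichever side finishes last writes the single final reply, and a server-side error seen before the client side is done turns a 226 into a 451.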

This is a Measurement Factory project


assertion failed: Write.cc:41: "!ccb->active()"

Bug description:
   - The client side and the server side have both finished
   - On the server side, Ftp::Relay::finalizeDataDownload() is called and
     schedules Ftp::Server::originDataCompletionCheckpoint
   - On the client side, Ftp::Server::userDataCompletionCheckpoint is
     called. This schedules a write to the control connection and closes
     the data connection.
   - Ftp::Server::originDataCompletionCheckpoint is then called. It
     tries to write to the control connection and the assertion is triggered.

This patch:
  - Sets the Ftp::Server::master::waitForOriginData flag to false inside
    Ftp::Server::originDataCompletionCheckpoint(), just before writing the
    reply message to the client, instead of setting it inside
    Ftp::Relay::finalizeDataDownload(). This avoids calling the
    Ftp::Server::completeDataExchange method twice if both
    originDataCompletionCheckpoint and userDataCompletionCheckpoint are
    called concurrently.
  - Does not set Ftp::Server::master->waitForOriginData to false inside
    the Ftp::Server::userDataCompletionCheckpoint method if the server-side
    connection is closed. This avoids a crash in the case where the server
    side schedules originDataCompletionCheckpoint before closing the
    server-side connection.
    In this case Squid will close the client-side connection too, because
    it is pinned to the server-side connection, so we do not have to
    worry about the FTP client getting no answer.
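
The first point can be illustrated with a minimal sketch (hypothetical names, not the Squid classes): once the flag is cleared only inside the origin checkpoint itself, exactly one of the two checkpoints completes the exchange, in either order.

```cpp
#include <cassert>

// Hypothetical model of the checkpoints after the simple (t1) change.
struct ToyServerT1 {
    int userDataDone = 0;
    bool waitForOriginData = true;
    int repliesWritten = 0;

    void completeDataExchange() { ++repliesWritten; }

    void originDataCompletionCheckpoint() {
        waitForOriginData = false; // cleared here, not when scheduling
        if (!userDataDone)
            return; // the user checkpoint will complete the exchange
        completeDataExchange();
    }

    void userDataCompletionCheckpoint(int status) {
        assert(!userDataDone); // mirrors Must(!master->userDataDone)
        userDataDone = status;
        if (waitForOriginData)
            return; // the origin checkpoint will complete the exchange
        completeDataExchange();
    }
};

// Client side finishes first, then the scheduled origin checkpoint runs.
inline int repliesWhenUserFinishesFirst() {
    ToyServerT1 srv;
    srv.userDataCompletionCheckpoint(226);
    srv.originDataCompletionCheckpoint();
    return srv.repliesWritten;
}

// The origin checkpoint runs first, then the client side finishes.
inline int repliesWhenOriginFinishesFirst() {
    ToyServerT1 srv;
    srv.originDataCompletionCheckpoint();
    srv.userDataCompletionCheckpoint(226);
    return srv.repliesWritten;
}
```

The sketch does not model the closed server connection case from the second point.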

This is a Measurement Factory project

=== modified file 'src/clients/FtpRelay.cc'
--- src/clients/FtpRelay.cc	2016-01-31 06:14:05 +0000
+++ src/clients/FtpRelay.cc	2016-03-10 18:17:31 +0000
@@ -699,42 +699,40 @@
     }
 
     debugs(9, 2, "connected FTP server data channel: " << io.conn);
 
     data.opened(io.conn, dataCloser());
 
     sendCommand();
 }
 
 void
 Ftp::Relay::scheduleReadControlReply()
 {
     Ftp::Client::scheduleReadControlReply(0);
 }
 
 void
 Ftp::Relay::finalizeDataDownload()
 {
     debugs(9, 2, "Complete data downloading/Uploading");
 
-    updateMaster().waitForOriginData = false;
-
     CbcPointer<ConnStateData> &mgr = fwd->request->clientConnectionManager;
     if (mgr.valid()) {
         if (Ftp::Server *srv = dynamic_cast<Ftp::Server*>(mgr.get())) {
             typedef NullaryMemFunT<Ftp::Server> CbDialer;
             AsyncCall::Pointer call = JobCallback(11, 3, CbDialer, srv,
                                                   Ftp::Server::originDataCompletionCheckpoint);
             ScheduleCallHere(call);
         }
     }
     serverComplete();
 }
 
 bool
 Ftp::Relay::abortOnData(const char *reason)
 {
     debugs(9, 3, "aborting transaction for " << reason <<
            "; FD " << (ctrl.conn != NULL ? ctrl.conn->fd : -1) << ", Data FD " << (data.conn != NULL ? data.conn->fd : -1) << ", this " << this);
     // this method is only called to handle data connection problems
     // the control connection should keep going
 

=== modified file 'src/servers/FtpServer.cc'
--- src/servers/FtpServer.cc	2016-01-31 06:14:05 +0000
+++ src/servers/FtpServer.cc	2016-03-10 18:17:31 +0000
@@ -1690,60 +1690,57 @@
     reqFlags.cachable = false; // force releaseRequest() in storeCreateEntry()
     reqFlags.noCache = true;
     repContext->createStoreEntry(http->request->method, reqFlags);
     http->storeEntry()->replaceHttpReply(reply);
 }
 
 void
 Ftp::Server::callException(const std::exception &e)
 {
     debugs(33, 2, "FTP::Server job caught: " << e.what());
     closeDataConnection();
     unpinConnection(true);
     if (Comm::IsConnOpen(clientConnection))
         clientConnection->close();
     AsyncJob::callException(e);
 }
 
 void
 Ftp::Server::originDataCompletionCheckpoint()
 {
+    master->waitForOriginData = false;
     if (!master->userDataDone) {
         debugs(33, 5, "Transfering from/to client not finished yet");
         return;
     }
 
     completeDataExchange();
 }
 
 void Ftp::Server::userDataCompletionCheckpoint(int finalStatusCode)
 {
     Must(!master->userDataDone);
     master->userDataDone = finalStatusCode;
 
     if (in.bodyParser)
         finishDechunkingRequest(false);
 
-    // The origin control connection is gone, nothing to wait for
-    if (!Comm::IsConnOpen(pinning.serverConnection))
-        master->waitForOriginData = false;
-
     if (master->waitForOriginData) {
         // The completeDataExchange() is not called here unconditionally
         // because we want to signal the FTP user that we are not fully
         // done processing its data stream, even though all data bytes
         // have been sent or received already.
         debugs(33, 5, "Transfering from/to FTP server is not complete");
         return;
     }
 
     completeDataExchange();
 }
 
 void Ftp::Server::completeDataExchange()
 {
     writeCustomReply(master->userDataDone, master->userDataDone == 226 ? "Transfer complete" : "Server error; transfer aborted");
     closeDataConnection();
 }
 
 /// Whether Squid FTP Relay supports a named feature (e.g., a command).
 static bool

assertion failed: Write.cc:41: "!ccb->active()"

Bug description:
   - The client side and the server side have both finished
   - On the server side, Ftp::Relay::finalizeDataDownload() is called and
     schedules Ftp::Server::originDataCompletionCheckpoint
   - On the client side, Ftp::Server::userDataCompletionCheckpoint is
     called. This schedules a write to the control connection and closes
     the data connection.
   - Ftp::Server::originDataCompletionCheckpoint is then called. It
     tries to write to the control connection and the assertion is triggered.

This bug is a corner case in which the client side (Ftp::Server) should
wait for the server side (Ftp::Client/Ftp::Relay) to finish its job before
responding to the FTP client. Here the existing mechanism, designed
to handle such problems, did not work correctly and resulted in a double
write to the client.

This patch tries to fix the existing mechanism as follows:

- When Ftp::Server receives a "startWaitingForOrigin" callback, it postpones
  writing any responses to the client and keeps waiting for the
  stopWaitingForOrigin callback.

- When Ftp::Server receives a "stopWaitingForOrigin" callback, it
  resumes any postponed response.

- When Ftp::Client starts working on a DATA-related transaction, it calls the
  Ftp::Server::startWaitingForOrigin callback.

- When Ftp::Client finishes its job, or when it aborts abnormally, it checks
  whether it needs to call the Ftp::Server::stopWaitingForOrigin callback.

- This patch also tries to fix the status code returned to the FTP client
  by taking into account the status code returned by the FTP server.
  "Ftp::Server::stopWaitingForOrigin" is used to pass the returned status code
  to the client side.

This is a Measurement Factory project

=== modified file 'src/clients/FtpRelay.cc'
--- src/clients/FtpRelay.cc	2016-02-23 08:51:22 +0000
+++ src/clients/FtpRelay.cc	2016-03-10 18:09:26 +0000
@@ -42,76 +42,77 @@
     const Ftp::MasterState &master() const;
     Ftp::MasterState &updateMaster();
     Ftp::ServerState serverState() const { return master().serverState; }
     void serverState(const Ftp::ServerState newState);
 
     /* Ftp::Client API */
     virtual void failed(err_type error = ERR_NONE, int xerrno = 0);
     virtual void dataChannelConnected(const CommConnectCbParams &io);
 
     /* Client API */
     virtual void serverComplete();
     virtual void handleControlReply();
     virtual void processReplyBody();
     virtual void handleRequestBodyProducerAborted();
     virtual bool mayReadVirginReplyBody() const;
     virtual void completeForwarding();
     virtual bool abortOnData(const char *reason);
 
     /* AsyncJob API */
     virtual void start();
+    virtual void swanSong();
 
     void forwardReply();
     void forwardError(err_type error = ERR_NONE, int xerrno = 0);
     void failedErrorMessage(err_type error, int xerrno);
     HttpReply *createHttpReply(const Http::StatusCode httpStatus, const int64_t clen = 0);
     void handleDataRequest();
     void startDataDownload();
     void startDataUpload();
     bool startDirTracking();
     void stopDirTracking();
     bool weAreTrackingDir() const {return savedReply.message != NULL;}
 
     typedef void (Relay::*PreliminaryCb)();
     void forwardPreliminaryReply(const PreliminaryCb cb);
     void proceedAfterPreliminaryReply();
     PreliminaryCb thePreliminaryCb;
 
     typedef void (Relay::*SM_FUNC)();
     static const SM_FUNC SM_FUNCS[];
     void readGreeting();
     void sendCommand();
     void readReply();
     void readFeatReply();
     void readPasvReply();
     void readDataReply();
     void readTransferDoneReply();
     void readEpsvReply();
     void readCwdOrCdupReply();
     void readUserOrPassReply();
 
     void scheduleReadControlReply();
-    void finalizeDataDownload();
 
     static void abort(void *d); // TODO: Capitalize this and FwdState::abort().
 
     bool forwardingCompleted; ///< completeForwarding() has been called
+    bool clientSideWaitingForUs; ///< whether the client is waiting for us
 
     struct {
         wordlist *message; ///< reply message, one  wordlist entry per message line
         char *lastCommand; ///< the command caused the reply
         char *lastReply; ///< last line of reply: reply status plus message
         int replyCode; ///< the reply status
     } savedReply; ///< set and delayed while we are tracking using PWD
 };
 
 } // namespace Ftp
 
 CBDATA_NAMESPACED_CLASS_INIT(Ftp, Relay);
 
 const Ftp::Relay::SM_FUNC Ftp::Relay::SM_FUNCS[] = {
     &Ftp::Relay::readGreeting, // BEGIN
     &Ftp::Relay::readUserOrPassReply, // SENT_USER
     &Ftp::Relay::readUserOrPassReply, // SENT_PASS
     NULL,/* &Ftp::Relay::readReply */ // SENT_TYPE
     NULL,/* &Ftp::Relay::readReply */ // SENT_MDTM
     NULL,/* &Ftp::Relay::readReply */ // SENT_SIZE
@@ -126,83 +127,111 @@
     NULL,/* &Ftp::Relay::readDataReply, */ // SENT_NLST
     NULL,/* &Ftp::Relay::readReply */ // SENT_REST
     NULL,/* &Ftp::Relay::readDataReply */ // SENT_RETR
     NULL,/* &Ftp::Relay::readReply */ // SENT_STOR
     NULL,/* &Ftp::Relay::readReply */ // SENT_QUIT
     &Ftp::Relay::readTransferDoneReply, // READING_DATA
     &Ftp::Relay::readReply, // WRITING_DATA
     NULL,/* &Ftp::Relay::readReply */ // SENT_MKDIR
     &Ftp::Relay::readFeatReply, // SENT_FEAT
     NULL,/* &Ftp::Relay::readPwdReply */ // SENT_PWD
     &Ftp::Relay::readCwdOrCdupReply, // SENT_CDUP
     &Ftp::Relay::readDataReply,// SENT_DATA_REQUEST
     &Ftp::Relay::readReply, // SENT_COMMAND
     NULL
 };
 
 Ftp::Relay::Relay(FwdState *const fwdState):
     AsyncJob("Ftp::Relay"),
     Ftp::Client(fwdState),
     thePreliminaryCb(NULL),
-    forwardingCompleted(false)
+    forwardingCompleted(false),
+    clientSideWaitingForUs(false)
 {
     savedReply.message = NULL;
     savedReply.lastCommand = NULL;
     savedReply.lastReply = NULL;
     savedReply.replyCode = 0;
 
     // Nothing we can do at request creation time can mark the response as
     // uncachable, unfortunately. This prevents "found KEY_PRIVATE" WARNINGs.
     entry->releaseRequest();
     // TODO: Convert registerAbort() to use AsyncCall
     entry->registerAbort(Ftp::Relay::abort, this);
 }
 
 Ftp::Relay::~Relay()
 {
     closeServer(); // TODO: move to clients/Client.cc?
     if (savedReply.message)
         wordlistDestroy(&savedReply.message);
 
     xfree(savedReply.lastCommand);
     xfree(savedReply.lastReply);
 }
 
 void
 Ftp::Relay::start()
 {
     if (!master().clientReadGreeting)
         Ftp::Client::start();
     else if (serverState() == fssHandleDataRequest ||
              serverState() == fssHandleUploadRequest)
         handleDataRequest();
     else
         sendCommand();
 }
 
+void
+Ftp::Relay::swanSong()
+{
+    if (clientSideWaitingForUs) {
+        CbcPointer<ConnStateData> &mgr = fwd->request->clientConnectionManager;
+        if (mgr.valid()) {
+            if (Ftp::Server *srv = dynamic_cast<Ftp::Server*>(mgr.get())) {
+                typedef UnaryMemFunT<Ftp::Server, int> CbDialer;
+                AsyncCall::Pointer call = asyncCall(11, 3, "Ftp::Server::stopWaitingForOrigin",
+                                                    CbDialer(srv, &Ftp::Server::stopWaitingForOrigin, 0));
+                ScheduleCallHere(call);
+            }
+        }
+    }
+
+    Ftp::Client::swanSong();
+}
+
 /// Keep control connection for future requests, after we are done with it.
 /// Similar to COMPLETE_PERSISTENT_MSG handling in http.cc.
 void
 Ftp::Relay::serverComplete()
 {
     CbcPointer<ConnStateData> &mgr = fwd->request->clientConnectionManager;
     if (mgr.valid()) {
+        if (clientSideWaitingForUs) {
+            if (Ftp::Server *srv = dynamic_cast<Ftp::Server*>(mgr.get())) {
+               typedef UnaryMemFunT<Ftp::Server, int> CbDialer;
+               AsyncCall::Pointer call = asyncCall(11, 3, "Ftp::Server::stopWaitingForOrigin",
+                                                   CbDialer(srv, &Ftp::Server::stopWaitingForOrigin, ctrl.replycode));
+               ScheduleCallHere(call);
+               clientSideWaitingForUs = false;
+            }
+        }
         if (Comm::IsConnOpen(ctrl.conn)) {
             debugs(9, 7, "completing FTP server " << ctrl.conn <<
                    " after " << ctrl.replycode);
             fwd->unregister(ctrl.conn);
             if (ctrl.replycode == 221) { // Server sends FTP 221 before closing
                 mgr->unpinConnection(false);
                 ctrl.close();
             } else {
                 mgr->pinConnection(ctrl.conn, fwd->request,
                                    ctrl.conn->getPeer(),
                                    fwd->request->flags.connectionAuth);
                 ctrl.forget();
             }
         }
     }
     Ftp::Client::serverComplete();
 }
 
 /// Safely returns the master state,
 /// with safety checks in case the Ftp::Server side of the master xact is gone.
@@ -514,40 +543,53 @@
         return;
     }
 
     SBuf buf;
     if (params.size() > 0)
         buf.Printf("%s %s%s", cmd.termedBuf(), params.termedBuf(), Ftp::crlf);
     else
         buf.Printf("%s%s", cmd.termedBuf(), Ftp::crlf);
 
     writeCommand(buf.c_str());
 
     state =
         serverState() == fssHandleCdup ? SENT_CDUP :
         serverState() == fssHandleCwd ? SENT_CWD :
         serverState() == fssHandleFeat ? SENT_FEAT :
         serverState() == fssHandleDataRequest ? SENT_DATA_REQUEST :
         serverState() == fssHandleUploadRequest ? SENT_DATA_REQUEST :
         serverState() == fssConnected ? SENT_USER :
         serverState() == fssHandlePass ? SENT_PASS :
         SENT_COMMAND;
+
+    if (state == SENT_DATA_REQUEST) {
+        CbcPointer<ConnStateData> &mgr = fwd->request->clientConnectionManager;
+        if (mgr.valid()) {
+            if (Ftp::Server *srv = dynamic_cast<Ftp::Server*>(mgr.get())) {
+                typedef NullaryMemFunT<Ftp::Server> CbDialer;
+                AsyncCall::Pointer call = JobCallback(11, 3, CbDialer, srv,
+                                                      Ftp::Server::startWaitingForOrigin);
+                ScheduleCallHere(call);
+                clientSideWaitingForUs = true;
+            }
+        }
+    }
 }
 
 void
 Ftp::Relay::readReply()
 {
     assert(serverState() == fssConnected ||
            serverState() == fssHandleUploadRequest);
 
     if (100 <= ctrl.replycode && ctrl.replycode < 200)
         forwardPreliminaryReply(&Ftp::Relay::scheduleReadControlReply);
     else
         forwardReply();
 }
 
 void
 Ftp::Relay::readFeatReply()
 {
     assert(serverState() == fssHandleFeat);
 
     if (100 <= ctrl.replycode && ctrl.replycode < 200)
@@ -672,87 +714,70 @@
     if (weAreTrackingDir()) { // we are tracking
         stopDirTracking(); // and forward the delayed response below
     } else if (ctrl.replycode == 230) { // successful login
         if (startDirTracking())
             return;
     }
 
     forwardReply();
 }
 
 void
 Ftp::Relay::readTransferDoneReply()
 {
     debugs(9, 3, status());
 
     if (ctrl.replycode != 226 && ctrl.replycode != 250) {
         debugs(9, DBG_IMPORTANT, "got FTP code " << ctrl.replycode <<
                " after reading response data");
     }
 
-    finalizeDataDownload();
+    debugs(9, 2, "Complete data downloading/Uploading");
+
+    serverComplete();
 }
 
 void
 Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io)
 {
     debugs(9, 3, status());
     data.opener = NULL;
 
     if (io.flag != Comm::OK) {
         debugs(9, 2, "failed to connect FTP server data channel");
         forwardError(ERR_CONNECT_FAIL, io.xerrno);
         return;
     }
 
     debugs(9, 2, "connected FTP server data channel: " << io.conn);
 
     data.opened(io.conn, dataCloser());
 
     sendCommand();
 }
 
 void
 Ftp::Relay::scheduleReadControlReply()
 {
     Ftp::Client::scheduleReadControlReply(0);
 }
 
-void
-Ftp::Relay::finalizeDataDownload()
-{
-    debugs(9, 2, "Complete data downloading/Uploading");
-
-    updateMaster().waitForOriginData = false;
-
-    CbcPointer<ConnStateData> &mgr = fwd->request->clientConnectionManager;
-    if (mgr.valid()) {
-        if (Ftp::Server *srv = dynamic_cast<Ftp::Server*>(mgr.get())) {
-            typedef NullaryMemFunT<Ftp::Server> CbDialer;
-            AsyncCall::Pointer call = JobCallback(11, 3, CbDialer, srv,
-                                                  Ftp::Server::originDataCompletionCheckpoint);
-            ScheduleCallHere(call);
-        }
-    }
-    serverComplete();
-}
-
 bool
 Ftp::Relay::abortOnData(const char *reason)
 {
     debugs(9, 3, "aborting transaction for " << reason <<
            "; FD " << (ctrl.conn != NULL ? ctrl.conn->fd : -1) << ", Data FD " << (data.conn != NULL ? data.conn->fd : -1) << ", this " << this);
     // this method is only called to handle data connection problems
     // the control connection should keep going
 
 #if USE_ADAPTATION
     if (adaptedBodySource != NULL)
         stopConsumingFrom(adaptedBodySource);
 #endif
 
     if (Comm::IsConnOpen(data.conn))
         dataComplete();
 
     return !Comm::IsConnOpen(ctrl.conn);
 }
 
 void

=== modified file 'src/servers/FtpServer.cc'
--- src/servers/FtpServer.cc	2016-01-31 12:05:30 +0000
+++ src/servers/FtpServer.cc	2016-03-10 17:36:20 +0000
@@ -45,41 +45,42 @@
 namespace Ftp
 {
 static void PrintReply(MemBuf &mb, const HttpReply *reply, const char *const prefix = "");
 static bool SupportedCommand(const SBuf &name);
 static bool CommandHasPathParameter(const SBuf &cmd);
 };
 
 Ftp::Server::Server(const MasterXaction::Pointer &xact):
     AsyncJob("Ftp::Server"),
     ConnStateData(xact),
     master(new MasterState),
     uri(),
     host(),
     gotEpsvAll(false),
     onDataAcceptCall(),
     dataListenConn(),
     dataConn(),
     uploadAvailSize(0),
     listener(),
     connector(),
-    reader()
+    reader(),
+    originDataDownloadAbortedOnError(false)
 {
     flags.readMore = false; // we need to announce ourselves first
     *uploadBuf = 0;
 }
 
 Ftp::Server::~Server()
 {
     closeDataConnection();
 }
 
 int
 Ftp::Server::pipelinePrefetchMax() const
 {
     return 0; // no support for concurrent FTP requests
 }
 
 time_t
 Ftp::Server::idleTimeout() const
 {
     return Config.Timeout.ftpClientIdle;
@@ -1017,40 +1018,45 @@
         userDataCompletionCheckpoint(451);
         debugs(33, 3, "FTP reply data transfer failed: STREAM_FAILED");
         break;
     default:
         fatal("unreachable code");
     }
 }
 
 void
 Ftp::Server::handleUploadReply(const HttpReply *reply, StoreIOBuffer)
 {
     writeForwardedReply(reply);
     // note that the client data connection may already be closed by now
 }
 
 void
 Ftp::Server::writeForwardedReply(const HttpReply *reply)
 {
     Must(reply);
 
+    if (master->waitingForOrigin) {
+        delayedReply = reply;
+        return;
+    }
+
     const HttpHeader &header = reply->header;
     // adaptation and forwarding errors lack Http::HdrType::FTP_STATUS
     if (!header.has(Http::HdrType::FTP_STATUS)) {
         writeForwardedForeign(reply); // will get to Ftp::Server::wroteReply
         return;
     }
 
     typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteReply);
     writeForwardedReplyAndCall(reply, call);
 }
 
 void
 Ftp::Server::handleEprtReply(const HttpReply *reply, StoreIOBuffer)
 {
     if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV (converted from EPRT)", reply);
         return;
     }
 
@@ -1471,57 +1477,54 @@
     Ip::Address cltAddr;
     if (!Ftp::ParseIpPort(params.termedBuf(), NULL, cltAddr)) {
         setReply(501, "Invalid parameter");
         return false;
     }
 
     if (!createDataConnection(cltAddr))
         return false;
 
     changeState(fssHandlePort, "handlePortRequest");
     setDataCommand();
     return true; // forward our fake PASV request
 }
 
 bool
 Ftp::Server::handleDataRequest(String &, String &)
 {
     if (!checkDataConnPre())
         return false;
 
-    master->waitForOriginData = true;
     master->userDataDone = 0;
+    originDataDownloadAbortedOnError = false;
 
     changeState(fssHandleDataRequest, "handleDataRequest");
 
     return true;
 }
 
 bool
 Ftp::Server::handleUploadRequest(String &, String &)
 {
     if (!checkDataConnPre())
         return false;
 
-    master->waitForOriginData = true;
-    master->userDataDone = 0;
-
     if (Config.accessList.forceRequestBodyContinuation) {
         ClientHttpRequest *http = pipeline.front()->http;
         HttpRequest *request = http->request;
         ACLFilledChecklist bodyContinuationCheck(Config.accessList.forceRequestBodyContinuation, request, NULL);
         if (bodyContinuationCheck.fastCheck() == ACCESS_ALLOWED) {
             request->forcedBodyContinuation = true;
             if (checkDataConnPost()) {
                 // Write control Msg
                 writeEarlyReply(150, "Data connection opened");
                 maybeReadUploadData();
             } else {
                 // wait for acceptDataConnection but tell it to call wroteEarlyReply
                 // after writing "150 Data connection opened"
                 typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
                 AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteEarlyReply);
                 onDataAcceptCall = call;
             }
         }
     }
 
@@ -1707,76 +1710,108 @@
 
     RequestFlags reqFlags;
     reqFlags.cachable = false; // force releaseRequest() in storeCreateEntry()
     reqFlags.noCache = true;
     repContext->createStoreEntry(http->request->method, reqFlags);
     http->storeEntry()->replaceHttpReply(reply);
 }
 
 void
 Ftp::Server::callException(const std::exception &e)
 {
     debugs(33, 2, "FTP::Server job caught: " << e.what());
     closeDataConnection();
     unpinConnection(true);
     if (Comm::IsConnOpen(clientConnection))
         clientConnection->close();
     AsyncJob::callException(e);
 }
 
 void
-Ftp::Server::originDataCompletionCheckpoint()
+Ftp::Server::startWaitingForOrigin()
 {
-    if (!master->userDataDone) {
-        debugs(33, 5, "Transfering from/to client not finished yet");
-        return;
-    }
+    debugs(33, 5, "Transfering data in progress, waitting server side to finish");
+    master->waitingForOrigin = true;
+}
 
-    completeDataExchange();
+void
+Ftp::Server::stopWaitingForOrigin(int originStatus)
+{
+    Must(master->waitingForOrigin);
+    // Currently only data upload and data download requests are 
+    // waiting for the origin:
+    // Must(master->serverState  == fssHandleUploadRequest ||
+    //      master->serverState == fssHandleDataRequest);
+
+    master->waitingForOrigin = false;
+
+    if (master->serverState == fssHandleDataRequest) {
+        if (!master->userDataDone) {
+            debugs(33, 5, "Transfering to client not finished yet");
+            // If the server side aborted with an error before we are done,
+            // we need to record it,  in order to inform client about
+            // an error while the data downloaded from FTP server.
+            //
+            // In the case we are already done with download (eg because of
+            // adaptation or an error) the most possible is that the
+            // server-side aborted the download because of us.
+            // In this case it is better to trust the status code stored
+            // in master->userDataDone.
+            originDataDownloadAbortedOnError = (originStatus > 400);
+            return;
+        }
+
+        completeDataExchange();
+    } else {
+        if (delayedReply != NULL) {
+            writeForwardedReply(delayedReply.getRaw());
+            delayedReply = NULL;
+        }
+    }
 }
 
 void Ftp::Server::userDataCompletionCheckpoint(int finalStatusCode)
 {
     Must(!master->userDataDone);
     master->userDataDone = finalStatusCode;
 
     if (bodyParser)
         finishDechunkingRequest(false);
 
-    // The origin control connection is gone, nothing to wait for
-    if (!Comm::IsConnOpen(pinning.serverConnection))
-        master->waitForOriginData = false;
-
-    if (master->waitForOriginData) {
+    if (master->waitingForOrigin) {
         // The completeDataExchange() is not called here unconditionally
         // because we want to signal the FTP user that we are not fully
         // done processing its data stream, even though all data bytes
         // have been sent or received already.
-        debugs(33, 5, "Transfering from/to FTP server is not complete");
+        debugs(33, 5, "Transfering from FTP server is not complete");
         return;
     }
 
     completeDataExchange();
 }
 
 void Ftp::Server::completeDataExchange()
 {
+    //Adjust our reply if the server responded with an error:
+    if (master->userDataDone == 226 && originDataDownloadAbortedOnError)
+        master->userDataDone = 451;
+
     writeCustomReply(master->userDataDone, master->userDataDone == 226 ? "Transfer complete" : "Server error; transfer aborted");
     closeDataConnection();
 }
 
 /// Whether Squid FTP Relay supports a named feature (e.g., a command).
 static bool
 Ftp::SupportedCommand(const SBuf &name)
 {
     static std::set<SBuf> BlackList;
     if (BlackList.empty()) {
         /* Add FTP commands that Squid cannot relay correctly. */
 
         // We probably do not support AUTH TLS.* and AUTH SSL,
         // but let's disclaim all AUTH support to KISS, for now.
         BlackList.insert(cmdAuth());
     }
 
     // we claim support for all commands that we do not know about
     return BlackList.find(name) == BlackList.end();
 }

=== modified file 'src/servers/FtpServer.h'
--- src/servers/FtpServer.h	2016-01-31 12:05:30 +0000
+++ src/servers/FtpServer.h	2016-03-10 17:24:32 +0000
@@ -24,69 +24,74 @@
     fssHandlePasv,
     fssHandlePort,
     fssHandleDataRequest,
     fssHandleUploadRequest,
     fssHandleEprt,
     fssHandleEpsv,
     fssHandleCwd,
     fssHandlePass,
     fssHandleCdup,
     fssError
 } ServerState;
 
 // TODO: This should become a part of MasterXaction when we start sending
 // master transactions to the clients/ code.
 /// Transaction information shared among our FTP client and server jobs.
 class MasterState: public RefCountable
 {
 public:
     typedef RefCount<MasterState> Pointer;
 
-    MasterState(): serverState(fssBegin), clientReadGreeting(false), userDataDone(0), waitForOriginData(false) {}
+MasterState(): serverState(fssBegin), clientReadGreeting(false), userDataDone(false), waitingForOrigin(false) {}
 
     Ip::Address clientDataAddr; ///< address of our FTP client data connection
     SBuf workingDir; ///< estimated current working directory for URI formation
     ServerState serverState; ///< what our FTP server is doing
     bool clientReadGreeting; ///< whether our FTP client read their FTP server greeting
     /// Squid will send or has sent this final status code to the FTP client
     int userDataDone;
-    /// whether the transfer on the Squid-origin data connection is not over yet
-    bool waitForOriginData;
+    /// whether we have to wait the transfer on the Squid-origin data connection
+    /// to be finished
+    bool waitingForOrigin;
 };
 
 /// Manages a control connection from an FTP client.
 class Server: public ConnStateData
 {
     CBDATA_CLASS(Server);
     // XXX CBDATA_CLASS expands to nonvirtual toCbdata, AsyncJob::toCbdata
     //     is pure virtual. breaks build on clang if override is used
 
 public:
     explicit Server(const MasterXaction::Pointer &xact);
     virtual ~Server();
     /* AsyncJob API */
     virtual void callException(const std::exception &e);
 
+    /// Called by Ftp::Client class when it is start receiving or
+    /// sending data.
+    void startWaitingForOrigin();
+
     /// Called by Ftp::Client class when it is done receiving or
     /// sending data. Waits for both agents to be done before
     /// responding to the FTP client and closing the data connection.
-    void originDataCompletionCheckpoint();
+    void stopWaitingForOrigin(int status);
 
     // This is a pointer in hope to minimize future changes when MasterState
     // becomes a part of MasterXaction. Guaranteed not to be nil.
     MasterState::Pointer master; ///< info shared among our FTP client and server jobs
 
 protected:
     friend void StartListening();
 
     // errors detected before it is possible to create an HTTP request wrapper
     enum class EarlyErrorKind {
         HugeRequest,
         MissingLogin,
         MissingUsername,
         MissingHost,
         UnsupportedCommand,
         InvalidUri,
         MalformedCommand
     };
 
     /* ConnStateData API */
@@ -172,26 +177,33 @@
     void handleEprtReply(const HttpReply *header, StoreIOBuffer receivedData);
     void handleEpsvReply(const HttpReply *header, StoreIOBuffer receivedData);
 
 private:
     void doProcessRequest();
     void shovelUploadData();
     void resetLogin(const char *reason);
 
     SBuf uri; ///< a URI reconstructed from various FTP message details
     SBuf host; ///< intended dest. of a transparently intercepted FTP conn
     bool gotEpsvAll; ///< restrict data conn setup commands to just EPSV
     AsyncCall::Pointer onDataAcceptCall; ///< who to call upon data conn acceptance
     Comm::ConnectionPointer dataListenConn; ///< data connection listening socket
     Comm::ConnectionPointer dataConn; ///< data connection
     char uploadBuf[CLIENT_REQ_BUF_SZ]; ///< data connection input buffer
     size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes
 
     AsyncCall::Pointer listener; ///< set when we are passively listening
     AsyncCall::Pointer connector; ///< set when we are actively connecting
     AsyncCall::Pointer reader; ///< set when we are reading FTP data
+
+    /// whether the transfer aborted with an error on server side
+    bool originDataDownloadAbortedOnError;
+    
+    /// Set when the HttpReply can not be forwarded right now to the client 
+    /// and must be delayed unless the server side is done.
+    HttpReply::Pointer delayedReply;
 };
 
 } // namespace Ftp
 
 #endif /* SQUID_SERVERS_FTP_SERVER_H */
 

_______________________________________________
squid-dev mailing list
[email protected]
http://lists.squid-cache.org/listinfo/squid-dev
