I've managed to reproduce this issue when many hosts hit apt-cacher-ng at the same time.
Attached is a patch that fixes the issue for me — note that this is a quick and hacky patch! I've sent debug logs directly to the maintainer: one from a run that reproduces the problem, and one from a run with this patch applied.
diff --git a/CMakeLists.txt b/CMakeLists.txt index 024f6a0..011ab42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,7 @@ cmake_minimum_required(VERSION 3.1) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + # try to set the best C++ language level set(CMAKE_CXX_STANDARD 20) # let it take the lowest version, we need some precursor of C++14x diff --git a/src/job.cc b/src/job.cc index a2025cc..5c003a9 100644 --- a/src/job.cc +++ b/src/job.cc @@ -662,6 +662,18 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname else m_sFileLoc=theUrl.sHost+theUrl.sPath; + // Here we serialize multiple clients trying to download the + // same file. Only one thread at a time per URL is allowed to + // proceed further in this function. + Lockstuff g{h.getRequestUrl()}; + + // Check if another job is running. If so link to that. + if(g.stuff->otherThread) { + m_pItem = m_pParentCon.GetItemRegistry()->Create(m_sFileLoc, ESharingHow::ALWAYS_TRY_SHARING, fileitem::tSpecialPurposeAttr{}); + USRDBG("Linked to other job"); + return; + } + fileitem::tSpecialPurposeAttr attr { ! cfg::offlinemode && data_type == FILE_VOLATILE, m_bIsHeadOnly, @@ -697,8 +709,14 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname if(cfg::trackfileuse && fistate >= fileitem::FIST_DLGOTHEAD && fistate < fileitem::FIST_DLERROR) m_pItem.get()->UpdateHeadTimestamp(); - if(fistate==fileitem::FIST_COMPLETE) + if(fistate==fileitem::FIST_COMPLETE) { + // Tell everybody waiting for this thread to complete + // where to get their m_pItem and register a cleanup + // when this job completes. + g.setReturnValue(m_pItem.get()); + m_ipc = std::make_unique<inProgressCleanup>(h.getRequestUrl()); return; // perfect, done here + } if(cfg::offlinemode) { // make sure there will be no problems later in SendData or prepare a user message // error or needs download but freshness check was disabled, so it's really not complete. 
@@ -759,6 +777,11 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname USRERR("PANIC! Error creating download job for " << m_sFileLoc); return report_overload(__LINE__); } + // Tell everybody waiting for this thread to complete + // where to get their m_pItem and register a cleanup + // when this job completes. + g.setReturnValue(m_pItem.get()); + m_ipc = std::make_unique<inProgressCleanup>(h.getRequestUrl()); } } catch (const std::bad_alloc&) // OOM, may this ever happen here? @@ -1190,4 +1213,58 @@ void job::AppendMetaHeaders() << "\r\nServer: Debian Apt-Cacher NG/" ACVERSION "\r\n" "\r\n"; } + +job::Lockstuff::Lockstuff(const std::string& url_): url(url_) { + lockuniq g{inProgressLock}; + LOGSTARTFUNC; + while(true) { + auto [it, ins] = inProgress.insert({url, nullptr}); + if(!ins) { + stuff = it->second; + if (stuff->otherThread) { + break; + } + // Someone is already downloading this. Add ourselves to the waiters. + stuff->cv.wait(g._guard); + } else { + stuff = it->second = std::make_shared<Stuff>(); + owner = true; + break; + } + } +} + +void job::Lockstuff::setReturnValue(tFileItemPtr tfip) { + LOGSTARTFUNC; + if (const auto& it = inProgress.find(url); it != inProgress.end()) { + stuff->otherThread = tfip; + } +} + +job::Lockstuff::~Lockstuff() { + lockuniq g{inProgressLock}; + LOGSTARTFUNC; + if(owner) { + stuff->cv.notify_all(); + // After notify_all, any waiting threads are guaranteed to + // be blocked on inProgressLock, not on the condition so + // it's safe to delete it. 
However we have to use shared + // pointers because we don't know how long it will take the + // waiters to read the tFileItemPtr; + if (!stuff->otherThread) { + inProgress.erase(url); + } + } +} + +job::inProgressCleanup::~inProgressCleanup() { + lockuniq g{inProgressLock}; + LOGSTARTFUNC; + if (const auto& it = inProgress.find(url); it != inProgress.end()) { + inProgress.erase(it); + } +} + +std::map<std::string, std::shared_ptr<job::Stuff>> job::inProgress; +base_with_mutex job::inProgressLock; } diff --git a/src/job.h b/src/job.h index cb162a6..97446e2 100644 --- a/src/job.h +++ b/src/job.h @@ -16,6 +16,39 @@ class header; class job { +private: + // Lock controlling access to inProgress + static base_with_mutex inProgressLock; + + // The data that we store in inProgress + struct Stuff { + std::condition_variable cv; + tFileItemPtr otherThread = 0; + }; + + // Map from URL to Stuff for in progress jobs that are requesting this file. + // The entry is "owned" by the job that added it and it is deleted when the job completes. + static std::map<std::string, std::shared_ptr<Stuff>> inProgress; + + // Where all the real work is done. + struct Lockstuff { + const std::string url; + std::shared_ptr<Stuff> stuff; + bool owner = false; + Lockstuff(const std::string& url_); + void setReturnValue(tFileItemPtr tfip); + ~Lockstuff(); + + }; + + // Simple class which is destroyed when the job is destroyed. It deletes the entry from inProgress. + struct inProgressCleanup { + const std::string url; + inProgressCleanup(const std::string& url_) : url(url_) { } + ~inProgressCleanup(); + }; + + std::unique_ptr<inProgressCleanup> m_ipc; public: enum eJobResult : short