I've managed to reproduce this issue when many hosts hit apt-cacher-ng
at the same time.

Attached is a patch that fixes it for me - be warned, this is a quick
and hacky patch!

I've sent the maintainer debug logs of a run that reproduces this
problem and of a run with this patch applied.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 024f6a0..011ab42 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,5 +1,7 @@
 cmake_minimum_required(VERSION 3.1)
 
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+
 # try to set the best C++ language level
 set(CMAKE_CXX_STANDARD 20)
 # let it take the lowest version, we need some precursor of C++14x
diff --git a/src/job.cc b/src/job.cc
index a2025cc..5c003a9 100644
--- a/src/job.cc
+++ b/src/job.cc
@@ -662,6 +662,18 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname
 		else
 			m_sFileLoc=theUrl.sHost+theUrl.sPath;
 
+		// Here we serialize multiple clients trying to download the
+		// same file. Only one thread at a time per URL is allowed to
+		// proceed further in this function.
+		Lockstuff g{h.getRequestUrl()};
+
+		// Check if another job is already downloading this URL. If so, link to it.
+		if(g.stuff->otherThread) {
+			m_pItem = m_pParentCon.GetItemRegistry()->Create(m_sFileLoc, ESharingHow::ALWAYS_TRY_SHARING, fileitem::tSpecialPurposeAttr{});
+			USRDBG("Linked to other job");
+			return;
+		}
+
 		fileitem::tSpecialPurposeAttr attr {
 			! cfg::offlinemode && data_type == FILE_VOLATILE,
 					m_bIsHeadOnly,
@@ -697,8 +709,14 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname
 		if(cfg::trackfileuse && fistate >= fileitem::FIST_DLGOTHEAD && fistate < fileitem::FIST_DLERROR)
 			m_pItem.get()->UpdateHeadTimestamp();
 
-		if(fistate==fileitem::FIST_COMPLETE)
+		if(fistate==fileitem::FIST_COMPLETE) {
+			// Tell everybody waiting on this download where to get
+			// their m_pItem, and register a cleanup that runs when
+			// this job is destroyed.
+			g.setReturnValue(m_pItem.get());
+			m_ipc = std::make_unique<inProgressCleanup>(h.getRequestUrl());
 			return; // perfect, done here
+		}
 
 		if(cfg::offlinemode) { // make sure there will be no problems later in SendData or prepare a user message
 			// error or needs download but freshness check was disabled, so it's really not complete.
@@ -759,6 +777,11 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname
 				USRERR("PANIC! Error creating download job for " << m_sFileLoc);
 				return report_overload(__LINE__);
 			}
+			// Tell everybody waiting on this download where to get
+			// their m_pItem, and register a cleanup that runs when
+			// this job is destroyed.
+			g.setReturnValue(m_pItem.get());
+			m_ipc = std::make_unique<inProgressCleanup>(h.getRequestUrl());
 		}
 	}
 	catch (const std::bad_alloc&) // OOM, may this ever happen here?
@@ -1190,4 +1213,60 @@ void job::AppendMetaHeaders()
 			  << "\r\nServer: Debian Apt-Cacher NG/" ACVERSION "\r\n"
 	"\r\n";
 }
+
+job::Lockstuff::Lockstuff(const std::string& url_): url(url_) {
+	lockuniq g{inProgressLock};
+	LOGSTARTFUNC;
+	while(true) {
+		auto [it, ins] = inProgress.insert({url, nullptr});
+		if(!ins) {
+			stuff = it->second;
+			if (stuff->otherThread) {
+				break;
+			}
+			// Someone is already downloading this. Add ourselves to the waiters.
+			stuff->cv.wait(g._guard);
+		} else {
+			stuff = it->second = std::make_shared<Stuff>();
+			owner = true;
+			break;
+		}
+	}
+}
+
+void job::Lockstuff::setReturnValue(tFileItemPtr tfip) {
+	lockuniq g{inProgressLock};
+	LOGSTARTFUNC;
+	// Publish under the lock; waiters read otherThread while holding it.
+	if (inProgress.find(url) != inProgress.end()) {
+		stuff->otherThread = tfip;
+	}
+}
+
+job::Lockstuff::~Lockstuff() {
+	lockuniq g{inProgressLock};
+	LOGSTARTFUNC;
+	if(owner) {
+		stuff->cv.notify_all();
+		// After notify_all, any waiting threads are guaranteed to
+		// be blocked on inProgressLock, not on the condition
+		// variable, so it's safe to delete the map entry. We use
+		// shared pointers because we don't know how long it will
+		// take the waiters to read the tFileItemPtr.
+		if (!stuff->otherThread) {
+			inProgress.erase(url);
+		}
+	}
+}
+
+job::inProgressCleanup::~inProgressCleanup() {
+	lockuniq g{inProgressLock};
+	LOGSTARTFUNC;
+	if (auto it = inProgress.find(url); it != inProgress.end()) {
+		inProgress.erase(it);
+	}
+}
+
+std::map<std::string, std::shared_ptr<job::Stuff>> job::inProgress;
+base_with_mutex job::inProgressLock;
 }
diff --git a/src/job.h b/src/job.h
index cb162a6..97446e2 100644
--- a/src/job.h
+++ b/src/job.h
@@ -16,6 +16,39 @@ class header;
 
 class job
 {
+private:
+	// Lock controlling access to inProgress
+	static base_with_mutex inProgressLock;
+
+	// The data that we store in inProgress
+	struct Stuff {
+		std::condition_variable cv;
+		tFileItemPtr otherThread = nullptr;
+	};
+
+	// Map from URL to Stuff for in-progress jobs requesting that file.
+	// The entry is "owned" by the job that added it and is deleted when that job completes.
+	static std::map<std::string, std::shared_ptr<Stuff>> inProgress;
+
+	// RAII guard that serializes concurrent jobs requesting the same URL.
+	struct Lockstuff {
+		const std::string url;
+		std::shared_ptr<Stuff> stuff;
+		bool owner = false;
+		Lockstuff(const std::string& url_);
+		void setReturnValue(tFileItemPtr tfip);
+		~Lockstuff();
+
+	};
+
+	// Simple class which is destroyed when the job is destroyed. It deletes the entry from inProgress.
+	struct inProgressCleanup {
+		const std::string url;
+		inProgressCleanup(const std::string& url_) : url(url_) { }
+		~inProgressCleanup();
+	};
+
+	std::unique_ptr<inProgressCleanup> m_ipc;
 public:
 
     enum eJobResult : short

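For reviewers, here is the synchronization pattern the patch implements,
boiled down to a standalone sketch (illustrative names, not acng's real
types - FileItem stands in for fileitem/tFileItemPtr): the first job to
request a URL becomes the owner and performs the download, later jobs
block on a per-URL condition variable, and the owner publishes its item
before notifying; the shared_ptr keeps the entry alive for late readers.

#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct FileItem {};                  // stand-in for acng's fileitem
using FileItemPtr = std::shared_ptr<FileItem>;

// One entry per URL being downloaded. Held by shared_ptr so a waiter
// that wakes up late still has a valid object after the map entry is gone.
struct Entry {
	std::condition_variable cv;
	FileItemPtr result;          // set by the owner on success
};

std::mutex gate;
std::map<std::string, std::shared_ptr<Entry>> inFlight;

// Returns the owner's item if one is already available; otherwise makes
// the caller the owner (owner == true), who must download the file itself.
FileItemPtr acquire(const std::string &url, bool &owner)
{
	std::unique_lock<std::mutex> lk(gate);
	for (;;) {
		auto [it, inserted] = inFlight.try_emplace(url, nullptr);
		if (inserted) {
			it->second = std::make_shared<Entry>();
			owner = true;
			return nullptr;   // caller performs the download
		}
		auto entry = it->second;
		if (entry->result) {      // owner already published its item
			owner = false;
			return entry->result;
		}
		// Owner still downloading: block. On wakeup the map entry may
		// have been erased (owner failed), so retry the insert; this
		// thread may then become the new owner.
		entry->cv.wait(lk);
	}
}

// Owner side: publish the item (or nothing, on failure) and wake everyone.
// A failed attempt erases the entry so one of the waiters can retry.
void release(const std::string &url, FileItemPtr result)
{
	std::lock_guard<std::mutex> lk(gate);
	auto it = inFlight.find(url);
	if (it == inFlight.end()) return;
	it->second->result = result;
	it->second->cv.notify_all();
	if (!result) inFlight.erase(it);
}

One difference from this sketch: in the patch, a successful entry stays
in inProgress for the whole lifetime of the owning job (removed by
inProgressCleanup in the job's destructor), so clients arriving at any
point while that job lives are linked to the existing item instead of
starting a second download.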