Currently, shared memory pages are used for shared memory cache and
IPC I/O module. Before the change, the number of shared memory pages
needed for IPC I/O was calculated from the size of shared memory
cache. Moreover, shared memory cache was required for IPC I/O.
The patch makes the limit for shared I/O pages independent of the
shared memory cache size and presence. The IPC I/O page limit is
calculated from the number of workers and diskers; it does not depend
on cache_dir configuration. This may change in the future if we learn
how to compute it (e.g., by multiplying max-swap-rate and swap-timeout
if both are available).
---
src/DiskIO/IpcIo/IpcIoFile.cc | 32 ++++++++++++++++++++++++++++++--
src/MemStore.cc | 26 +++++++++++++++++++++++++-
src/base/RunnersRegistry.h | 5 +++++
src/ipc/Queue.cc | 6 ++++++
src/ipc/Queue.h | 3 +++
src/ipc/mem/Pages.cc | 38 +++++++++++++-------------------------
src/ipc/mem/Pages.h | 3 +++
src/main.cc | 3 +++
8 files changed, 88 insertions(+), 28 deletions(-)
diff --git src/DiskIO/IpcIo/IpcIoFile.cc src/DiskIO/IpcIo/IpcIoFile.cc
index aec7b40..08269c8 100644
--- src/DiskIO/IpcIo/IpcIoFile.cc
+++ src/DiskIO/IpcIo/IpcIoFile.cc
@@ -6,40 +6,45 @@
#include "config.h"
#include "base/RunnersRegistry.h"
#include "base/TextException.h"
#include "DiskIO/IORequestor.h"
#include "DiskIO/IpcIo/IpcIoFile.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "ipc/Messages.h"
#include "ipc/Port.h"
#include "ipc/Queue.h"
#include "ipc/StrandSearch.h"
#include "ipc/UdsOp.h"
#include "ipc/mem/Pages.h"
#include "SquidTime.h"
CBDATA_CLASS_INIT(IpcIoFile);
/// shared memory segment path to use for IpcIoFile maps
static const char *const ShmLabel = "io_file";
+/// a single worker-to-disker or disker-to-worker queue capacity; up
+/// to 2*QueueCapacity I/O requests queued between a single worker and
+/// a single disker
+// TODO: make configurable or compute from squid.conf settings if possible
+static const int QueueCapacity = 1024;
const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;
bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;
static bool DiskerOpen(const String &path, int flags, mode_t mode);
static void DiskerClose(const String &path);
/// IpcIo wrapper for debugs() streams; XXX: find a better class name
struct SipcIo {
SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
worker(aWorker), msg(aMsg), disker(aDisker) {}
int worker;
const IpcIoMsg &msg;
int disker;
};
@@ -813,55 +818,78 @@ DiskerOpen(const String &path, int flags, mode_t mode)
return false;
}
store_open_disk_fd++;
debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile);
return true;
}
static void
DiskerClose(const String &path)
{
if (TheFile >= 0) {
file_close(TheFile);
debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
TheFile = -1;
store_open_disk_fd--;
}
}
+/// reports our needs for shared memory pages to Ipc::Mem::Pages
+class IpcIoClaimMemoryNeedsRr: public RegisteredRunner
+{
+public:
+ /* RegisteredRunner API */
+ virtual void run(const RunnerRegistry &r);
+};
+
+RunnerRegistrationEntry(rrClaimMemoryNeeds, IpcIoClaimMemoryNeedsRr);
+
+
+void
+IpcIoClaimMemoryNeedsRr::run(const RunnerRegistry &)
+{
+ const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
+ ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
+ // the maximum number of shared I/O pages is approximately the
+ // number of queue slots, we add a fudge factor to that to account
+ // for corner cases where I/O pages are created before queue
+ // limits are checked or destroyed long after the I/O is dequeued
+ Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage, itemsCount * 1.1);
+}
+
+
/// initializes shared memory segments used by IpcIoFile
class IpcIoRr: public Ipc::Mem::RegisteredRunner
{
public:
/* RegisteredRunner API */
IpcIoRr(): owner(NULL) {}
virtual ~IpcIoRr();
protected:
virtual void create(const RunnerRegistry &);
private:
Ipc::FewToFewBiQueue::Owner *owner;
};
RunnerRegistrationEntry(rrAfterConfig, IpcIoRr);
void IpcIoRr::create(const RunnerRegistry &)
{
if (!UsingSmp())
return;
Must(!owner);
- // XXX: make capacity configurable
owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
Config.cacheSwap.n_strands,
1 + Config.workers, sizeof(IpcIoMsg),
- 1024);
+ QueueCapacity);
}
IpcIoRr::~IpcIoRr()
{
delete owner;
}
diff --git src/MemStore.cc src/MemStore.cc
index aece423..8048b56 100644
--- src/MemStore.cc
+++ src/MemStore.cc
@@ -325,40 +325,58 @@ MemStore::copyToShm(StoreEntry &e, MemStoreMap::Extras &extras)
void
MemStore::cleanReadable(const sfileno fileno)
{
Ipc::Mem::PutPage(map->extras(fileno).page);
theCurrentSize -= Ipc::Mem::PageSize();
}
/// calculates maximum number of entries we need to store and map
int64_t
MemStore::EntryLimit()
{
if (!Config.memMaxSize)
return 0; // no memory cache configured
const int64_t entrySize = Ipc::Mem::PageSize(); // for now
const int64_t entryLimit = Config.memMaxSize / entrySize;
return entryLimit;
}
+/// reports our needs for shared memory pages to Ipc::Mem::Pages
+class MemStoreClaimMemoryNeedsRr: public RegisteredRunner
+{
+public:
+ /* RegisteredRunner API */
+ virtual void run(const RunnerRegistry &r);
+};
+
+RunnerRegistrationEntry(rrClaimMemoryNeeds, MemStoreClaimMemoryNeedsRr);
+
+
+void
+MemStoreClaimMemoryNeedsRr::run(const RunnerRegistry &)
+{
+ Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::cachePage, MemStore::EntryLimit());
+}
+
+
/// initializes shared memory segments used by MemStore
class MemStoreRr: public Ipc::Mem::RegisteredRunner
{
public:
/* RegisteredRunner API */
MemStoreRr(): owner(NULL) {}
virtual void run(const RunnerRegistry &);
virtual ~MemStoreRr();
protected:
virtual void create(const RunnerRegistry &);
private:
MemStoreMap::Owner *owner;
};
RunnerRegistrationEntry(rrAfterConfig, MemStoreRr);
void MemStoreRr::run(const RunnerRegistry &r)
@@ -371,29 +389,35 @@ void MemStoreRr::run(const RunnerRegistry &r)
} else if (Config.memShared && !AtomicOperationsSupported) {
// bail if the user wants shared memory cache but we cannot support it
fatal("memory_cache_shared is on, but no support for atomic operations detected");
} else if (Config.memShared && !Ipc::Mem::Segment::Enabled()) {
fatal("memory_cache_shared is on, but no support for shared memory detected");
} else if (Config.memShared && !UsingSmp()) {
debugs(20, DBG_IMPORTANT, "WARNING: memory_cache_shared is on, but only"
" a single worker is running");
}
Ipc::Mem::RegisteredRunner::run(r);
}
void MemStoreRr::create(const RunnerRegistry &)
{
if (!Config.memShared)
return;
Must(!owner);
const int64_t entryLimit = MemStore::EntryLimit();
- if (entryLimit <= 0)
+ if (entryLimit <= 0) {
+ if (Config.memMaxSize > 0) {
+ debugs(20, DBG_IMPORTANT, "WARNING: mem-cache size is too small ("
+ << (Config.memMaxSize / 1024.0) << " KB), should be >= " <<
+ (Ipc::Mem::PageSize() / 1024.0) << " KB");
+ }
return; // no memory cache configured or a misconfiguration
+ }
owner = MemStoreMap::Init(ShmLabel, entryLimit);
}
MemStoreRr::~MemStoreRr()
{
delete owner;
}
diff --git src/base/RunnersRegistry.h src/base/RunnersRegistry.h
index 06e68e6..2d5b840 100644
--- src/base/RunnersRegistry.h
+++ src/base/RunnersRegistry.h
@@ -4,40 +4,45 @@
/**
* This API allows virtually any module to register with a well-known registry,
* be activated by some central processor at some registry-specific time, and
* be deactiveated by some central processor at some registry-specific time.
*
* For example, main.cc may activate registered I/O modules after parsing
* squid.conf and deactivate them before exiting.
*
* A module in this context is code providing a functionality or service to the
* rest of Squid, such as src/DiskIO/Blocking, src/fs/ufs, or Cache Manager. A
* module must declare a RegisteredRunner child class to implement activation and
* deactivation logic using the run() method and destructor, respectively.
*
* This API allows the registry to determine the right [de]activation time for
* each group of similar modules, without knowing any module specifics.
*
*/
/// well-known registries
typedef enum {
+ /// managed by main.cc; activated after parsing squid.conf but
+ /// before rrAfterConfig, deactivated after rrAfterConfig but
+ /// before freeing configuration-related memory or exit()-ing
+ rrClaimMemoryNeeds,
+
/// managed by main.cc; activated after parsing squid.conf and
/// deactivated before freeing configuration-related memory or exit()-ing
rrAfterConfig,
rrEnd ///< not a real registry, just a label to mark the end of enum
} RunnerRegistry;
/// a runnable registrant API
class RegisteredRunner
{
public:
// called when this runner's registry is deactivated
virtual ~RegisteredRunner() {}
// called when this runner's registry is activated
virtual void run(const RunnerRegistry &r) = 0;
};
/// registers a given runner with the given registry and returns registry count
diff --git src/ipc/Queue.cc src/ipc/Queue.cc
index 24e6706..8afca32 100644
--- src/ipc/Queue.cc
+++ src/ipc/Queue.cc
@@ -132,40 +132,46 @@ Ipc::OneToOneUniQueues::operator [](const int index) const
Ipc::FewToFewBiQueue::Owner *
Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
{
return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
}
Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
theLastPopProcessId(readers->theCapacity)
{
Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
}
+int
+Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
+{
+ return capacity * groupASize * groupBSize * 2;
+}
+
bool
Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
{
switch (group) {
case groupA:
return metadata->theGroupAIdOffset <= processId &&
processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
case groupB:
return metadata->theGroupBIdOffset <= processId &&
processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
}
return false;
}
int
Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
{
Must(fromGroup != toGroup);
assert(validProcessId(fromGroup, fromProcessId));
assert(validProcessId(toGroup, toProcessId));
diff --git src/ipc/Queue.h src/ipc/Queue.h
index 2098610..a5f1618 100644
--- src/ipc/Queue.h
+++ src/ipc/Queue.h
@@ -169,40 +169,43 @@ private:
};
public:
class Owner
{
public:
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
~Owner();
private:
Mem::Owner<Metadata> *const metadataOwner;
Mem::Owner<OneToOneUniQueues> *const queuesOwner;
Mem::Owner<QueueReaders> *const readersOwner;
};
static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
enum Group { groupA = 0, groupB = 1 };
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
+ /// maximum number of items in the queue
+ static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
+
Group localGroup() const { return theLocalGroup; }
Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
/// clears the reader notification received by the local process from the remote process
void clearReaderSignal(const int remoteProcessId);
/// picks a process and calls OneToOneUniQueue::pop() using its queue
template <class Value> bool pop(int &remoteProcessId, Value &value);
/// calls OneToOneUniQueue::push() using the given process queue
template <class Value> bool push(const int remoteProcessId, const Value &value);
// TODO: rename to findOldest() or some such
/// calls OneToOneUniQueue::peek() using the given process queue
template<class Value> bool peek(const int remoteProcessId, Value &value) const;
/// returns true if pop() would have probably succeeded but does not pop()
bool popReady() const;
/// returns local reader's balance
diff --git src/ipc/mem/Pages.cc src/ipc/mem/Pages.cc
index 054938b..c97487d 100644
--- src/ipc/mem/Pages.cc
+++ src/ipc/mem/Pages.cc
@@ -2,40 +2,41 @@
* $Id$
*
* DEBUG: section 54 Interprocess Communication
*
*/
#include "config.h"
#include "base/TextException.h"
#include "base/RunnersRegistry.h"
#include "ipc/mem/PagePool.h"
#include "ipc/mem/Pages.h"
#include "structs.h"
#include "SwapDir.h"
// Uses a single PagePool instance, for now.
// Eventually, we may have pools dedicated to memory caching, disk I/O, etc.
// TODO: make pool id more unique so it does not conflict with other Squids?
static const char *PagePoolId = "squid-page-pool";
static Ipc::Mem::PagePool *ThePagePool = 0;
+static int TheLimits[Ipc::Mem::PageId::maxPurpose];
// TODO: make configurable to avoid waste when mem-cached objects are small/big
size_t
Ipc::Mem::PageSize()
{
return 32*1024;
}
bool
Ipc::Mem::GetPage(const PageId::Purpose purpose, PageId &page)
{
return ThePagePool && PagesAvailable(purpose) > 0 ?
ThePagePool->get(purpose, page) : false;
}
void
Ipc::Mem::PutPage(PageId &page)
{
Must(ThePagePool);
ThePagePool->put(page);
@@ -43,101 +44,88 @@ Ipc::Mem::PutPage(PageId &page)
char *
Ipc::Mem::PagePointer(const PageId &page)
{
Must(ThePagePool);
return ThePagePool->pagePointer(page);
}
size_t
Ipc::Mem::PageLimit()
{
size_t limit = 0;
for (int i = 0; i < PageId::maxPurpose; ++i)
limit += PageLimit(i);
return limit;
}
size_t
Ipc::Mem::PageLimit(const int purpose)
{
- switch (purpose) {
- case PageId::cachePage:
- return Config.memMaxSize > 0 ? Config.memMaxSize / PageSize() : 0;
- case PageId::ioPage:
- // XXX: this should be independent from memory cache pages
- return PageLimit(PageId::cachePage)/2;
- default:
- Must(false);
- }
- return 0;
+ Must(0 <= purpose && purpose < PageId::maxPurpose);
+ return TheLimits[purpose];
+}
+
+// note: adjust this if we start recording needs during reconfigure
+void
+Ipc::Mem::NotePageNeed(const int purpose, const int count)
+{
+ Must(0 <= purpose && purpose < PageId::maxPurpose);
+ Must(count >= 0);
+ TheLimits[purpose] += count;
}
size_t
Ipc::Mem::PageLevel()
{
return ThePagePool ? ThePagePool->level() : 0;
}
size_t
Ipc::Mem::PageLevel(const int purpose)
{
return ThePagePool ? ThePagePool->level(purpose) : 0;
}
/// initializes shared memory pages
class SharedMemPagesRr: public Ipc::Mem::RegisteredRunner
{
public:
/* RegisteredRunner API */
SharedMemPagesRr(): owner(NULL) {}
virtual void run(const RunnerRegistry &);
virtual void create(const RunnerRegistry &);
virtual void open(const RunnerRegistry &);
virtual ~SharedMemPagesRr();
private:
Ipc::Mem::PagePool::Owner *owner;
};
RunnerRegistrationEntry(rrAfterConfig, SharedMemPagesRr);
void
SharedMemPagesRr::run(const RunnerRegistry &r)
{
- if (!UsingSmp())
- return;
-
- // When cache_dirs start using shared memory pages, they would
- // need to communicate their needs to us somehow.
- if (Config.memMaxSize <= 0)
- return;
-
- if (Ipc::Mem::PageLimit() <= 0) {
- if (IamMasterProcess()) {
- debugs(54, DBG_IMPORTANT, "WARNING: mem-cache size is too small ("
- << (Config.memMaxSize / 1024.0) << " KB), should be >= " <<
- (Ipc::Mem::PageSize() / 1024.0) << " KB");
- }
+ if (Ipc::Mem::PageLimit() <= 0)
return;
- }
Ipc::Mem::RegisteredRunner::run(r);
}
void
SharedMemPagesRr::create(const RunnerRegistry &)
{
Must(!owner);
owner = Ipc::Mem::PagePool::Init(PagePoolId, Ipc::Mem::PageLimit(),
Ipc::Mem::PageSize());
}
void
SharedMemPagesRr::open(const RunnerRegistry &)
{
Must(!ThePagePool);
ThePagePool = new Ipc::Mem::PagePool(PagePoolId);
}
SharedMemPagesRr::~SharedMemPagesRr()
diff --git src/ipc/mem/Pages.h src/ipc/mem/Pages.h
index aeea189..bb626d1 100644
--- src/ipc/mem/Pages.h
+++ src/ipc/mem/Pages.h
@@ -34,25 +34,28 @@ size_t PageLimit();
/// the total number of shared memory pages that can be in use at any
/// time for given purpose
size_t PageLimit(const int purpose);
/// approximate total number of shared memory pages used now
size_t PageLevel();
/// approximate total number of shared memory pages used now for given purpose
size_t PageLevel(const int purpose);
/// approximate total number of shared memory pages we can allocate now
inline size_t PagesAvailable() { return PageLimit() - PageLevel(); }
/// approximate total number of shared memory pages we can allocate
/// now for given purpose
inline size_t PagesAvailable(const int purpose) { return PageLimit(purpose) - PageLevel(purpose); }
/// returns page size in bytes; all pages are assumed to be the same size
size_t PageSize();
+/// claim the need for a number of pages for a given purpose
+void NotePageNeed(const int purpose, const int count);
+
} // namespace Mem
} // namespace Ipc
#endif // SQUID_IPC_MEM_PAGES_H
diff --git src/main.cc src/main.cc
index 546eb92..21e45a2 100644
--- src/main.cc
+++ src/main.cc
@@ -1425,40 +1425,41 @@ SquidMain(int argc, char **argv)
/* send signal to running copy and exit */
if (opt_send_signal != -1) {
/* chroot if configured to run inside chroot */
if (Config.chroot_dir) {
if (chroot(Config.chroot_dir))
fatal("failed to chroot");
no_suid();
} else {
leave_suid();
}
sendSignal();
/* NOTREACHED */
}
debugs(1,2, HERE << "Doing post-config initialization\n");
leave_suid();
+ ActivateRegistered(rrClaimMemoryNeeds);
ActivateRegistered(rrAfterConfig);
enter_suid();
if (!opt_no_daemon && Config.workers > 0)
watch_child(argv);
if (opt_create_swap_dirs) {
/* chroot if configured to run inside chroot */
if (Config.chroot_dir && chroot(Config.chroot_dir)) {
fatal("failed to chroot");
}
setEffectiveUser();
debugs(0, 0, "Creating Swap Directories");
Store::Root().create();
return 0;
}
@@ -1794,40 +1795,41 @@ watch_child(char *argv[])
kid->name().termedBuf(), kid->getPid());
}
if (kid->hopeless()) {
syslog(LOG_NOTICE, "Squid Parent: %s process %d will not"
" be restarted due to repeated, frequent failures",
kid->name().termedBuf(), kid->getPid());
}
} else {
syslog(LOG_NOTICE, "Squid Parent: unknown child process %d exited", pid);
}
#if _SQUID_NEXT_
} while ((pid = wait3(&status, WNOHANG, NULL)) > 0);
#else
}
while ((pid = waitpid(-1, &status, WNOHANG)) > 0);
#endif
if (!TheKids.someRunning() && !TheKids.shouldRestartSome()) {
leave_suid();
DeactivateRegistered(rrAfterConfig);
+ DeactivateRegistered(rrClaimMemoryNeeds);
enter_suid();
if (TheKids.someSignaled(SIGINT) || TheKids.someSignaled(SIGTERM)) {
syslog(LOG_ALERT, "Exiting due to unexpected forced shutdown");
exit(1);
}
if (TheKids.allHopeless()) {
syslog(LOG_ALERT, "Exiting due to repeated, frequent failures");
exit(1);
}
exit(0);
}
squid_signal(SIGINT, SIG_DFL, SA_RESTART);
sleep(3);
}
/* NOTREACHED */
@@ -1896,40 +1898,41 @@ SquidShutdown()
WIN32_svcstatusupdate(SERVICE_STOP_PENDING, 10000);
#endif
Store::Root().sync(); /* Flush pending object writes/unlinks */
#if USE_UNLINKD
unlinkdClose(); /* after sync/flush */
#endif
storeDirWriteCleanLogs(0);
PrintRusage();
dumpMallocStats();
Store::Root().sync(); /* Flush log writes */
storeLogClose();
accessLogClose();
Store::Root().sync(); /* Flush log close */
StoreFileSystem::FreeAllFs();
DiskIOModule::FreeAllModules();
DeactivateRegistered(rrAfterConfig);
+ DeactivateRegistered(rrClaimMemoryNeeds);
#if LEAK_CHECK_MODE && 0 /* doesn't work at the moment */
configFreeMemory();
storeFreeMemory();
/*stmemFreeMemory(); */
netdbFreeMemory();
ipcacheFreeMemory();
fqdncacheFreeMemory();
asnFreeMemory();
clientdbFreeMemory();
httpHeaderCleanModule();
statFreeMemory();
eventFreeMemory();
mimeFreeMemory();
errorClean();
#endif
#if !XMALLOC_TRACE
if (opt_no_daemon) {
file_close(0);