Hello,
The new option introduced by this patch limits Squid I/O rate to
smooth out OS disk commit activity and to avoid blocking Rock diskers
(or even other processes!) on I/O. It should be used when swap demand
exceeds disk performance limits but the underlying file system does not
slow down incoming I/Os, allowing the situation to get out of control.
Lab and limited deployment tests show that without this option, Squid
may write to the OS buffers much faster than the OS is able to write to
disk. This imbalance eventually (and quite suddenly!) blocks any
process trying to do I/O for a second or two, which results in very poor
overall performance. Tuning file system parameters may help, but
is sometimes not enough.
The new max-swap-rate option is meant to be used together with the
existing swap-timeout option. Otherwise, the problem simply shifts one
layer up, with unrestricted workers overflowing restricted Rock I/O
queues. The swap-timeout option allows a worker to skip Store operations
when there are already "too many" I/Os pending.
The code seems to work well, but can be further optimized to minimize
read (hit) delays and to better enforce configured swap timeouts.
HTH,
Alex.
Added max-swap-rate=swaps/sec option to Rock cache_dir.
The option limits disk access to smooth out OS disk commit activity and to
avoid blocking Rock diskers (or even other processes!) on I/O. Should be used
when swap demand exceeds disk performance limits but the underlying file
system does not slow down incoming I/Os, allowing the situation to get out of
control.
TODO: Account for the I/O rate limit when estimating whether a future I/O
will complete in time (for swap-timeout).
TODO: Consider allowing the next swap-in (i.e., read) through regardless of
the limit because, unlike writes, reads do not usually accumulate unfinished
I/O requests in OS buffers and, hence, do not eventually require the OS to
block all I/O.
=== modified file 'src/DiskIO/DiskFile.h'
--- src/DiskIO/DiskFile.h 2011-09-14 00:12:35 +0000
+++ src/DiskIO/DiskFile.h 2011-10-03 20:28:12 +0000
@@ -35,44 +35,47 @@
#include "squid.h"
#include "RefCount.h"
class IORequestor;
class ReadRequest;
class WriteRequest;
class DiskFile : public RefCountable
{
public:
/// generally useful configuration options supported by some children
class Config
{
public:
- Config(): ioTimeout(0) {}
+ Config(): ioTimeout(0), ioRate(-1) {}
/// canRead/Write should return false if expected I/O delay exceeds it
time_msec_t ioTimeout; // not enforced if zero, which is the default
+
+ /// shape I/O request stream to approach that many per second
+ int ioRate; // not enforced if negative, which is the default
};
typedef RefCount<DiskFile> Pointer;
/// notes supported configuration options; kids must call this first
virtual void configure(const Config &cfg) {}
virtual void open(int flags, mode_t mode, RefCount<IORequestor> callback) = 0;
virtual void create(int flags, mode_t mode, RefCount<IORequestor> callback) = 0;
virtual void read(ReadRequest *) = 0;
virtual void write(WriteRequest *) = 0;
virtual void close() = 0;
virtual bool canRead() const = 0;
virtual bool canWrite() const {return true;}
/** During migration only */
virtual int getFD() const {return -1;}
virtual bool error() const = 0;
=== modified file 'src/DiskIO/IpcIo/IpcIoFile.cc'
--- src/DiskIO/IpcIo/IpcIoFile.cc 2011-09-16 04:36:49 +0000
+++ src/DiskIO/IpcIo/IpcIoFile.cc 2011-10-03 20:28:12 +0000
@@ -79,40 +79,43 @@
void
IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
{
ioRequestor = callback;
Must(diskId < 0); // we do not know our disker yet
if (!queue.get())
queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
if (IamDiskProcess()) {
error_ = !DiskerOpen(dbName, flags, mode);
if (error_)
return;
diskId = KidIdentifier;
const bool inserted =
IpcIoFiles.insert(std::make_pair(diskId, this)).second;
Must(inserted);
+ queue->localRateLimit() =
+ static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);
+
Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
ann.strand.tag = dbName;
Ipc::TypedMsgHdr message;
ann.pack(message);
SendMessage(Ipc::coordinatorAddr, message);
ioRequestor->ioCompletedNotification();
return;
}
Ipc::StrandSearchRequest request;
request.requestorId = KidIdentifier;
request.tag = dbName;
Ipc::TypedMsgHdr msg;
request.pack(msg);
Ipc::SendMessage(Ipc::coordinatorAddr, msg);
WaitingForOpen.push_back(this);
@@ -632,74 +635,135 @@
fd_bytes(TheFile, wrote, FD_WRITE);
if (wrote >= 0) {
ipcIo.xerrno = 0;
const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0
debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
(len == ipcIo.len ? "all " : "just ") << wrote);
ipcIo.len = len;
} else {
ipcIo.xerrno = errno;
ipcIo.len = 0;
debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
ipcIo.xerrno);
}
Ipc::Mem::PutPage(ipcIo.page);
}
void
-IpcIoFile::DiskerHandleMoreRequests(void*)
+IpcIoFile::DiskerHandleMoreRequests(void *source)
{
- debugs(47, 7, HERE << "resuming handling requests");
+ debugs(47, 7, HERE << "resuming handling requests after " <<
+ static_cast<const char *>(source));
DiskerHandleMoreRequestsScheduled = false;
IpcIoFile::DiskerHandleRequests();
}
+/// Decides whether the disker must postpone popping the next I/O request
+/// to obey the rate limit stored in the local queue reader.
+/// \returns true after scheduling a delayed DiskerHandleMoreRequests() call
+/// (the caller should not pop() now); false if the caller may proceed with
+/// pop() right away (no positive limit, nothing to pop, or no excess credit)
+bool
+IpcIoFile::WaitBeforePop()
+{
+    const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
+    const double maxRate = ioRate/1e3; // req/ms
+
+    // do we need to enforce configured I/O rate?
+    if (maxRate <= 0)
+        return false;
+
+    // is there an I/O request we could potentially delay?
+    if (!queue->popReady()) {
+        // unlike pop(), popReady() is not reliable and does not block reader
+        // so we must proceed with pop() even if it is likely to fail
+        return false;
+    }
+
+    // initialized on the first call that gets this far; updated on each one
+    static timeval LastIo = current_time;
+
+    const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os, in ms
+    // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
+    const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
+
+    const double credit = ioDuration; // what the last I/O should have cost us
+    const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
+    LastIo = current_time;
+
+    Ipc::QueueReader::Balance &balance = queue->localBalance();
+    // the cast drops the fractional part of the millisecond difference
+    balance += static_cast<int64_t>(credit - debit);
+
+    debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
+
+    if (balance > maxImbalance) {
+        // if we accumulated too much time for future slow I/Os,
+        // then shed accumulated time to keep just half of the excess
+        const int64_t toSpend = balance - maxImbalance/2;
+
+        // toSpend is in milliseconds; Timeout is compared in seconds
+        if (toSpend/1e3 > Timeout)
+            debugs(47, DBG_IMPORTANT, "WARNING: Rock disker delays I/O " <<
+                   "requests for " << (toSpend/1e3) << " seconds to obey " <<
+                   ioRate << "/sec rate limit");
+
+        debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get " <<
+               (1e3*maxRate) << "/sec rate");
+        // come back for the delayed request(s) after toSpend milliseconds
+        eventAdd("IpcIoFile::DiskerHandleMoreRequests",
+                 &IpcIoFile::DiskerHandleMoreRequests,
+                 const_cast<char*>("rate limiting"),
+                 toSpend/1e3, 0, false);
+        DiskerHandleMoreRequestsScheduled = true;
+        return true;
+    } else if (balance < -maxImbalance) {
+        // do not owe "too much" to avoid "too large" bursts of I/O
+        balance = -maxImbalance;
+    }
+
+    return false;
+}
+
void
IpcIoFile::DiskerHandleRequests()
{
// Balance our desire to maximize the number of concurrent I/O requests
// (reordred by OS to minimize seek time) with a requirement to
// send 1st-I/O notification messages, process Coordinator events, etc.
const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
const timeval loopStart = current_time;
int popped = 0;
int workerId = 0;
IpcIoMsg ipcIo;
- while (queue->pop(workerId, ipcIo)) {
+ while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
++popped;
// at least one I/O per call is guaranteed if the queue is not empty
DiskerHandleRequest(workerId, ipcIo);
getCurrentTime();
const double elapsedMsec = tvSubMsec(loopStart, current_time);
if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
if (!DiskerHandleMoreRequestsScheduled) {
// the gap must be positive for select(2) to be given a chance
const double minBreakSecs = 0.001;
eventAdd("IpcIoFile::DiskerHandleMoreRequests",
&IpcIoFile::DiskerHandleMoreRequests,
- NULL, minBreakSecs, 0, false);
+ const_cast<char*>("long I/O loop"),
+ minBreakSecs, 0, false);
DiskerHandleMoreRequestsScheduled = true;
}
debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
break;
}
}
// TODO: consider using O_DIRECT with "elevator" optimization where we pop
// requests first, then reorder the popped requests to optimize seek time,
// then do I/O, then take a break, and come back for the next set of I/O
// requests.
}
/// called when disker receives an I/O request
void
IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
{
if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
debugs(0,0, HERE << "disker" << KidIdentifier <<
=== modified file 'src/DiskIO/IpcIo/IpcIoFile.h'
--- src/DiskIO/IpcIo/IpcIoFile.h 2011-09-10 01:25:27 +0000
+++ src/DiskIO/IpcIo/IpcIoFile.h 2011-10-03 20:28:12 +0000
@@ -85,40 +85,41 @@
bool canWait() const;
private:
void trackPendingRequest(IpcIoPendingRequest *const pending);
void push(IpcIoPendingRequest *const pending);
IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);
static void Notify(const int peerId);
static void OpenTimeout(void *const param);
static void CheckTimeouts(void *const param);
void checkTimeouts();
void scheduleTimeoutCheck();
static void HandleResponses(const char *const when);
void handleResponse(IpcIoMsg &ipcIo);
static void DiskerHandleMoreRequests(void*);
static void DiskerHandleRequests();
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
+ static bool WaitBeforePop();
private:
const String dbName; ///< the name of the file we are managing
int diskId; ///< the process ID of the disker we talk to
RefCount<IORequestor> ioRequestor;
bool error_; ///< whether we have seen at least one I/O error (XXX)
unsigned int lastRequestId; ///< last requestId used
/// maps requestId to the handleResponse callback
typedef std::map<unsigned int, IpcIoPendingRequest*> RequestMap;
RequestMap requestMap1; ///< older (or newer) pending requests
RequestMap requestMap2; ///< newer (or older) pending requests
RequestMap *olderRequests; ///< older requests (map1 or map2)
RequestMap *newerRequests; ///< newer requests (map2 or map1)
bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call
static const double Timeout; ///< timeout value in seconds
=== modified file 'src/cf.data.pre'
--- src/cf.data.pre 2011-09-22 00:46:26 +0000
+++ src/cf.data.pre 2011-10-03 20:28:12 +0000
@@ -2744,56 +2744,68 @@
cache_dir diskd Directory-Name Mbytes L1 L2 [options] [Q1=n] [Q2=n]
see argument descriptions under ufs above
Q1 specifies the number of unacknowledged I/O requests when Squid
stops opening new files. If this many messages are in the queues,
Squid won't open new files. Default is 64
Q2 specifies the number of unacknowledged messages when Squid
starts blocking. If this many messages are in the queues,
Squid blocks until it receives some replies. Default is 72
When Q1 < Q2 (the default), the cache directory is optimized
for lower response time at the expense of a decrease in hit
ratio. If Q1 > Q2, the cache directory is optimized for
higher hit ratio at the expense of an increase in response
time.
The rock store type:
- cache_dir rock Directory-Name Mbytes <max-size=bytes>
+ cache_dir rock Directory-Name Mbytes <max-size=bytes> [options]
The Rock Store type is a database-style storage. All cached
entries are stored in a "database" file, using fixed-size slots,
one entry per slot. The database size is specified in MB. The
slot size is specified in bytes using the max-size option. See
below for more info on the max-size option.
swap-timeout=msec: Squid will not start writing a miss to or
reading a hit from disk if it estimates that the swap operation
will take more than the specified number of milliseconds. By
default and when set to zero, disables the disk I/O time limit
enforcement. Ignored when using blocking I/O module because
blocking synchronous I/O does not allow Squid to estimate the
expected swap wait time.
+ max-swap-rate=swaps/sec: Artificially limits disk access using
+ the specified I/O rate limit. Swap in and swap out requests that
+ would cause the average I/O rate to exceed the limit are
+ delayed. This is necessary on file systems that buffer "too
+ many" writes and then start blocking Squid and other processes
+ while committing those writes to disk. Usually used together
+ with swap-timeout to avoid excessive delays and queue overflows
+ when disk demand exceeds available disk "bandwidth". By default
+ and when set to zero, disables the disk I/O rate limit
+ enforcement. Currently supported by IpcIo module only.
+
+
The coss store type:
NP: COSS filesystem in Squid-3 has been deemed too unstable for
production use and has thus been removed from this release.
We hope that it can be made usable again soon.
block-size=n defines the "block size" for COSS cache_dir's.
Squid uses file numbers as block numbers. Since file numbers
are limited to 24 bits, the block size determines the maximum
size of the COSS partition. The default is 512 bytes, which
leads to a maximum cache_dir size of 512<<24, or 8 GB. Note
you should not change the coss block size after Squid
has written some objects to the cache_dir.
The coss file store has changed from 2.5. Now it uses a file
called 'stripe' in the directory names in the config - and
this will be created by squid -z.
Common options:
=== modified file 'src/fs/rock/RockSwapDir.cc'
--- src/fs/rock/RockSwapDir.cc 2011-09-24 00:13:48 +0000
+++ src/fs/rock/RockSwapDir.cc 2011-10-03 20:28:12 +0000
@@ -280,40 +280,41 @@
const int i = GetInteger();
if (i < 0)
fatal("negative Rock cache_dir size value");
const uint64_t new_max_size =
static_cast<uint64_t>(i) << 20; // MBytes to Bytes
if (!reconfiguring)
max_size = new_max_size;
else if (new_max_size != max_size) {
debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
"cannot be changed dynamically, value left unchanged (" <<
(max_size >> 20) << " MB)");
}
}
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
assert(vector);
vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
+ vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
return vector;
}
bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
return strcmp(option, "max-size") != 0 &&
::SwapDir::allowOptionReconfigure(option);
}
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfiguring)
{
// TODO: ::SwapDir or, better, Config should provide time-parsing routines,
// including time unit handling. Same for size.
time_msec_t *storedTime;
if (strcmp(option, "swap-timeout") == 0)
storedTime = &fileConfig.ioTimeout;
@@ -332,40 +333,83 @@
const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);
if (reconfiguring && *storedTime != newTime)
debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newTime);
*storedTime = newTime;
return true;
}
/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
if (fileConfig.ioTimeout)
storeAppendPrintf(e, " swap-timeout=%"PRId64,
static_cast<int64_t>(fileConfig.ioTimeout));
}
+/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
+/// \param option the option name; only "max-swap-rate" is recognized
+/// \param value the option value text; self_destruct() is called if missing
+/// \param isaReconfig whether to report a changed value during reconfiguration
+/// \returns false if the option is not a rate option; true after storing
+/// the parsed rate. Negative values and values that do not fit into an int
+/// trigger self_destruct().
+bool
+Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
+{
+    int *storedRate;
+    if (strcmp(option, "max-swap-rate") == 0)
+        storedRate = &fileConfig.ioRate;
+    else
+        return false;
+
+    if (!value)
+        self_destruct();
+
+    // TODO: handle time units and detect parsing errors better
+    const int64_t parsedValue = strtoll(value, NULL, 10);
+    if (parsedValue < 0) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
+        self_destruct();
+    }
+
+    const int newRate = static_cast<int>(parsedValue);
+
+    // The rate is stored as an int. A value that does not survive the
+    // int64_t-to-int conversion unchanged is out of range and must be
+    // rejected instead of being silently replaced with a wrapped number.
+    if (newRate < 0 || static_cast<int64_t>(newRate) != parsedValue) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " is out of range: " << parsedValue);
+        self_destruct();
+    }
+
+    if (isaReconfig && *storedRate != newRate)
+        debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newRate);
+
+    *storedRate = newRate;
+
+    return true;
+}
+
+/// appends the max-swap-rate option to the given entry unless the stored
+/// rate is negative; patterned after ::SwapDir::optionObjectSizeDump()
+void
+Rock::SwapDir::dumpRateOption(StoreEntry * e) const
+{
+    const int configuredRate = fileConfig.ioRate;
+    if (configuredRate < 0)
+        return; // nothing to report
+
+    storeAppendPrintf(e, " max-swap-rate=%d", configuredRate);
+}
+
/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
if (max_objsize <= 0)
fatal("Rock store requires a positive max-size");
#if THIS_CODE_IS_FIXED_AND_MOVED
// XXX: should not use map as it is not yet created
// XXX: max_size is in Bytes now
// XXX: Use DBG_IMPORTANT (and DBG_CRITICAL if opt_parse_cfg_only?)
// TODO: Shrink max_size to avoid waste?
const int64_t mapRoundWasteMx = max_objsize*sizeof(long)*8;
const int64_t sizeRoundWasteMx = 1024; // max_size stored in KB
const int64_t roundingWasteMx = max(mapRoundWasteMx, sizeRoundWasteMx);
const int64_t totalWaste = maxSize() - diskOffsetLimit();
assert(diskOffsetLimit() <= maxSize());
// warn if maximum db size is not reachable due to sfileno limit
if (map->entryLimit() == entryLimitHigh() && totalWaste > roundingWasteMx) {
=== modified file 'src/fs/rock/RockSwapDir.h'
--- src/fs/rock/RockSwapDir.h 2011-09-22 21:42:46 +0000
+++ src/fs/rock/RockSwapDir.h 2011-10-03 20:28:12 +0000
@@ -50,40 +50,42 @@
virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
virtual void maintain();
virtual void diskFull();
virtual void reference(StoreEntry &e);
virtual bool dereference(StoreEntry &e);
virtual void unlink(StoreEntry &e);
virtual void statfs(StoreEntry &e) const;
/* IORequestor API */
virtual void ioCompletedNotification();
virtual void closeCompleted();
virtual void readCompleted(const char *buf, int len, int errflag, RefCount< ::ReadRequest>);
virtual void writeCompleted(int errflag, size_t len, RefCount< ::WriteRequest>);
virtual void parse(int index, char *path);
void parseSize(const bool reconfiguring); ///< parses anonymous cache_dir size option
void validateOptions(); ///< warns of configuration problems; may quit
bool parseTimeOption(char const *option, const char *value, int reconfiguring);
void dumpTimeOption(StoreEntry * e) const;
+ bool parseRateOption(char const *option, const char *value, int reconfiguring);
+ void dumpRateOption(StoreEntry * e) const;
void rebuild(); ///< starts loading and validating stored entry metadata
///< used to add entries successfully loaded during rebuild
bool addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from);
bool full() const; ///< no more entries can be stored without purging
void trackReferences(StoreEntry &e); ///< add to replacement policy scope
void ignoreReferences(StoreEntry &e); ///< delete from repl policy scope
int64_t diskOffset(int filen) const;
int64_t diskOffsetLimit() const;
int entryLimit() const { return map->entryLimit(); }
friend class Rebuild;
const char *filePath; ///< location of cache storage file inside path/
private:
DiskIOStrategy *io;
RefCount<DiskFile> theFile; ///< cache storage for this cache_dir
DirMap *map;
=== modified file 'src/ipc/AtomicWord.h'
--- src/ipc/AtomicWord.h 2011-09-15 03:44:50 +0000
+++ src/ipc/AtomicWord.h 2011-10-03 20:28:12 +0000
@@ -1,35 +1,37 @@
/*
* $Id$
*
*/
#ifndef SQUID_IPC_ATOMIC_WORD_H
#define SQUID_IPC_ATOMIC_WORD_H
#if HAVE_ATOMIC_OPS
/// Supplies atomic operations for an integral Value in memory shared by kids.
/// Used to implement non-blocking shared locks, queues, tables, and pools.
-template <class Value>
+template <class ValueType>
class AtomicWordT
{
public:
+ typedef ValueType Value;
+
AtomicWordT() {} // leave value unchanged
AtomicWordT(Value aValue): value(aValue) {} // XXX: unsafe
Value operator +=(int delta) { return __sync_add_and_fetch(&value, delta); }
Value operator -=(int delta) { return __sync_sub_and_fetch(&value, delta); }
Value operator ++() { return *this += 1; }
Value operator --() { return *this -= 1; }
Value operator ++(int) { return __sync_fetch_and_add(&value, 1); }
Value operator --(int) { return __sync_fetch_and_sub(&value, 1); }
bool swap_if(const int comparand, const int replacement) { return __sync_bool_compare_and_swap(&value, comparand, replacement); }
/// v1 = value; value &= v2; return v1;
Value fetchAndAnd(const Value v2) { return __sync_fetch_and_and(&value, v2); }
// TODO: no need for __sync_bool_compare_and_swap here?
bool operator ==(int v2) { return __sync_bool_compare_and_swap(&value, v2, value); }
// TODO: no need for __sync_fetch_and_add here?
Value get() const { return __sync_fetch_and_add(const_cast<Value*>(&value), 0); }
=== modified file 'src/ipc/Queue.cc'
--- src/ipc/Queue.cc 2011-09-06 22:32:30 +0000
+++ src/ipc/Queue.cc 2011-10-03 20:28:12 +0000
@@ -23,41 +23,42 @@
static String
QueuesId(String id)
{
id.append("__queues");
return id;
}
/// constructs QueueReaders ID from parent queue ID
static String
ReadersId(String id)
{
id.append("__readers");
return id;
}
/* QueueReader */
InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
-Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
+ rateLimit(0), balance(0)
{
debugs(54, 7, HERE << "constructed " << id);
}
/* QueueReaders */
Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
{
Must(theCapacity > 0);
new (theReaders) QueueReader[theCapacity];
}
size_t
Ipc::QueueReaders::sharedMemorySize() const
{
return SharedMemorySize(theCapacity);
}
size_t
Ipc::QueueReaders::SharedMemorySize(const int capacity)
@@ -179,65 +180,105 @@
index1 = toProcessId - metadata->theGroupAIdOffset;
index2 = fromProcessId - metadata->theGroupBIdOffset;
offset = metadata->theGroupASize * metadata->theGroupBSize;
}
const int index = offset + index1 * metadata->theGroupBSize + index2;
return index;
}
Ipc::OneToOneUniQueue &
Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
{
return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
const Ipc::OneToOneUniQueue &
Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
{
return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
+/// converts a valid (group, process ID) pair into a reader position:
+/// group A positions start at zero; group B positions follow all group A ones
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
+{
+    Must(validProcessId(group, processId));
+
+    if (group == groupA)
+        return processId - metadata->theGroupAIdOffset;
+
+    // group B: skip over all group A positions first
+    return metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
+}
+
Ipc::QueueReader &
Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
{
- Must(validProcessId(group, processId));
- const int index = group == groupA ?
- processId - metadata->theGroupAIdOffset :
- metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
- return readers->theReaders[index];
+ return readers->theReaders[readerIndex(group, processId)];
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+{
+ return readers->theReaders[readerIndex(group, processId)];
}
void
Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
{
QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
debugs(54, 7, HERE << "reader: " << localReader.id);
Must(validProcessId(remoteGroup(), remoteProcessId));
localReader.clearSignal();
// we got a hint; we could reposition iteration to try popping from the
// remoteProcessId queue first; but it does not seem to help much and might
// introduce some bias so we do not do that for now:
// theLastPopProcessId = remoteProcessId;
}
+/// checks, without popping, whether any remote process queue destined for
+/// the local process is non-empty
+bool
+Ipc::FewToFewBiQueue::popReady() const
+{
+    // mimic FewToFewBiQueue::pop() iteration but stop short of popping;
+    // advance a copy so that theLastPopProcessId stays intact for pop()
+    int candidateId = theLastPopProcessId;
+    int checked = 0;
+    while (checked < remoteGroupSize()) {
+        ++candidateId;
+        if (candidateId >= remoteGroupIdOffset() + remoteGroupSize())
+            candidateId = remoteGroupIdOffset(); // wrap around
+
+        if (!oneToOneQueue(remoteGroup(), candidateId, theLocalGroup, theLocalProcessId).empty())
+            return true;
+
+        ++checked;
+    }
+
+    return false; // most likely, no process had anything to pop
+}
+
+/// gives read-write access to the balance kept by the local process reader
+Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::localBalance()
+{
+    return reader(theLocalGroup, theLocalProcessId).balance;
+}
+
+/// gives read-write access to the rate limit kept by the local process reader
+Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::localRateLimit()
+{
+    return reader(theLocalGroup, theLocalProcessId).rateLimit;
+}
+
Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
{
Must(theGroupASize > 0);
Must(theGroupBSize > 0);
}
Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
{
}
Ipc::FewToFewBiQueue::Owner::~Owner()
{
delete metadataOwner;
delete queuesOwner;
delete readersOwner;
=== modified file 'src/ipc/Queue.h'
--- src/ipc/Queue.h 2011-09-06 22:32:30 +0000
+++ src/ipc/Queue.h 2011-10-03 20:28:12 +0000
@@ -29,40 +29,49 @@
bool blocked() const { return popBlocked == 1; }
/// marks the reader as blocked, waiting for a notification signal
void block() { popBlocked.swap_if(0, 1); }
/// removes the block() effects
void unblock() { popBlocked.swap_if(1, 0); }
/// if reader is blocked and not notified, marks the notification signal
/// as sent and not received, returning true; otherwise, returns false
bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
/// marks sent reader notification as received (also removes pop blocking)
void clearSignal() { unblock(); popSignal.swap_if(1,0); }
private:
AtomicWord popBlocked; ///< whether the reader is blocked on pop()
AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
public:
+ typedef AtomicWord Rate; ///< pop()s per second
+ Rate rateLimit; ///< pop()s per second limit if positive
+
+ // we need a signed atomic type because balance may get negative
+ typedef AtomicWordT<int> AtomicSignedMsec;
+ typedef AtomicSignedMsec Balance;
+ /// how far ahead the reader is compared to a perfect read/sec event rate
+ Balance balance;
+
/// unique ID for debugging which reader is used (works across processes)
const InstanceId<QueueReader> id;
};
/// shared array of QueueReaders
class QueueReaders
{
public:
QueueReaders(const int aCapacity);
size_t sharedMemorySize() const;
static size_t SharedMemorySize(const int capacity);
const int theCapacity; /// number of readers
QueueReader theReaders[]; /// readers
};
/**
* Lockless fixed-capacity queue for a single writer and a single reader.
*
* If the queue is empty, the reader is considered "blocked" and needs
@@ -172,49 +181,61 @@
Mem::Owner<QueueReaders> *const readersOwner;
};
static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
enum Group { groupA = 0, groupB = 1 };
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
Group localGroup() const { return theLocalGroup; }
Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
/// clears the reader notification received by the local process from the remote process
void clearReaderSignal(const int remoteProcessId);
/// picks a process and calls OneToOneUniQueue::pop() using its queue
template <class Value> bool pop(int &remoteProcessId, Value &value);
/// calls OneToOneUniQueue::push() using the given process queue
template <class Value> bool push(const int remoteProcessId, const Value &value);
+ // TODO: rename to findOldest() or some such
/// calls OneToOneUniQueue::peek() using the given process queue
template<class Value> bool peek(const int remoteProcessId, Value &value) const;
+ /// returns true if pop() would have probably succeeded but does not pop()
+ bool popReady() const;
+
+ /// returns local reader's balance
+ QueueReader::Balance &localBalance();
+
+ /// returns local reader's rate limit
+ QueueReader::Rate &localRateLimit();
+
private:
bool validProcessId(const Group group, const int processId) const;
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
QueueReader &reader(const Group group, const int processId);
+ const QueueReader &reader(const Group group, const int processId) const;
+ int readerIndex(const Group group, const int processId) const;
int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
private:
const Mem::Pointer<Metadata> metadata; ///< shared metadata
const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
const Mem::Pointer<QueueReaders> readers; ///< readers array
const Group theLocalGroup; ///< group of this queue
const int theLocalProcessId; ///< process ID of this queue
int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
};
// OneToOneUniQueue
template <class Value>
bool
OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
{