The current swap-timeout code does not know about max-swap-rate. It
simply finds the longest-waiting I/O in the disker queues (incoming
and outgoing) and then assumes that the new I/O will wait at least
that long. That estimate is likely to be too low when the queue
contains lots of freshly queued requests to the disker: Those requests
have not waited long yet, but a max-swap-rate limit will slow them
down shortly.
This patch changes the swap-timeout code to account for max-swap-rate
when dealing with the workers-to-disker queue: If there are N requests
pending, the new one will wait at least N/max-swap-rate seconds. The
rate-based wait is also adjusted by the queue "balance" member, in
case we have already been borrowing time against future I/O. The
expected wait time is then the larger of the oldest-I/O wait and the
rate-based wait.
---
src/DiskIO/IpcIo/IpcIoFile.cc | 16 +++++++++++++++-
src/ipc/Queue.cc | 30 ++++++++++++++++++++++++++++++
src/ipc/Queue.h | 28 ++++++++++++++++++++++------
3 files changed, 67 insertions(+), 7 deletions(-)
diff --git src/DiskIO/IpcIo/IpcIoFile.cc src/DiskIO/IpcIo/IpcIoFile.cc
index aec7b40..f70af46 100644
--- src/DiskIO/IpcIo/IpcIoFile.cc
+++ src/DiskIO/IpcIo/IpcIoFile.cc
@@ -343,41 +343,55 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
pending->completeIo(NULL);
delete pending;
} catch (const TextException &e) {
debugs(47, DBG_IMPORTANT, HERE << e.what());
pending->completeIo(NULL);
delete pending;
}
}
/// whether we think there is enough time to complete the I/O
bool
IpcIoFile::canWait() const
{
if (!config.ioTimeout)
return true; // no timeout specified
IpcIoMsg oldestIo;
if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
return true; // we cannot estimate expected wait time; assume it is OK
- const int expectedWait = tvSubMsec(oldestIo.start, current_time);
+ const int oldestWait = tvSubMsec(oldestIo.start, current_time);
+
+ int rateWait = -1; // time in milliseconds
+ const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
+ if (ioRate > 0) {
+ // if there are N requests pending, the new one will wait at
+ // least N/max-swap-rate seconds
+ rateWait = 1e3 * queue->outSize(diskId) / ioRate;
+ // adjust N/max-swap-rate value based on the queue "balance"
+ // member, in case we have been borrowing time against future
+ // I/O already
+ rateWait += queue->balance(diskId);
+ }
+
+ const int expectedWait = max(oldestWait, rateWait);
if (expectedWait < 0 ||
static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
return true; // expected wait time is acceptible
debugs(47,2, HERE << "cannot wait: " << expectedWait <<
" oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
return false; // do not want to wait that long
}
/// called when coordinator responds to worker open request
void
IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
{
debugs(47, 7, HERE << "coordinator response to open request");
for (IpcIoFileList::iterator i = WaitingForOpen.begin();
i != WaitingForOpen.end(); ++i) {
if (response.strand.tag == (*i)->dbName) {
(*i)->openCompleted(&response);
WaitingForOpen.erase(i);
return;
diff --git src/ipc/Queue.cc src/ipc/Queue.cc
index 24e6706..76e266b 100644
--- src/ipc/Queue.cc
+++ src/ipc/Queue.cc
@@ -180,40 +180,56 @@ Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromPr
index1 = toProcessId - metadata->theGroupAIdOffset;
index2 = fromProcessId - metadata->theGroupBIdOffset;
offset = metadata->theGroupASize * metadata->theGroupBSize;
}
const int index = offset + index1 * metadata->theGroupBSize + index2;
return index;
}
Ipc::OneToOneUniQueue &
Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
{
return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
const Ipc::OneToOneUniQueue &
Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
{
return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
+/// incoming queue from a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(remoteGroup(), remoteProcessId,
+ theLocalGroup, theLocalProcessId);
+}
+
+/// outgoing queue to a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(theLocalGroup, theLocalProcessId,
+ remoteGroup(), remoteProcessId);
+}
+
int
Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
{
Must(validProcessId(group, processId));
return group == groupA ?
processId - metadata->theGroupAIdOffset :
metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
}
Ipc::QueueReader &
Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
{
return readers->theReaders[readerIndex(group, processId)];
}
const Ipc::QueueReader &
Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
{
return readers->theReaders[readerIndex(group, processId)];
}
@@ -238,47 +254,61 @@ Ipc::FewToFewBiQueue::popReady() const
{
// mimic FewToFewBiQueue::pop() but quit just before popping
int popProcessId = theLastPopProcessId; // preserve for future pop()
for (int i = 0; i < remoteGroupSize(); ++i) {
if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
popProcessId = remoteGroupIdOffset();
const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId);
if (!queue.empty())
return true;
}
return false; // most likely, no process had anything to pop
}
Ipc::QueueReader::Balance &
Ipc::FewToFewBiQueue::localBalance()
{
QueueReader &r = reader(theLocalGroup, theLocalProcessId);
return r.balance;
}
+const Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+{
+ const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+ return r.balance;
+}
+
Ipc::QueueReader::Rate &
Ipc::FewToFewBiQueue::localRateLimit()
{
QueueReader &r = reader(theLocalGroup, theLocalProcessId);
return r.rateLimit;
}
+const Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+{
+ const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+ return r.rateLimit;
+}
+
Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
{
Must(theGroupASize > 0);
Must(theGroupBSize > 0);
}
Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
{
}
Ipc::FewToFewBiQueue::Owner::~Owner()
{
delete metadataOwner;
delete queuesOwner;
delete readersOwner;
diff --git src/ipc/Queue.h src/ipc/Queue.h
index 2098610..640342d 100644
--- src/ipc/Queue.h
+++ src/ipc/Queue.h
@@ -191,48 +191,62 @@ public:
/// clears the reader notification received by the local process from the remote process
void clearReaderSignal(const int remoteProcessId);
/// picks a process and calls OneToOneUniQueue::pop() using its queue
template <class Value> bool pop(int &remoteProcessId, Value &value);
/// calls OneToOneUniQueue::push() using the given process queue
template <class Value> bool push(const int remoteProcessId, const Value &value);
// TODO: rename to findOldest() or some such
/// calls OneToOneUniQueue::peek() using the given process queue
template<class Value> bool peek(const int remoteProcessId, Value &value) const;
/// returns true if pop() would have probably succeeded but does not pop()
bool popReady() const;
/// returns local reader's balance
QueueReader::Balance &localBalance();
+ /// returns reader's balance for a given remote process
+ const QueueReader::Balance &balance(const int remoteProcessId) const;
+
/// returns local reader's rate limit
QueueReader::Rate &localRateLimit();
+ /// returns reader's rate limit for a given remote process
+ const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+
+ /// number of items in incoming queue from a given remote process
+ int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+
+ /// number of items in outgoing queue to a given remote process
+ int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
private:
bool validProcessId(const Group group, const int processId) const;
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
+ const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+ const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
QueueReader &reader(const Group group, const int processId);
const QueueReader &reader(const Group group, const int processId) const;
int readerIndex(const Group group, const int processId) const;
int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
private:
const Mem::Pointer<Metadata> metadata; ///< shared metadata
const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
const Mem::Pointer<QueueReaders> readers; ///< readers array
const Group theLocalGroup; ///< group of this queue
const int theLocalProcessId; ///< process ID of this queue
int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
};
// OneToOneUniQueue
template <class Value>
@@ -340,34 +354,36 @@ FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
template <class Value>
bool
FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
{
OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
return remoteQueue.push(value, &remoteReader);
}
template <class Value>
bool
FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const
{
// we may be called before remote process configured its queue end
if (!validProcessId(remoteGroup(), remoteProcessId))
return false;
// we need the oldest value, so start with the incoming, them-to-us queue:
- const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId);
- debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size());
- if (inQueue.peek(value))
+ const OneToOneUniQueue &in = inQueue(remoteProcessId);
+ debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
+ theLocalProcessId << " at " << in.size());
+ if (in.peek(value))
return true;
// if the incoming queue is empty, check the outgoing, us-to-them queue:
- const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
- debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size());
- return outQueue.peek(value);
+ const OneToOneUniQueue &out = outQueue(remoteProcessId);
+ debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
+ remoteProcessId << " at " << out.size());
+ return out.peek(value);
}
} // namespace Ipc
#endif // SQUID_IPC_QUEUE_H