The current swap-timeout code does not know about max-swap-rate.  It
simply finds the longest-waiting I/O in the disker queues (incoming and
outgoing) and then assumes that the new I/O will wait at least that
long.  The assumption is likely to be wrong when the queue contains
lots of freshly queued requests to the disker: those requests have not
waited long yet, but a max-swap-rate limit will soon slow them down.

The patch changes the swap-timeout code to account for max-swap-rate
when dealing with the workers-to-disker queue: if there are N requests
pending, the new one will wait at least N/max-swap-rate seconds.  Also,
the expected wait time is adjusted based on the queue "balance" member
in case we have already been borrowing time against future I/O.
---
 src/DiskIO/IpcIo/IpcIoFile.cc |   16 +++++++++++++++-
 src/ipc/Queue.cc              |   30 ++++++++++++++++++++++++++++++
 src/ipc/Queue.h               |   28 ++++++++++++++++++++++------
 3 files changed, 67 insertions(+), 7 deletions(-)

diff --git src/DiskIO/IpcIo/IpcIoFile.cc src/DiskIO/IpcIo/IpcIoFile.cc
index aec7b40..f70af46 100644
--- src/DiskIO/IpcIo/IpcIoFile.cc
+++ src/DiskIO/IpcIo/IpcIoFile.cc
@@ -343,41 +343,55 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
         pending->completeIo(NULL);
         delete pending;
     } catch (const TextException &e) {
         debugs(47, DBG_IMPORTANT, HERE << e.what());
         pending->completeIo(NULL);
         delete pending;
     }
 }
 
 /// whether we think there is enough time to complete the I/O
 bool
 IpcIoFile::canWait() const
 {
     if (!config.ioTimeout)
         return true; // no timeout specified
 
     IpcIoMsg oldestIo;
     if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
         return true; // we cannot estimate expected wait time; assume it is OK
 
-    const int expectedWait = tvSubMsec(oldestIo.start, current_time);
+    const int oldestWait = tvSubMsec(oldestIo.start, current_time);
+
+    int rateWait = -1; // milliseconds; stays -1 when no rate limit is set
+    const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
+    if (ioRate > 0) {
+        // if there are N requests pending in our own outgoing queue, the
+        // new one will wait at least N/max-swap-rate seconds
+        rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
+        // adjust N/max-swap-rate value based on the queue "balance"
+        // member, in case we have been borrowing time against future
+        // I/O already
+        rateWait += queue->balance(diskId);
+    }
+
+    const int expectedWait = max(oldestWait, rateWait);
     if (expectedWait < 0 ||
             static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
         return true; // expected wait time is acceptible
 
     debugs(47,2, HERE << "cannot wait: " << expectedWait <<
            " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
     return false; // do not want to wait that long
 }
 
 /// called when coordinator responds to worker open request
 void
 IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
 {
     debugs(47, 7, HERE << "coordinator response to open request");
     for (IpcIoFileList::iterator i = WaitingForOpen.begin();
             i != WaitingForOpen.end(); ++i) {
         if (response.strand.tag == (*i)->dbName) {
             (*i)->openCompleted(&response);
             WaitingForOpen.erase(i);
             return;
diff --git src/ipc/Queue.cc src/ipc/Queue.cc
index 24e6706..76e266b 100644
--- src/ipc/Queue.cc
+++ src/ipc/Queue.cc
@@ -180,40 +180,56 @@ Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromPr
         index1 = toProcessId - metadata->theGroupAIdOffset;
         index2 = fromProcessId - metadata->theGroupBIdOffset;
         offset = metadata->theGroupASize * metadata->theGroupBSize;
     }
     const int index = offset + index1 * metadata->theGroupBSize + index2;
     return index;
 }
 
 Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
 {
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }
 
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }
 
+/// the them-to-us queue: items the given remote process sends to us
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
+{
+    // sender: the remote process; receiver: this (local) process
+    return oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId);
+}
+
+/// the us-to-them queue: items we push to the given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
+{
+    // sender: this (local) process; receiver: the remote process
+    return oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
+}
+
 int
 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
 {
     Must(validProcessId(group, processId));
     return group == groupA ?
            processId - metadata->theGroupAIdOffset :
            metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
 }
 
 Ipc::QueueReader &
 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
 {
     return readers->theReaders[readerIndex(group, processId)];
 }
 
 const Ipc::QueueReader &
 Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
 {
     return readers->theReaders[readerIndex(group, processId)];
 }
@@ -238,47 +254,61 @@ Ipc::FewToFewBiQueue::popReady() const
 {
     // mimic FewToFewBiQueue::pop() but quit just before popping
     int popProcessId = theLastPopProcessId; // preserve for future pop()
     for (int i = 0; i < remoteGroupSize(); ++i) {
         if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
             popProcessId = remoteGroupIdOffset();
         const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId);
         if (!queue.empty())
             return true;
     }
     return false; // most likely, no process had anything to pop
 }
 
 Ipc::QueueReader::Balance &
 Ipc::FewToFewBiQueue::localBalance()
 {
     QueueReader &r = reader(theLocalGroup, theLocalProcessId);
     return r.balance;
 }
 
+const Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+{
+    // consult the remote process's reader slot, unlike localBalance()
+    return reader(remoteGroup(), remoteProcessId).balance;
+}
+
 Ipc::QueueReader::Rate &
 Ipc::FewToFewBiQueue::localRateLimit()
 {
     QueueReader &r = reader(theLocalGroup, theLocalProcessId);
     return r.rateLimit;
 }
 
+const Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+{
+    // consult the remote process's reader slot, unlike localRateLimit()
+    return reader(remoteGroup(), remoteProcessId).rateLimit;
+}
+
 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
         theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
         theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
 {
     Must(theGroupASize > 0);
     Must(theGroupBSize > 0);
 }
 
 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
         metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
         queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
         readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
 {
 }
 
 Ipc::FewToFewBiQueue::Owner::~Owner()
 {
     delete metadataOwner;
     delete queuesOwner;
     delete readersOwner;
diff --git src/ipc/Queue.h src/ipc/Queue.h
index 2098610..640342d 100644
--- src/ipc/Queue.h
+++ src/ipc/Queue.h
@@ -191,48 +191,62 @@ public:
 
     /// clears the reader notification received by the local process from the remote process
     void clearReaderSignal(const int remoteProcessId);
 
     /// picks a process and calls OneToOneUniQueue::pop() using its queue
     template <class Value> bool pop(int &remoteProcessId, Value &value);
 
     /// calls OneToOneUniQueue::push() using the given process queue
     template <class Value> bool push(const int remoteProcessId, const Value &value);
 
     // TODO: rename to findOldest() or some such
     /// calls OneToOneUniQueue::peek() using the given process queue
     template<class Value> bool peek(const int remoteProcessId, Value &value) const;
 
     /// returns true if pop() would have probably succeeded but does not pop()
     bool popReady() const;
 
     /// returns local reader's balance
     QueueReader::Balance &localBalance();
 
+    /// returns the balance kept in the given remote process's reader (read-only)
+    const QueueReader::Balance &balance(const int remoteProcessId) const;
+
     /// returns local reader's rate limit
     QueueReader::Rate &localRateLimit();
 
+    /// returns the rate limit kept in the given remote process's reader (read-only)
+    const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+
+    /// number of items in the incoming, them-to-us queue from a given remote process
+    int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+
+    /// number of items in the outgoing, us-to-them queue to a given remote process
+    int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
 private:
     bool validProcessId(const Group group, const int processId) const;
     int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
+    const OneToOneUniQueue &inQueue(const int remoteProcessId) const; ///< them-to-us queue
+    const OneToOneUniQueue &outQueue(const int remoteProcessId) const; ///< us-to-them queue
     QueueReader &reader(const Group group, const int processId);
     const QueueReader &reader(const Group group, const int processId) const;
     int readerIndex(const Group group, const int processId) const;
     int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
     int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
 
 private:
     const Mem::Pointer<Metadata> metadata; ///< shared metadata
     const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
     const Mem::Pointer<QueueReaders> readers; ///< readers array
 
     const Group theLocalGroup; ///< group of this queue
     const int theLocalProcessId; ///< process ID of this queue
     int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
 };
 
 
 // OneToOneUniQueue
 
 template <class Value>
@@ -340,34 +354,36 @@ FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
 
 template <class Value>
 bool
 FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
 {
     OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
     QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
     debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
     return remoteQueue.push(value, &remoteReader);
 }
 
 template <class Value>
 bool
 FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const
 {
     // we may be called before remote process configured its queue end
     if (!validProcessId(remoteGroup(), remoteProcessId))
         return false;
 
     // we need the oldest value, so start with the incoming, them-to-us queue:
-    const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId);
-    debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size());
-    if (inQueue.peek(value))
+    const OneToOneUniQueue &incoming = inQueue(remoteProcessId);
+    debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
+           theLocalProcessId << " at " << incoming.size());
+    if (incoming.peek(value))
         return true;
 
     // if the incoming queue is empty, check the outgoing, us-to-them queue:
-    const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
-    debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size());
-    return outQueue.peek(value);
+    const OneToOneUniQueue &outgoing = outQueue(remoteProcessId);
+    debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
+           remoteProcessId << " at " << outgoing.size());
+    return outgoing.peek(value);
 }
 
 } // namespace Ipc
 
 #endif // SQUID_IPC_QUEUE_H

Reply via email to