diff --git src/DiskIO/IpcIo/IpcIoFile.cc src/DiskIO/IpcIo/IpcIoFile.cc index aec7b40..f70af46 100644 --- src/DiskIO/IpcIo/IpcIoFile.cc +++ src/DiskIO/IpcIo/IpcIoFile.cc @@ -343,41 +343,55 @@ IpcIoFile::push(IpcIoPendingRequest *const pending) pending->completeIo(NULL); delete pending; } catch (const TextException &e) { debugs(47, DBG_IMPORTANT, HERE << e.what()); pending->completeIo(NULL); delete pending; } } /// whether we think there is enough time to complete the I/O bool IpcIoFile::canWait() const { if (!config.ioTimeout) return true; // no timeout specified IpcIoMsg oldestIo; if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0) return true; // we cannot estimate expected wait time; assume it is OK - const int expectedWait = tvSubMsec(oldestIo.start, current_time); + const int oldestWait = tvSubMsec(oldestIo.start, current_time); + + int rateWait = -1; // time in millisecons + const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId); + if (ioRate > 0) { + // if there are N requests pending, the new one will wait at + // least N/max-swap-rate seconds + rateWait = 1e3 * queue->outSize(diskId) / ioRate; + // adjust N/max-swap-rate value based on the queue "balance" + // member, in case we have been borrowing time against future + // I/O already + rateWait += queue->balance(diskId); + } + + const int expectedWait = max(oldestWait, rateWait); if (expectedWait < 0 || static_cast(expectedWait) < config.ioTimeout) return true; // expected wait time is acceptible debugs(47,2, HERE << "cannot wait: " << expectedWait << " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId)); return false; // do not want to wait that long } /// called when coordinator responds to worker open request void IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) { debugs(47, 7, HERE << "coordinator response to open request"); for (IpcIoFileList::iterator i = WaitingForOpen.begin(); i != WaitingForOpen.end(); ++i) { if (response.strand.tag == (*i)->dbName) { (*i)->openCompleted(&response); WaitingForOpen.erase(i); return; diff --git src/ipc/Queue.cc src/ipc/Queue.cc index 24e6706..76e266b 100644 --- src/ipc/Queue.cc +++ src/ipc/Queue.cc @@ -180,40 +180,56 @@ Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromPr index1 = toProcessId - metadata->theGroupAIdOffset; index2 = fromProcessId - metadata->theGroupBIdOffset; offset = metadata->theGroupASize * metadata->theGroupBSize; } const int index = offset + index1 * metadata->theGroupBSize + index2; return index; } Ipc::OneToOneUniQueue & Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) { return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; } const Ipc::OneToOneUniQueue & Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const { return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; } +/// incoming queue from a given remote process +const Ipc::OneToOneUniQueue & +Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const +{ + return oneToOneQueue(remoteGroup(), remoteProcessId, + theLocalGroup, theLocalProcessId); +} + +/// outgoing queue to a given remote process +const Ipc::OneToOneUniQueue & +Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const +{ + return oneToOneQueue(theLocalGroup, theLocalProcessId, + remoteGroup(), remoteProcessId); +} + int Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const { Must(validProcessId(group, processId)); return group == groupA ? processId - metadata->theGroupAIdOffset : metadata->theGroupASize + processId - metadata->theGroupBIdOffset; } Ipc::QueueReader & Ipc::FewToFewBiQueue::reader(const Group group, const int processId) { return readers->theReaders[readerIndex(group, processId)]; } const Ipc::QueueReader & Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const { return readers->theReaders[readerIndex(group, processId)]; } @@ -238,47 +254,61 @@ Ipc::FewToFewBiQueue::popReady() const { // mimic FewToFewBiQueue::pop() but quit just before popping int popProcessId = theLastPopProcessId; // preserve for future pop() for (int i = 0; i < remoteGroupSize(); ++i) { if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize()) popProcessId = remoteGroupIdOffset(); const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId); if (!queue.empty()) return true; } return false; // most likely, no process had anything to pop } Ipc::QueueReader::Balance & Ipc::FewToFewBiQueue::localBalance() { QueueReader &r = reader(theLocalGroup, theLocalProcessId); return r.balance; } +const Ipc::QueueReader::Balance & +Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const +{ + const QueueReader &r = reader(remoteGroup(), remoteProcessId); + return r.balance; +} + Ipc::QueueReader::Rate & Ipc::FewToFewBiQueue::localRateLimit() { QueueReader &r = reader(theLocalGroup, theLocalProcessId); return r.rateLimit; } +const Ipc::QueueReader::Rate & +Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const +{ + const QueueReader &r = reader(remoteGroup(), remoteProcessId); + return r.rateLimit; +} + Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) { Must(theGroupASize > 0); Must(theGroupBSize > 0); } Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity): metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)), queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)), readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize)) { } Ipc::FewToFewBiQueue::Owner::~Owner() { delete metadataOwner; delete queuesOwner; delete readersOwner; diff --git src/ipc/Queue.h src/ipc/Queue.h index 2098610..640342d 100644 --- src/ipc/Queue.h +++ src/ipc/Queue.h @@ -191,48 +191,62 @@ public: /// clears the reader notification received by the local process from the remote process void clearReaderSignal(const int remoteProcessId); /// picks a process and calls OneToOneUniQueue::pop() using its queue template bool pop(int &remoteProcessId, Value &value); /// calls OneToOneUniQueue::push() using the given process queue template bool push(const int remoteProcessId, const Value &value); // TODO: rename to findOldest() or some such /// calls OneToOneUniQueue::peek() using the given process queue template bool peek(const int remoteProcessId, Value &value) const; /// returns true if pop() would have probably succeeded but does not pop() bool popReady() const; /// returns local reader's balance QueueReader::Balance &localBalance(); + /// returns reader's balance for a given remote process + const QueueReader::Balance &balance(const int remoteProcessId) const; + /// returns local reader's rate limit QueueReader::Rate &localRateLimit(); + /// returns reader's rate limit for a given remote process + const QueueReader::Rate &rateLimit(const int remoteProcessId) const; + + /// number of items in incoming queue from a given remote process + int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); } + + /// number of items in outgoing queue to a given remote process + int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); } + private: bool validProcessId(const Group group, const int processId) const; int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId); + const OneToOneUniQueue &inQueue(const int remoteProcessId) const; + const OneToOneUniQueue &outQueue(const int remoteProcessId) const; QueueReader &reader(const Group group, const int processId); const QueueReader &reader(const Group group, const int processId) const; int readerIndex(const Group group, const int processId) const; int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; } int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; } private: const Mem::Pointer metadata; ///< shared metadata const Mem::Pointer queues; ///< unidirection one-to-one queues const Mem::Pointer readers; ///< readers array const Group theLocalGroup; ///< group of this queue const int theLocalProcessId; ///< process ID of this queue int theLastPopProcessId; ///< the ID of the last process we tried to pop() from }; // OneToOneUniQueue template @@ -340,34 +354,36 @@ FewToFewBiQueue::pop(int &remoteProcessId, Value &value) template bool FewToFewBiQueue::push(const int remoteProcessId, const Value &value) { OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId); QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId); debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); return remoteQueue.push(value, &remoteReader); } template bool FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const { // we may be called before remote process configured its queue end if (!validProcessId(remoteGroup(), remoteProcessId)) return false; // we need the oldest value, so start with the incoming, them-to-us queue: - const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId); - debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size()); - if (inQueue.peek(value)) + const OneToOneUniQueue &in = inQueue(remoteProcessId); + debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << + theLocalProcessId << " at " << in.size()); + if (in.peek(value)) return true; // if the incoming queue is empty, check the outgoing, us-to-them queue: - const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId); - debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size()); - return outQueue.peek(value); + const OneToOneUniQueue &out = outQueue(remoteProcessId); + debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << + remoteProcessId << " at " << out.size()); + return out.peek(value); } } // namespace Ipc #endif // SQUID_IPC_QUEUE_H