diff --git src/DiskIO/IpcIo/IpcIoFile.cc src/DiskIO/IpcIo/IpcIoFile.cc index aec7b40..337fdb9 100644 --- src/DiskIO/IpcIo/IpcIoFile.cc +++ src/DiskIO/IpcIo/IpcIoFile.cc @@ -340,41 +340,41 @@ IpcIoFile::push(IpcIoPendingRequest *const pending) SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len // TODO: grow queue size pending->completeIo(NULL); delete pending; } catch (const TextException &e) { debugs(47, DBG_IMPORTANT, HERE << e.what()); pending->completeIo(NULL); delete pending; } } /// whether we think there is enough time to complete the I/O bool IpcIoFile::canWait() const { if (!config.ioTimeout) return true; // no timeout specified IpcIoMsg oldestIo; - if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0) + if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0) return true; // we cannot estimate expected wait time; assume it is OK const int expectedWait = tvSubMsec(oldestIo.start, current_time); if (expectedWait < 0 || static_cast(expectedWait) < config.ioTimeout) return true; // expected wait time is acceptible debugs(47,2, HERE << "cannot wait: " << expectedWait << " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId)); return false; // do not want to wait that long } /// called when coordinator responds to worker open request void IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) { debugs(47, 7, HERE << "coordinator response to open request"); for (IpcIoFileList::iterator i = WaitingForOpen.begin(); i != WaitingForOpen.end(); ++i) { if (response.strand.tag == (*i)->dbName) { diff --git src/ipc/Queue.h src/ipc/Queue.h index 2098610..72fe3e5 100644 --- src/ipc/Queue.h +++ src/ipc/Queue.h @@ -181,43 +181,43 @@ public: Mem::Owner *const readersOwner; }; static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); enum Group { groupA = 0, groupB = 1 }; FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); Group localGroup() const { return theLocalGroup; } Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } /// clears the reader notification received by the local process from the remote process void clearReaderSignal(const int remoteProcessId); /// picks a process and calls OneToOneUniQueue::pop() using its queue template bool pop(int &remoteProcessId, Value &value); /// calls OneToOneUniQueue::push() using the given process queue template bool push(const int remoteProcessId, const Value &value); - // TODO: rename to findOldest() or some such - /// calls OneToOneUniQueue::peek() using the given process queue - template bool peek(const int remoteProcessId, Value &value) const; + /// finds the oldest item in incoming and outgoing queues between + /// us and the given remote process + template bool findOldest(const int remoteProcessId, Value &value) const; /// returns true if pop() would have probably succeeded but does not pop() bool popReady() const; /// returns local reader's balance QueueReader::Balance &localBalance(); /// returns local reader's rate limit QueueReader::Rate &localRateLimit(); private: bool validProcessId(const Group group, const int processId) const; int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId); QueueReader &reader(const Group group, const int processId); const QueueReader &reader(const Group group, const int processId) const; int readerIndex(const Group group, const int processId) const; int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; } int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; } @@ -333,41 +333,41 @@ FewToFewBiQueue::pop(int &remoteProcessId, Value &value) remoteProcessId = theLastPopProcessId; debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); return true; } } return false; // no process had anything to pop } template bool FewToFewBiQueue::push(const int remoteProcessId, const Value &value) { OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId); QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId); debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); return remoteQueue.push(value, &remoteReader); } template bool -FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const +FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const { // we may be called before remote process configured its queue end if (!validProcessId(remoteGroup(), remoteProcessId)) return false; // we need the oldest value, so start with the incoming, them-to-us queue: const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId); debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size()); if (inQueue.peek(value)) return true; // if the incoming queue is empty, check the outgoing, us-to-them queue: const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId); debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size()); return outQueue.peek(value); } } // namespace Ipc #endif // SQUID_IPC_QUEUE_H