Added a max-swap-rate=swaps/sec option to the Rock cache_dir.

The option limits disk access to smooth out OS disk commit activity and to
avoid blocking Rock diskers (or even other processes!) on I/O. It should be
used when swap demand exceeds disk performance limits but the underlying
file system does not slow down incoming I/Os, allowing the situation to get
out of control.

TODO: Account for the I/O rate limit when estimating whether a future I/O
will complete in time (for swap-timeout).

TODO: Consider allowing the next swap-in (i.e., read) through regardless of
the limit because, unlike writes, reads do not usually accumulate unfinished
I/O requests in OS buffers and, hence, do not eventually force the OS to
block all I/O.
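The delay logic added below (IpcIoFile::WaitBeforePop) implements a
credit/debit pacing scheme. Here is a minimal, self-contained sketch of that
arithmetic for reviewers; the rate value, the simulated 1ms burst spacing,
and all names are illustrative assumptions, not part of the patch:

    // pacing-sketch.cc: standalone model of the WaitBeforePop() balance math.
    // Build: g++ pacing-sketch.cc && ./a.out
    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main()
    {
        const int ioRate = 20;                   // hypothetical max-swap-rate=20
        const double maxRate = ioRate / 1e3;     // requests per millisecond
        const double ioDuration = 1.0 / maxRate; // ideal gap between I/Os (ms)
        // accumulate at most 100ms or 100 I/Os of credit, whichever is smaller
        const int64_t maxImbalance =
            std::min(static_cast<int64_t>(100),
                     static_cast<int64_t>(100 * ioDuration));

        int64_t balance = 0; // ms the reader is "ahead" of the ideal I/O rate
        double gapMs = 1;    // simulated time since the previous pop attempt

        for (int i = 0; i < 8; ++i) {
            // each I/O earns ioDuration of credit and pays for the actual gap
            balance += static_cast<int64_t>(ioDuration - gapMs);
            if (balance > maxImbalance) {
                // too far ahead: sleep off the excess, keeping half of it
                const int64_t toSpend = balance - maxImbalance / 2;
                std::cout << "I/O " << i << ": delay " << toSpend << "ms"
                          << " (balance " << balance << "ms)\n";
                gapMs = toSpend; // the sleep becomes the next observed gap
            } else {
                std::cout << "I/O " << i << ": pop now"
                          << " (balance " << balance << "ms)\n";
                gapMs = 1; // next burst request arrives 1ms later
            }
        }
        return 0;
    }

Running the model shows the disker alternating between popping and sleeping
so that the long-run rate converges on the configured limit, while bounded
imbalance permits short bursts.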
"all " : "just ") << wrote); ipcIo.len = len; } else { ipcIo.xerrno = errno; ipcIo.len = 0; debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " << ipcIo.xerrno); } Ipc::Mem::PutPage(ipcIo.page); } void -IpcIoFile::DiskerHandleMoreRequests(void*) +IpcIoFile::DiskerHandleMoreRequests(void *source) { - debugs(47, 7, HERE << "resuming handling requests"); + debugs(47, 7, HERE << "resuming handling requests after " << + static_cast(source)); DiskerHandleMoreRequestsScheduled = false; IpcIoFile::DiskerHandleRequests(); } +bool +IpcIoFile::WaitBeforePop() +{ + const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit(); + const double maxRate = ioRate/1e3; // req/ms + + // do we need to enforce configured I/O rate? + if (maxRate <= 0) + return false; + + // is there an I/O request we could potentially delay? + if (!queue->popReady()) { + // unlike pop(), popReady() is not reliable and does not block reader + // so we must proceed with pop() even if it is likely to fail + return false; + } + + static timeval LastIo = current_time; + + const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os + // do not accumulate more than 100ms or 100 I/Os, whichever is smaller + const int64_t maxImbalance = min(static_cast(100), static_cast(100 * ioDuration)); + + const double credit = ioDuration; // what the last I/O should have cost us + const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O + LastIo = current_time; + + Ipc::QueueReader::Balance &balance = queue->localBalance(); + balance += static_cast(credit - debit); + + debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit); + + if (balance > maxImbalance) { + // if we accumulated too much time for future slow I/Os, + // then shed accumulated time to keep just half of the excess + const int64_t toSpend = balance - maxImbalance/2; + + if (toSpend/1e3 > Timeout) + debugs(47, DBG_IMPORTANT, "WARNING: Rock disker delays I/O " << + "requests for " << (toSpend/1e3) << " seconds to obey " << + ioRate << "/sec rate limit"); + + debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" << + (1e3*maxRate) << "/sec rate"); + eventAdd("IpcIoFile::DiskerHandleMoreRequests", + &IpcIoFile::DiskerHandleMoreRequests, + const_cast("rate limiting"), + toSpend/1e3, 0, false); + DiskerHandleMoreRequestsScheduled = true; + return true; + } else + if (balance < -maxImbalance) { + // do not owe "too much" to avoid "too large" bursts of I/O + balance = -maxImbalance; + } + + return false; +} + void IpcIoFile::DiskerHandleRequests() { // Balance our desire to maximize the number of concurrent I/O requests // (reordred by OS to minimize seek time) with a requirement to // send 1st-I/O notification messages, process Coordinator events, etc. 
     const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
     const timeval loopStart = current_time;

     int popped = 0;
     int workerId = 0;
     IpcIoMsg ipcIo;
-    while (queue->pop(workerId, ipcIo)) {
+    while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
         ++popped;

         // at least one I/O per call is guaranteed if the queue is not empty
         DiskerHandleRequest(workerId, ipcIo);

         getCurrentTime();
         const double elapsedMsec = tvSubMsec(loopStart, current_time);
         if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
             if (!DiskerHandleMoreRequestsScheduled) {
                 // the gap must be positive for select(2) to be given a chance
                 const double minBreakSecs = 0.001;
                 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
                          &IpcIoFile::DiskerHandleMoreRequests,
-                         NULL, minBreakSecs, 0, false);
+                         const_cast<char*>("long I/O loop"),
+                         minBreakSecs, 0, false);
                 DiskerHandleMoreRequestsScheduled = true;
             }
             debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
                    elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
             break;
         }
     }

     // TODO: consider using O_DIRECT with "elevator" optimization where we pop
     // requests first, then reorder the popped requests to optimize seek time,
     // then do I/O, then take a break, and come back for the next set of I/O
     // requests.
 }

 /// called when disker receives an I/O request
 void
 IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
 {
     if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
         debugs(0,0, HERE << "disker" << KidIdentifier <<

=== modified file 'src/DiskIO/IpcIo/IpcIoFile.h'
--- src/DiskIO/IpcIo/IpcIoFile.h	2011-09-10 01:25:27 +0000
+++ src/DiskIO/IpcIo/IpcIoFile.h	2011-10-03 20:28:12 +0000
@@ -85,40 +85,41 @@
     bool canWait() const;

 private:
     void trackPendingRequest(IpcIoPendingRequest *const pending);
     void push(IpcIoPendingRequest *const pending);
     IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);

     static void Notify(const int peerId);

     static void OpenTimeout(void *const param);
     static void CheckTimeouts(void *const param);
     void checkTimeouts();
     void scheduleTimeoutCheck();

     static void HandleResponses(const char *const when);
     void handleResponse(IpcIoMsg &ipcIo);

     static void DiskerHandleMoreRequests(void*);
     static void DiskerHandleRequests();
     static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
+    static bool WaitBeforePop();

 private:
     const String dbName; ///< the name of the file we are managing
     int diskId; ///< the process ID of the disker we talk to
     RefCount<IORequestor> ioRequestor;

     bool error_; ///< whether we have seen at least one I/O error (XXX)

     unsigned int lastRequestId; ///< last requestId used

     /// maps requestId to the handleResponse callback
     typedef std::map<unsigned int, IpcIoPendingRequest*> RequestMap;
     RequestMap requestMap1; ///< older (or newer) pending requests
     RequestMap requestMap2; ///< newer (or older) pending requests
     RequestMap *olderRequests; ///< older requests (map1 or map2)
     RequestMap *newerRequests; ///< newer requests (map2 or map1)
     bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call

     static const double Timeout; ///< timeout value in seconds

=== modified file 'src/cf.data.pre'
--- src/cf.data.pre	2011-09-22 00:46:26 +0000
+++ src/cf.data.pre	2011-10-03 20:28:12 +0000
@@ -2744,56 +2744,68 @@
 	cache_dir diskd Directory-Name Mbytes L1 L2 [options] [Q1=n] [Q2=n]

 	see argument descriptions under ufs above

 	Q1 specifies the number of unacknowledged I/O requests when Squid
 	stops opening new files. If this many messages are in the queues,
 	Squid won't open new files.
 	Default is 64

 	Q2 specifies the number of unacknowledged messages when Squid
 	starts blocking.  If this many messages are in the queues,
 	Squid blocks until it receives some replies. Default is 72

 	When Q1 < Q2 (the default), the cache directory is optimized
 	for lower response time at the expense of a decrease in hit
 	ratio.  If Q1 > Q2, the cache directory is optimized for
 	higher hit ratio at the expense of an increase in response
 	time.

 	The rock store type:

-	cache_dir rock Directory-Name Mbytes
+	cache_dir rock Directory-Name Mbytes [options]

 	The Rock Store type is a database-style storage. All cached
 	entries are stored in a "database" file, using fixed-size slots,
 	one entry per slot. The database size is specified in MB. The
 	slot size is specified in bytes using the max-size option. See
 	below for more info on the max-size option.

 	swap-timeout=msec: Squid will not start writing a miss to or
 	reading a hit from disk if it estimates that the swap operation
 	will take more than the specified number of milliseconds. By
 	default and when set to zero, disables the disk I/O time limit
 	enforcement. Ignored when using blocking I/O module because
 	blocking synchronous I/O does not allow Squid to estimate the
 	expected swap wait time.

+	max-swap-rate=swaps/sec: Artificially limits disk access using
+	the specified I/O rate limit. Swap in and swap out requests that
+	would cause the average I/O rate to exceed the limit are
+	delayed. This is necessary on file systems that buffer "too
+	many" writes and then start blocking Squid and other processes
+	while committing those writes to disk. Usually used together
+	with swap-timeout to avoid excessive delays and queue overflows
+	when disk demand exceeds available disk "bandwidth". By default
+	and when set to zero, disables the disk I/O rate limit
+	enforcement. Currently supported by IpcIo module only.
+
+
 	The coss store type:

 	NP: COSS filesystem in Squid-3 has been deemed too unstable for
 	    production use and has thus been removed from this release.
 	    We hope that it can be made usable again soon.

 	block-size=n defines the "block size" for COSS cache_dir's.
 	Squid uses file numbers as block numbers.  Since file numbers
 	are limited to 24 bits, the block size determines the maximum
 	size of the COSS partition.  The default is 512 bytes, which
 	leads to a maximum cache_dir size of 512<<24, or 8 GB.  Note
 	you should not change the coss block size after Squid
 	has written some objects to the cache_dir.

 	The coss file store has changed from 2.5. Now it uses a file
 	called 'stripe' in the directory names in the config - and
 	this will be created by squid -z.

 	Common options:
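For illustration, here is a hypothetical squid.conf line combining the Rock
options documented above (the directory, size, and values are made up, not
defaults or recommendations):

	cache_dir rock /var/spool/squid-rock 4096 max-size=32768 swap-timeout=300 max-swap-rate=200

With these settings, the disker paces itself toward 200 disk I/Os per
second, and Squid refuses swaps predicted to wait longer than 300
milliseconds.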
=== modified file 'src/fs/rock/RockSwapDir.cc'
--- src/fs/rock/RockSwapDir.cc	2011-09-24 00:13:48 +0000
+++ src/fs/rock/RockSwapDir.cc	2011-10-03 20:28:12 +0000
@@ -280,40 +280,41 @@
     const int i = GetInteger();
     if (i < 0)
         fatal("negative Rock cache_dir size value");
     const uint64_t new_max_size =
         static_cast<uint64_t>(i) << 20; // MBytes to Bytes
     if (!reconfiguring)
         max_size = new_max_size;
     else if (new_max_size != max_size) {
         debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
                "cannot be changed dynamically, value left unchanged (" <<
                (max_size >> 20) << " MB)");
     }
 }

 ConfigOption *
 Rock::SwapDir::getOptionTree() const
 {
     ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
     assert(vector);
     vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
+    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
     return vector;
 }

 bool
 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
 {
     return strcmp(option, "max-size") != 0 &&
            ::SwapDir::allowOptionReconfigure(option);
 }

 /// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
 bool
 Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfiguring)
 {
     // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
     // including time unit handling. Same for size.

     time_msec_t *storedTime;
     if (strcmp(option, "swap-timeout") == 0)
         storedTime = &fileConfig.ioTimeout;

@@ -332,40 +333,83 @@
     const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

     if (reconfiguring && *storedTime != newTime)
         debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newTime);

     *storedTime = newTime;

     return true;
 }

 /// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
 void
 Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
 {
     if (fileConfig.ioTimeout)
         storeAppendPrintf(e, " swap-timeout=%"PRId64,
                           static_cast<int64_t>(fileConfig.ioTimeout));
 }

+/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
+bool
+Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
+{
+    int *storedRate;
+    if (strcmp(option, "max-swap-rate") == 0)
+        storedRate = &fileConfig.ioRate;
+    else
+        return false;
+
+    if (!value)
+        self_destruct();
+
+    // TODO: handle time units and detect parsing errors better
+    const int64_t parsedValue = strtoll(value, NULL, 10);
+    if (parsedValue < 0) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
+        self_destruct();
+    }
+
+    const int newRate = static_cast<int>(parsedValue);
+
+    if (newRate < 0) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
+        self_destruct();
+    }
+
+    if (isaReconfig && *storedRate != newRate)
+        debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newRate);
+
+    *storedRate = newRate;
+
+    return true;
+}
+
+/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
+void
+Rock::SwapDir::dumpRateOption(StoreEntry * e) const
+{
+    if (fileConfig.ioRate >= 0)
+        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
+}
+
 /// check the results of the configuration; only level-0 debugging works here
 void
 Rock::SwapDir::validateOptions()
 {
     if (max_objsize <= 0)
         fatal("Rock store requires a positive max-size");

 #if THIS_CODE_IS_FIXED_AND_MOVED
     // XXX: should not use map as it is not yet created
     // XXX: max_size is in Bytes now
     // XXX: Use DBG_IMPORTANT (and DBG_CRITICAL if opt_parse_cfg_only?)
     // TODO: Shrink max_size to avoid waste?
     const int64_t mapRoundWasteMx = max_objsize*sizeof(long)*8;
     const int64_t sizeRoundWasteMx = 1024; // max_size stored in KB
     const int64_t roundingWasteMx = max(mapRoundWasteMx, sizeRoundWasteMx);
     const int64_t totalWaste = maxSize() - diskOffsetLimit();
     assert(diskOffsetLimit() <= maxSize());

     // warn if maximum db size is not reachable due to sfileno limit
     if (map->entryLimit() == entryLimitHigh() && totalWaste > roundingWasteMx) {

=== modified file 'src/fs/rock/RockSwapDir.h'
--- src/fs/rock/RockSwapDir.h	2011-09-22 21:42:46 +0000
+++ src/fs/rock/RockSwapDir.h	2011-10-03 20:28:12 +0000
@@ -50,40 +50,42 @@
     virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
     virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
     virtual void maintain();
     virtual void diskFull();

     virtual void reference(StoreEntry &e);
     virtual bool dereference(StoreEntry &e);
     virtual void unlink(StoreEntry &e);
     virtual void statfs(StoreEntry &e) const;

     /* IORequestor API */
     virtual void ioCompletedNotification();
     virtual void closeCompleted();
     virtual void readCompleted(const char *buf, int len, int errflag, RefCount< ::ReadRequest>);
     virtual void writeCompleted(int errflag, size_t len, RefCount< ::WriteRequest>);

     virtual void parse(int index, char *path);
     void parseSize(const bool reconfiguring); ///< parses anonymous cache_dir size option
     void validateOptions(); ///< warns of configuration problems; may quit

     bool parseTimeOption(char const *option, const char *value, int reconfiguring);
     void dumpTimeOption(StoreEntry * e) const;
+    bool parseRateOption(char const *option, const char *value, int reconfiguring);
+    void dumpRateOption(StoreEntry * e) const;

     void rebuild(); ///< starts loading and validating stored entry metadata
     ///< used to add entries successfully loaded during rebuild
     bool addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from);

     bool full() const; ///< no more entries can be stored without purging
     void trackReferences(StoreEntry &e); ///< add to replacement policy scope
     void ignoreReferences(StoreEntry &e); ///< delete from repl policy scope

     int64_t diskOffset(int filen) const;
     int64_t diskOffsetLimit() const;
     int entryLimit() const { return map->entryLimit(); }

     friend class Rebuild;
     const char *filePath; ///< location of cache storage file inside path/

 private:
     DiskIOStrategy *io;
     RefCount<DiskFile> theFile; ///< cache storage for this cache_dir
     DirMap *map;

=== modified file 'src/ipc/AtomicWord.h'
--- src/ipc/AtomicWord.h	2011-09-15 03:44:50 +0000
+++ src/ipc/AtomicWord.h	2011-10-03 20:28:12 +0000
@@ -1,35 +1,37 @@
 /*
  * $Id$
  *
  */

 #ifndef SQUID_IPC_ATOMIC_WORD_H
 #define SQUID_IPC_ATOMIC_WORD_H

 #if HAVE_ATOMIC_OPS

 /// Supplies atomic operations for an integral Value in memory shared by kids.
 /// Used to implement non-blocking shared locks, queues, tables, and pools.
-template <class Value>
+template <class ValueType>
 class AtomicWordT
 {
 public:
+    typedef ValueType Value;
+
     AtomicWordT() {} // leave value unchanged
     AtomicWordT(Value aValue): value(aValue) {} // XXX: unsafe

     Value operator +=(int delta) { return __sync_add_and_fetch(&value, delta); }
     Value operator -=(int delta) { return __sync_sub_and_fetch(&value, delta); }
     Value operator ++() { return *this += 1; }
     Value operator --() { return *this -= 1; }
     Value operator ++(int) { return __sync_fetch_and_add(&value, 1); }
     Value operator --(int) { return __sync_fetch_and_sub(&value, 1); }

     bool swap_if(const int comparand, const int replacement) { return __sync_bool_compare_and_swap(&value, comparand, replacement); }

     /// v1 = value; value &= v2; return v1;
     Value fetchAndAnd(const Value v2) { return __sync_fetch_and_and(&value, v2); }

     // TODO: no need for __sync_bool_compare_and_swap here?
     bool operator ==(int v2) { return __sync_bool_compare_and_swap(&value, v2, value); }

     // TODO: no need for __sync_fetch_and_add here?
     Value get() const { return __sync_fetch_and_add(const_cast<Value *>(&value), 0); }
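The nested Value typedef added above is what lets callers name an atomic
word's underlying integral type, as IpcIoFile::open() does with
Ipc::QueueReader::Rate::Value. A minimal sketch of the pattern, assuming a
GCC-style compiler with __sync builtins; the Word class is a simplified
illustrative stand-in, not the real AtomicWordT:

    // typedef-sketch.cc: why AtomicWordT now exposes "typedef ValueType Value".
    // Build: g++ typedef-sketch.cc && ./a.out
    #include <stdint.h>
    #include <iostream>

    template <class ValueType>
    class Word // simplified stand-in for Ipc::AtomicWordT
    {
    public:
        typedef ValueType Value; // mirrors the typedef added by this patch
        Word(Value v): value(v) {}
        Value operator +=(Value delta) { return __sync_add_and_fetch(&value, delta); }
        Value get() const { return __sync_fetch_and_add(const_cast<Value*>(&value), 0); }
    private:
        Value value;
    };

    int main()
    {
        typedef Word<int64_t> Balance; // signed: a balance may go negative
        Balance balance(0);
        balance += -7;
        // without the nested typedef, code that only sees "Balance" could not
        // portably name the underlying int64_t for casts and conversions:
        const Balance::Value raw = balance.get();
        std::cout << raw << std::endl; // prints -7
        return 0;
    }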

=== modified file 'src/ipc/Queue.cc'
--- src/ipc/Queue.cc	2011-09-06 22:32:30 +0000
+++ src/ipc/Queue.cc	2011-10-03 20:28:12 +0000
@@ -23,41 +23,42 @@
 static String
 QueuesId(String id)
 {
     id.append("__queues");
     return id;
 }

 /// constructs QueueReaders ID from parent queue ID
 static String
 ReadersId(String id)
 {
     id.append("__readers");
     return id;
 }

 /* QueueReader */

 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");

-Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
+    rateLimit(0), balance(0)
 {
     debugs(54, 7, HERE << "constructed " << id);
 }

 /* QueueReaders */

 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
 {
     Must(theCapacity > 0);
     new (theReaders) QueueReader[theCapacity];
 }

 size_t
 Ipc::QueueReaders::sharedMemorySize() const
 {
     return SharedMemorySize(theCapacity);
 }

 size_t
 Ipc::QueueReaders::SharedMemorySize(const int capacity)

@@ -179,65 +180,105 @@
         index1 = toProcessId - metadata->theGroupAIdOffset;
         index2 = fromProcessId - metadata->theGroupBIdOffset;
         offset = metadata->theGroupASize * metadata->theGroupBSize;
     }
     const int index = offset + index1 * metadata->theGroupBSize + index2;
     return index;
 }

 Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
 {
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }

 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }

+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
+{
+    Must(validProcessId(group, processId));
+    return group == groupA ?
+           processId - metadata->theGroupAIdOffset :
+           metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
+}
+
 Ipc::QueueReader &
 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
 {
-    Must(validProcessId(group, processId));
-    const int index = group == groupA ?
-                      processId - metadata->theGroupAIdOffset :
-                      metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
-    return readers->theReaders[index];
+    return readers->theReaders[readerIndex(group, processId)];
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+{
+    return readers->theReaders[readerIndex(group, processId)];
 }

 void
 Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
 {
     QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
     debugs(54, 7, HERE << "reader: " << localReader.id);

     Must(validProcessId(remoteGroup(), remoteProcessId));
     localReader.clearSignal();

     // we got a hint; we could reposition iteration to try popping from the
     // remoteProcessId queue first; but it does not seem to help much and might
     // introduce some bias so we do not do that for now:
     // theLastPopProcessId = remoteProcessId;
 }

+bool
+Ipc::FewToFewBiQueue::popReady() const
+{
+    // mimic FewToFewBiQueue::pop() but quit just before popping
+    int popProcessId = theLastPopProcessId; // preserve for future pop()
+    for (int i = 0; i < remoteGroupSize(); ++i) {
+        if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
+            popProcessId = remoteGroupIdOffset();
+        const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId);
+        if (!queue.empty())
+            return true;
+    }
+    return false; // most likely, no process had anything to pop
+}
+
+Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::localBalance()
+{
+    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+    return r.balance;
+}
+
+Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::localRateLimit()
+{
+    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+    return r.rateLimit;
+}
+
 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
     theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
     theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
 {
     Must(theGroupASize > 0);
     Must(theGroupBSize > 0);
 }

 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
     metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
     queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
     readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
 {
 }

 Ipc::FewToFewBiQueue::Owner::~Owner()
 {
     delete metadataOwner;
     delete queuesOwner;
     delete readersOwner;

=== modified file 'src/ipc/Queue.h'
--- src/ipc/Queue.h	2011-09-06 22:32:30 +0000
+++ src/ipc/Queue.h	2011-10-03 20:28:12 +0000
@@ -29,40 +29,49 @@
     bool blocked() const { return popBlocked == 1; }

     /// marks the reader as blocked, waiting for a notification signal
     void block() { popBlocked.swap_if(0, 1); }

     /// removes the block() effects
     void unblock() { popBlocked.swap_if(1, 0); }

     /// if reader is blocked and not notified, marks the notification signal
     /// as sent and not received, returning true; otherwise, returns false
     bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }

     /// marks sent reader notification as received (also removes pop blocking)
     void clearSignal() { unblock(); popSignal.swap_if(1,0); }

 private:
     AtomicWord popBlocked; ///< whether the reader is blocked on pop()
     AtomicWord popSignal; ///< whether writer has sent and reader has not received notification

 public:
+    typedef AtomicWord Rate; ///< pop()s per second
+    Rate rateLimit; ///< pop()s per second limit if positive
+
+    // we need a signed atomic type because balance may get negative
+    typedef AtomicWordT<int64_t> AtomicSignedMsec;
+    typedef AtomicSignedMsec Balance;
+    /// how far ahead the reader is compared to a perfect read/sec event rate
+    Balance balance;
+
     /// unique ID for debugging which reader is used (works across processes)
     const InstanceId<QueueReader> id;
 };

 /// shared array of QueueReaders
 class QueueReaders
 {
 public:
     QueueReaders(const int aCapacity);
     size_t sharedMemorySize() const;
     static size_t SharedMemorySize(const int capacity);

     const int theCapacity; /// number of readers
     QueueReader theReaders[]; /// readers
 };

 /**
  * Lockless fixed-capacity queue for a single writer and a single reader.
  *
  * If the queue is empty, the reader is considered "blocked" and needs

@@ -172,49 +181,61 @@
         Mem::Owner<QueueReaders> *const readersOwner;
     };

     static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);

     enum Group { groupA = 0, groupB = 1 };
     FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);

     Group localGroup() const { return theLocalGroup; }
     Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }

     /// clears the reader notification received by the local process from the remote process
     void clearReaderSignal(const int remoteProcessId);

     /// picks a process and calls OneToOneUniQueue::pop() using its queue
     template <class Value> bool pop(int &remoteProcessId, Value &value);

     /// calls OneToOneUniQueue::push() using the given process queue
     template <class Value> bool push(const int remoteProcessId, const Value &value);

+    // TODO: rename to findOldest() or some such
     /// calls OneToOneUniQueue::peek() using the given process queue
     template <class Value> bool peek(const int remoteProcessId, Value &value) const;

+    /// returns true if pop() would have probably succeeded but does not pop()
+    bool popReady() const;
+
+    /// returns local reader's balance
+    QueueReader::Balance &localBalance();
+
+    /// returns local reader's rate limit
+    QueueReader::Rate &localRateLimit();
+
 private:
     bool validProcessId(const Group group, const int processId) const;
     int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
     QueueReader &reader(const Group group, const int processId);
+    const QueueReader &reader(const Group group, const int processId) const;
+    int readerIndex(const Group group, const int processId) const;
     int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
     int remoteGroupIdOffset() const { return theLocalGroup == groupA ?
                                          metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }

 private:
     const Mem::Pointer<Metadata> metadata; ///< shared metadata
     const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
     const Mem::Pointer<QueueReaders> readers; ///< readers array

     const Group theLocalGroup; ///< group of this queue
     const int theLocalProcessId; ///< process ID of this queue

     int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
 };

 // OneToOneUniQueue

 template <class Value>
 bool
 OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
 {