SMP Caching, part 3: IPC and shared memory primitives. To make SMP disk and memory caching non-blocking and correct, worker and disker processes must asynchronously communicate with each other. We are adding a collection of classes that support such communication. At the base of the collection is the AtomicWordT template that uses GCC atomic primitives such as __sync_add_and_fetch() to perform atomic operations on integral values in memory shared by multiple Squid kids. AtomicWordT is used to implement non-blocking shared locks, queues, store tables, and page pools. To avoid blocking or very long searches, many operations are "optimistic" in nature. For example, it is possible that an atomic store map will refuse to allocate an entry for two processes even though a blocking implementation would have allowed one of the processes to get the map slot. We speculate that such conflict resolution is better than blocking locks when it comes to caching, especially if the conflicts are rare due to the large number of cache entries, fast operations, and the relatively small number of kids. TODO: Add AtomicWordT description (see above), portability. Eventually, consider breaking locks left by dead kids. 
=== added file 'src/ipc/AtomicWord.h' --- src/ipc/AtomicWord.h 1970-01-01 00:00:00 +0000 +++ src/ipc/AtomicWord.h 2011-04-18 10:37:52 +0000 @@ -0,0 +1,39 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_ATOMIC_WORD_H +#define SQUID_IPC_ATOMIC_WORD_H + +template +class AtomicWordT { +public: + AtomicWordT() {} // leave value unchanged + AtomicWordT(Value aValue): value(aValue) {} // XXX: unsafe + + Value operator +=(int delta) { return __sync_add_and_fetch(&value, delta); } + Value operator ++() { return *this += 1; } + Value operator --() { return *this += -1; } + Value operator ++(int) { return __sync_fetch_and_add(&value, 1); } + Value operator --(int) { return __sync_fetch_and_add(&value, -1); } + + bool swap_if(const int comparand, const int replacement) { return __sync_bool_compare_and_swap(&value, comparand, replacement); } + + /// v1 = value; value &= v2; return v1; + Value fetchAndAnd(const Value v2) { return __sync_fetch_and_and(&value, v2); } + + // TODO: no need for __sync_bool_compare_and_swap here? + bool operator ==(int v2) { return __sync_bool_compare_and_swap(&value, v2, value); } + + // TODO: no need for __sync_fetch_and_add here? 
+ Value get() const { return __sync_fetch_and_add(const_cast(&value), 0); } + operator Value () const { return get(); } + +private: + Value value; +}; + +typedef AtomicWordT AtomicWord; + +#endif // SQUID_IPC_ATOMIC_WORD_H === added file 'src/ipc/Queue.cc' --- src/ipc/Queue.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/Queue.cc 2011-04-26 20:39:59 +0000 @@ -0,0 +1,232 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "Debug.h" +#include "globals.h" +#include "ipc/Queue.h" + +/// constructs Metadata ID from parent queue ID +static String +MetadataId(String id) +{ + id.append("__metadata"); + return id; +} + +/// constructs one-to-one queues ID from parent queue ID +static String +QueuesId(String id) +{ + id.append("__queues"); + return id; +} + +/// constructs QueueReaders ID from parent queue ID +static String +ReadersId(String id) +{ + id.append("__readers"); + return id; +} + + +/* QueueReader */ + +InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); + +Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0) +{ + debugs(54, 7, HERE << "constructed " << id); +} + +/* QueueReaders */ + +Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity) +{ + Must(theCapacity > 0); + new (theReaders) QueueReader[theCapacity]; +} + +size_t +Ipc::QueueReaders::sharedMemorySize() const +{ + return SharedMemorySize(theCapacity); +} + +size_t +Ipc::QueueReaders::SharedMemorySize(const int capacity) +{ + return sizeof(QueueReaders) + sizeof(QueueReader) * capacity; +} + + +// OneToOneUniQueue + +Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity): + theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), + theCapacity(aCapacity) +{ + Must(theMaxItemSize > 0); + Must(theCapacity > 0); +} + +int +Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) +{ + assert(maxItemSize > 0); + size -= 
sizeof(OneToOneUniQueue); + return size >= 0 ? size / maxItemSize : 0; +} + +int +Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) +{ + assert(size >= 0); + return sizeof(OneToOneUniQueue) + maxItemSize * size; +} + + +/* OneToOneUniQueues */ + +Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity) +{ + Must(theCapacity > 0); + for (int i = 0; i < theCapacity; ++i) + new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity); +} + +size_t +Ipc::OneToOneUniQueues::sharedMemorySize() const +{ + return sizeof(*this) + theCapacity * front().sharedMemorySize(); +} + +size_t +Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity) +{ + const int queueSize = + OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity); + return sizeof(OneToOneUniQueues) + queueSize * capacity; +} + +const Ipc::OneToOneUniQueue & +Ipc::OneToOneUniQueues::operator [](const int index) const +{ + Must(0 <= index && index < theCapacity); + const size_t queueSize = index ? 
front().sharedMemorySize() : 0; + const char *const queue = + reinterpret_cast(this) + sizeof(*this) + index * queueSize; + return *reinterpret_cast(queue); +} + + +// FewToFewBiQueue + +Ipc::FewToFewBiQueue::Owner * +Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity) +{ + return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity); +} + +Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId): + metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), + queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), + readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())), + theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId), + theLastPopProcessId(readers->theCapacity) +{ + Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2); + Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize); + + const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); + debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id); +} + +bool +Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const +{ + switch (group) { + case groupA: + return metadata->theGroupAIdOffset <= processId && + processId < metadata->theGroupAIdOffset + metadata->theGroupASize; + case groupB: + return metadata->theGroupBIdOffset <= processId && + processId < metadata->theGroupBIdOffset + metadata->theGroupBSize; + } + return false; +} + +Ipc::OneToOneUniQueue & +Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) +{ + Must(fromGroup != toGroup); + Must(validProcessId(fromGroup, fromProcessId)); + Must(validProcessId(toGroup, toProcessId)); + int index1; + int index2; + int offset; + if 
(fromGroup == groupA) { + index1 = fromProcessId - metadata->theGroupAIdOffset; + index2 = toProcessId - metadata->theGroupBIdOffset; + offset = 0; + } else { + index1 = toProcessId - metadata->theGroupAIdOffset; + index2 = fromProcessId - metadata->theGroupBIdOffset; + offset = metadata->theGroupASize * metadata->theGroupBSize; + } + const int index = offset + index1 * metadata->theGroupBSize + index2; + return (*queues)[index]; +} + +Ipc::QueueReader & +Ipc::FewToFewBiQueue::reader(const Group group, const int processId) +{ + Must(validProcessId(group, processId)); + const int index = group == groupA ? + processId - metadata->theGroupAIdOffset : + metadata->theGroupASize + processId - metadata->theGroupBIdOffset; + return readers->theReaders[index]; +} + +void +Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId) +{ + QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); + debugs(54, 7, HERE << "reader: " << localReader.id); + + Must(validProcessId(remoteGroup(), remoteProcessId)); + localReader.clearSignal(); + + // we got a hint; we could reposition iteration to try popping from the + // remoteProcessId queue first; but it does not seem to help much and might + // introduce some bias so we do not do that for now: + // theLastPopProcessId = remoteProcessId; +} + +Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): + theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), + theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) +{ + Must(theGroupASize > 0); + Must(theGroupBSize > 0); +} + +Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity): + metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)), + 
queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)), + readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize)) +{ +} + +Ipc::FewToFewBiQueue::Owner::~Owner() +{ + delete metadataOwner; + delete queuesOwner; + delete readersOwner; +} === added file 'src/ipc/Queue.h' --- src/ipc/Queue.h 1970-01-01 00:00:00 +0000 +++ src/ipc/Queue.h 2011-04-26 20:39:59 +0000 @@ -0,0 +1,300 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_QUEUE_H +#define SQUID_IPC_QUEUE_H + +#include "Array.h" +#include "base/InstanceId.h" +#include "ipc/AtomicWord.h" +#include "ipc/mem/Pointer.h" +#include "util.h" + +class String; + +namespace Ipc { + +/// State of the reading end of a queue (i.e., of the code calling pop()). +/// Multiple queues attached to one reader share this state. +class QueueReader { +public: + QueueReader(); // the initial state is "blocked without a signal" + + /// whether the reader is waiting for a notification signal + bool blocked() const { return popBlocked == 1; } + + /// marks the reader as blocked, waiting for a notification signal + void block() { popBlocked.swap_if(0, 1); } + + /// removes the block() effects + void unblock() { popBlocked.swap_if(1, 0); } + + /// if reader is blocked and not notified, marks the notification signal + /// as sent and not received, returning true; otherwise, returns false + bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); } + + /// marks sent reader notification as received (also removes pop blocking) + void clearSignal() { unblock(); popSignal.swap_if(1,0); } + +private: + AtomicWord popBlocked; ///< whether the reader is blocked on pop() + AtomicWord popSignal; ///< whether writer has sent and reader has not received notification + +public: + /// unique ID for debugging which reader is used (works across processes) + const InstanceId id; +}; + +/// shared array of QueueReaders +class QueueReaders { +public: + QueueReaders(const 
int aCapacity); + size_t sharedMemorySize() const; + static size_t SharedMemorySize(const int capacity); + + const int theCapacity; /// number of readers + QueueReader theReaders[]; /// readers +}; + +/** + * Lockless fixed-capacity queue for a single writer and a single reader. + * + * If the queue is empty, the reader is considered "blocked" and needs + * an out-of-band notification message to notice the next pushed item. + * + * Current implementation assumes that the writer cannot get blocked: if the + * queue is full, the writer will just not push and come back later (with a + * different value). We can add support for blocked writers if needed. + */ +class OneToOneUniQueue { +public: + // pop() and push() exceptions; TODO: use TextException instead + class Full {}; + class ItemTooLarge {}; + + OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity); + + unsigned int maxItemSize() const { return theMaxItemSize; } + int size() const { return theSize; } + int capacity() const { return theCapacity; } + int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); } + + bool empty() const { return !theSize; } + bool full() const { return theSize == theCapacity; } + + static int Bytes2Items(const unsigned int maxItemSize, int size); + static int Items2Bytes(const unsigned int maxItemSize, const int size); + + /// returns true iff the value was set; [un]blocks the reader as needed + template bool pop(Value &value, QueueReader *const reader = NULL); + + /// returns true iff the caller must notify the reader of the pushed item + template bool push(const Value &value, QueueReader *const reader = NULL); + +private: + + unsigned int theIn; ///< input index, used only in push() + unsigned int theOut; ///< output index, used only in pop() + + AtomicWord theSize; ///< number of items in the queue + const unsigned int theMaxItemSize; ///< maximum item size + const int theCapacity; ///< maximum number of items, i.e. 
theBuffer size + + char theBuffer[]; +}; + +/// shared array of OneToOneUniQueues +class OneToOneUniQueues { +public: + OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity); + + size_t sharedMemorySize() const; + static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity); + + const OneToOneUniQueue &operator [](const int index) const; + inline OneToOneUniQueue &operator [](const int index); + +private: + inline const OneToOneUniQueue &front() const; + +public: + const int theCapacity; /// number of OneToOneUniQueues +}; + +/** + * Lockless fixed-capacity bidirectional queue for a limited number + * processes. Allows communication between two groups of processes: + * any process in one group may send data to and receive from any + * process in another group, but processes in the same group can not + * communicate. Process in each group has a unique integer ID in + * [groupIdOffset, groupIdOffset + groupSize) range. 
+ */ +class FewToFewBiQueue { +public: + typedef OneToOneUniQueue::Full Full; + typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; + +private: + /// Shared metadata for FewToFewBiQueue + struct Metadata { + Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset); + size_t sharedMemorySize() const { return sizeof(*this); } + static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); } + + const int theGroupASize; + const int theGroupAIdOffset; + const int theGroupBSize; + const int theGroupBIdOffset; + }; + +public: + class Owner { + public: + Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); + ~Owner(); + + private: + Mem::Owner *const metadataOwner; + Mem::Owner *const queuesOwner; + Mem::Owner *const readersOwner; + }; + + static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); + + enum Group { groupA = 0, groupB = 1 }; + FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); + + Group localGroup() const { return theLocalGroup; } + Group remoteGroup() const { return theLocalGroup == groupA ? 
groupB : groupA; } + + /// clears the reader notification received by the local process from the remote process + void clearReaderSignal(const int remoteProcessId); + + /// picks a process and calls OneToOneUniQueue::pop() using its queue + template bool pop(int &remoteProcessId, Value &value); + + /// calls OneToOneUniQueue::push() using the given process queue + template bool push(const int remoteProcessId, const Value &value); + +private: + bool validProcessId(const Group group, const int processId) const; + OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId); + QueueReader &reader(const Group group, const int processId); + int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; } + int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; } + +private: + const Mem::Pointer metadata; ///< shared metadata + const Mem::Pointer queues; ///< unidirection one-to-one queues + const Mem::Pointer readers; ///< readers array + + const Group theLocalGroup; ///< group of this queue + const int theLocalProcessId; ///< process ID of this queue + int theLastPopProcessId; ///< the ID of the last process we tried to pop() from +}; + + +// OneToOneUniQueue + +template +bool +OneToOneUniQueue::pop(Value &value, QueueReader *const reader) +{ + if (sizeof(value) > theMaxItemSize) + throw ItemTooLarge(); + + // A writer might push between the empty test and block() below, so we do + // not return false right after calling block(), but test again. + if (empty()) { + if (!reader) + return false; + + reader->block(); + // A writer might push between the empty test and block() below, + // so we must test again as such a writer will not signal us. 
+ if (empty()) + return false; + } + + if (reader) + reader->unblock(); + + const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize; + memcpy(&value, theBuffer + pos, sizeof(value)); + --theSize; + + return true; +} + +template +bool +OneToOneUniQueue::push(const Value &value, QueueReader *const reader) +{ + if (sizeof(value) > theMaxItemSize) + throw ItemTooLarge(); + + if (full()) + throw Full(); + + const bool wasEmpty = empty(); + const unsigned int pos = theIn++ % theCapacity * theMaxItemSize; + memcpy(theBuffer + pos, &value, sizeof(value)); + ++theSize; + + return wasEmpty && (!reader || reader->raiseSignal()); +} + + +// OneToOneUniQueues + +inline OneToOneUniQueue & +OneToOneUniQueues::operator [](const int index) +{ + return const_cast((*const_cast(this))[index]); +} + +inline const OneToOneUniQueue & +OneToOneUniQueues::front() const +{ + const char *const queue = + reinterpret_cast(this) + sizeof(*this); + return *reinterpret_cast(queue); +} + + +// FewToFewBiQueue + +template +bool +FewToFewBiQueue::pop(int &remoteProcessId, Value &value) +{ + // iterate all remote group processes, starting after the one we visited last + QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); + for (int i = 0; i < remoteGroupSize(); ++i) { + if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize()) + theLastPopProcessId = remoteGroupIdOffset(); + OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId); + if (queue.pop(value, &localReader)) { + remoteProcessId = theLastPopProcessId; + debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); + return true; + } + } + return false; // no process had anything to pop +} + +template +bool +FewToFewBiQueue::push(const int remoteProcessId, const Value &value) +{ + OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId); + 
QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId); + debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); + return remoteQueue.push(value, &remoteReader); +} + +} // namespace Ipc + +#endif // SQUID_IPC_QUEUE_H === added file 'src/ipc/ReadWriteLock.cc' --- src/ipc/ReadWriteLock.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/ReadWriteLock.cc 2011-04-09 04:24:06 +0000 @@ -0,0 +1,97 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + */ + +#include "squid.h" + +#include "Store.h" +#include "ipc/ReadWriteLock.h" + +bool +Ipc::ReadWriteLock::lockShared() +{ + ++readers; // this locks "new" writers out + if (!writers) // there are no old writers + return true; + --readers; + return false; +} + +bool +Ipc::ReadWriteLock::lockExclusive() +{ + if (!writers++) { // we are the first writer + this locks "new" readers out + if (!readers) // there are no old readers + return true; + } + --writers; + return false; +} + +void +Ipc::ReadWriteLock::unlockShared() +{ + assert(readers-- > 0); +} + +void +Ipc::ReadWriteLock::unlockExclusive() +{ + assert(writers-- > 0); +} + +void +Ipc::ReadWriteLock::switchExclusiveToShared() +{ + ++readers; // must be done before we release exclusive control + unlockExclusive(); +} + +void +Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const +{ + if (readers) { + ++stats.readable; + stats.readers += readers; + } else if (writers) { + ++stats.writeable; + stats.writers += writers; + } else { + ++stats.idle; + } + ++stats.count; +} + + +/* Ipc::ReadWriteLockStats */ + +Ipc::ReadWriteLockStats::ReadWriteLockStats() +{ + memset(this, 0, sizeof(*this)); +} + +void +Ipc::ReadWriteLockStats::dump(StoreEntry &e) const +{ + storeAppendPrintf(&e, "Available locks: %9d\n", count); + + if (!count) + return; + + storeAppendPrintf(&e, "Reading: %9d %6.2f%%\n", + readable, (100.0 * readable / count)); + storeAppendPrintf(&e, "Writing: %9d %6.2f%%\n", + 
writeable, (100.0 * writeable / count)); + storeAppendPrintf(&e, "Idle: %9d %6.2f%%\n", + idle, (100.0 * idle / count)); + + if (readers || writers) { + const int locked = readers + writers; + storeAppendPrintf(&e, "Readers: %9d %6.2f%%\n", + readers, (100.0 * readers / locked)); + storeAppendPrintf(&e, "Writers: %9d %6.2f%%\n", + writers, (100.0 * writers / locked)); + } +} === added file 'src/ipc/ReadWriteLock.h' --- src/ipc/ReadWriteLock.h 1970-01-01 00:00:00 +0000 +++ src/ipc/ReadWriteLock.h 2011-04-09 04:24:06 +0000 @@ -0,0 +1,49 @@ +#ifndef SQUID_IPC_READ_WRITE_LOCK_H +#define SQUID_IPC_READ_WRITE_LOCK_H + +#include "ipc/AtomicWord.h" + +class StoreEntry; + +namespace Ipc { + +class ReadWriteLockStats; + +/// an atomic readers-writer or shared-exclusive lock suitable for maps/tables +class ReadWriteLock { +public: + // default constructor is OK because of shared memory zero-initialization + + bool lockShared(); ///< lock for reading or return false + bool lockExclusive(); ///< lock for modification or return false + void unlockShared(); ///< undo successful sharedLock() + void unlockExclusive(); ///< undo successful exclusiveLock() + void switchExclusiveToShared(); ///< stop writing, start reading + + /// adds approximate current stats to the supplied ones + void updateStats(ReadWriteLockStats &stats) const; + +public: + mutable AtomicWord readers; ///< number of users trying to read + AtomicWord writers; ///< number of writers trying to modify protected data +}; + + +/// approximate stats of a set of ReadWriteLocks +class ReadWriteLockStats { +public: + ReadWriteLockStats(); + + void dump(StoreEntry &e) const; + + int count; ///< the total number of locks + int readable; ///< number of locks locked for reading + int writeable; ///< number of locks locked for writing + int idle; ///< number of unlocked locks + int readers; ///< sum of lock.readers + int writers; ///< sum of lock.writers +}; + +} // namespace Ipc + +#endif /* SQUID_IPC_READ_WRITE_LOCK_H */ === 
added file 'src/ipc/StoreMap.cc' --- src/ipc/StoreMap.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/StoreMap.cc 2011-04-25 15:14:10 +0000 @@ -0,0 +1,325 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + */ + +#include "squid.h" + +#include "Store.h" +#include "ipc/StoreMap.h" + +Ipc::StoreMap::Owner * +Ipc::StoreMap::Init(const char *const path, const int limit, const size_t extrasSize) +{ + assert(limit > 0); // we should not be created otherwise + assert(extrasSize >= 0); + Owner *const owner = shm_new(Shared)(path, limit, extrasSize); + debugs(54, 5, HERE << "new map [" << path << "] created: " << limit); + return owner; +} + +Ipc::StoreMap::Owner * +Ipc::StoreMap::Init(const char *const path, const int limit) +{ + return Init(path, limit, 0); +} + +Ipc::StoreMap::StoreMap(const char *const aPath): cleaner(NULL), path(aPath), + shared(shm_old(Shared)(aPath)) +{ + assert(shared->limit > 0); // we should not be created otherwise + debugs(54, 5, HERE << "attached map [" << path << "] created: " << + shared->limit); +} + +Ipc::StoreMap::Slot * +Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno) +{ + debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key) + << " for writing in map [" << path << ']'); + const int idx = slotIndexByKey(key); + + Slot &s = shared->slots[idx]; + ReadWriteLock &lock = s.lock; + + if (lock.lockExclusive()) { + assert(s.state != Slot::Writeable); // until we start breaking locks + + // free if the entry was used, keeping the entry locked + if (s.waitingToBeFreed == true || s.state == Slot::Readable) + freeLocked(s, true); + + assert(s.state == Slot::Empty); + ++shared->count; + s.state = Slot::Writeable; + fileno = idx; + //s.setKey(key); // XXX: the caller should do that + debugs(54, 5, HERE << " opened slot at " << idx << + " for writing in map [" << path << ']'); + return &s; // and keep the entry locked + } + + debugs(54, 5, HERE << " failed to open slot at " << idx << + " for 
writing in map [" << path << ']'); + return NULL; +} + +void +Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading) +{ + debugs(54, 5, HERE << " closing slot at " << fileno << " for writing and " + "openning for reading in map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + assert(s.state == Slot::Writeable); + s.state = Slot::Readable; + if (lockForReading) + s.lock.switchExclusiveToShared(); + else + s.lock.unlockExclusive(); +} + +/// terminate writing the entry, freeing its slot for others to use +void +Ipc::StoreMap::abortWriting(const sfileno fileno) +{ + debugs(54, 5, HERE << " abort writing slot at " << fileno << + " in map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + assert(s.state == Slot::Writeable); + freeLocked(s, false); +} + +void +Ipc::StoreMap::abortIo(const sfileno fileno) +{ + debugs(54, 5, HERE << " abort I/O for slot at " << fileno << + " in map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + + // The caller is a lock holder. Thus, if we are Writeable, then the + // caller must be the writer; otherwise the caller must be the reader. 
+ if (s.state == Slot::Writeable) + abortWriting(fileno); + else + closeForReading(fileno); +} + +const Ipc::StoreMap::Slot * +Ipc::StoreMap::peekAtReader(const sfileno fileno) const +{ + assert(valid(fileno)); + const Slot &s = shared->slots[fileno]; + switch (s.state) { + case Slot::Readable: + return &s; // immediate access by lock holder so no locking + case Slot::Writeable: + return NULL; // cannot read the slot when it is being written + case Slot::Empty: + assert(false); // must be locked for reading or writing + } + assert(false); // not reachable + return NULL; +} + +void +Ipc::StoreMap::free(const sfileno fileno) +{ + debugs(54, 5, HERE << " marking slot at " << fileno << " to be freed in" + " map [" << path << ']'); + + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + + if (s.lock.lockExclusive()) + freeLocked(s, false); + else + s.waitingToBeFreed = true; // mark to free it later +} + +const Ipc::StoreMap::Slot * +Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno) +{ + debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key) + << " for reading in map [" << path << ']'); + const int idx = slotIndexByKey(key); + if (const Slot *slot = openForReadingAt(idx)) { + if (slot->sameKey(key)) { + fileno = idx; + debugs(54, 5, HERE << " opened slot at " << fileno << " for key " + << storeKeyText(key) << " for reading in map [" << path << + ']'); + return slot; // locked for reading + } + slot->lock.unlockShared(); + } + debugs(54, 5, HERE << " failed to open slot for key " << storeKeyText(key) + << " for reading in map [" << path << ']'); + return NULL; +} + +const Ipc::StoreMap::Slot * +Ipc::StoreMap::openForReadingAt(const sfileno fileno) +{ + debugs(54, 5, HERE << " trying to open slot at " << fileno << " for " + "reading in map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + + if (!s.lock.lockShared()) { + debugs(54, 5, HERE << " failed to lock slot at " << fileno << " for " 
+ "reading in map [" << path << ']'); + return NULL; + } + + if (s.state == Slot::Empty) { + s.lock.unlockShared(); + debugs(54, 7, HERE << " empty slot at " << fileno << " for " + "reading in map [" << path << ']'); + return NULL; + } + + if (s.waitingToBeFreed) { + s.lock.unlockShared(); + debugs(54, 7, HERE << " dirty slot at " << fileno << " for " + "reading in map [" << path << ']'); + return NULL; + } + + // cannot be Writing here if we got shared lock and checked Empty above + assert(s.state == Slot::Readable); + debugs(54, 5, HERE << " opened slot at " << fileno << " for reading in" + " map [" << path << ']'); + return &s; +} + +void +Ipc::StoreMap::closeForReading(const sfileno fileno) +{ + debugs(54, 5, HERE << " closing slot at " << fileno << " for reading in " + "map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + assert(s.state == Slot::Readable); + s.lock.unlockShared(); +} + +int +Ipc::StoreMap::entryLimit() const +{ + return shared->limit; +} + +int +Ipc::StoreMap::entryCount() const +{ + return shared->count; +} + +bool +Ipc::StoreMap::full() const +{ + return entryCount() >= entryLimit(); +} + +void +Ipc::StoreMap::updateStats(ReadWriteLockStats &stats) const +{ + for (int i = 0; i < shared->limit; ++i) + shared->slots[i].lock.updateStats(stats); +} + +bool +Ipc::StoreMap::valid(const int pos) const +{ + return 0 <= pos && pos < entryLimit(); +} + +int +Ipc::StoreMap::slotIndexByKey(const cache_key *const key) const +{ + const uint64_t *const k = reinterpret_cast(key); + // TODO: use a better hash function + return (k[0] + k[1]) % shared->limit; +} + +Ipc::StoreMap::Slot & +Ipc::StoreMap::slotByKey(const cache_key *const key) +{ + return shared->slots[slotIndexByKey(key)]; +} + +/// unconditionally frees the already exclusively locked slot and releases lock +void +Ipc::StoreMap::freeLocked(Slot &s, bool keepLocked) +{ + if (s.state == Slot::Readable && cleaner) + cleaner->cleanReadable(&s - shared->slots); + + 
s.waitingToBeFreed = false; + s.state = Slot::Empty; + if (!keepLocked) + s.lock.unlockExclusive(); + --shared->count; + debugs(54, 5, HERE << " freed slot at " << (&s - shared->slots) << + " in map [" << path << ']'); +} + + +/* Ipc::StoreMapSlot */ + +Ipc::StoreMapSlot::StoreMapSlot(): state(Empty) +{ + xmemset(&key, 0, sizeof(key)); + xmemset(&basics, 0, sizeof(basics)); +} + +void +Ipc::StoreMapSlot::setKey(const cache_key *const aKey) +{ + memcpy(key, aKey, sizeof(key)); +} + +bool +Ipc::StoreMapSlot::sameKey(const cache_key *const aKey) const +{ + const uint64_t *const k = reinterpret_cast(aKey); + return k[0] == key[0] && k[1] == key[1]; +} + +void +Ipc::StoreMapSlot::set(const StoreEntry &from) +{ + memcpy(key, from.key, sizeof(key)); + // XXX: header = aHeader; + basics.timestamp = from.timestamp; + basics.lastref = from.lastref; + basics.expires = from.expires; + basics.lastmod = from.lastmod; + basics.swap_file_sz = from.swap_file_sz; + basics.refcount = from.refcount; + basics.flags = from.flags; +} + +/* Ipc::StoreMap::Shared */ + +Ipc::StoreMap::Shared::Shared(const int aLimit, const size_t anExtrasSize): + limit(aLimit), extrasSize(anExtrasSize), count(0) +{ +} + +size_t +Ipc::StoreMap::Shared::sharedMemorySize() const +{ + return SharedMemorySize(limit, extrasSize); +} + +size_t +Ipc::StoreMap::Shared::SharedMemorySize(const int limit, const size_t extrasSize) +{ + return sizeof(Shared) + limit * (sizeof(Slot) + extrasSize); +} + === added file 'src/ipc/StoreMap.h' --- src/ipc/StoreMap.h 1970-01-01 00:00:00 +0000 +++ src/ipc/StoreMap.h 2011-04-25 15:14:10 +0000 @@ -0,0 +1,198 @@ +#ifndef SQUID_IPC_STORE_MAP_H +#define SQUID_IPC_STORE_MAP_H + +#include "ipc/ReadWriteLock.h" +#include "ipc/mem/Pointer.h" +#include "typedefs.h" + +namespace Ipc { + +/// a StoreMap element, holding basic shareable StoreEntry info +class StoreMapSlot { +public: + StoreMapSlot(); + + /// store StoreEntry key and basics + void set(const StoreEntry &anEntry); + + void 
setKey(const cache_key *const aKey); + bool sameKey(const cache_key *const aKey) const; + +public: + mutable ReadWriteLock lock; ///< protects slot data below + AtomicWordT waitingToBeFreed; ///< may be accessed w/o a lock + + uint64_t key[2]; ///< StoreEntry key + + // STORE_META_STD TLV field from StoreEntry + struct Basics { + time_t timestamp; + time_t lastref; + time_t expires; + time_t lastmod; + uint64_t swap_file_sz; + u_short refcount; + u_short flags; + } basics; + + /// possible persistent states + typedef enum { + Empty, ///< ready for writing, with nothing of value + Writeable, ///< transitions from Empty to Readable + Readable, ///< ready for reading + } State; + State state; ///< current state +}; + +class StoreMapCleaner; + +/// map of StoreMapSlots indexed by their keys, with read/write slot locking +/// kids extend to store custom data +class StoreMap +{ +public: + typedef StoreMapSlot Slot; + +private: + struct Shared + { + Shared(const int aLimit, const size_t anExtrasSize); + size_t sharedMemorySize() const; + static size_t SharedMemorySize(const int limit, const size_t anExtrasSize); + + const int limit; ///< maximum number of map slots + const size_t extrasSize; ///< size of slot extra data + AtomicWord count; ///< current number of map slots + Slot slots[]; ///< slots storage + }; + +public: + typedef Mem::Owner Owner; + + /// initialize shared memory + static Owner *Init(const char *const path, const int limit); + + StoreMap(const char *const aPath); + + /// finds, reservers space for writing a new entry or returns nil + Slot *openForWriting(const cache_key *const key, sfileno &fileno); + /// successfully finish writing the entry + void closeForWriting(const sfileno fileno, bool lockForReading = false); + + /// only works on locked entries; returns nil unless the slot is readable + const Slot *peekAtReader(const sfileno fileno) const; + + /// mark the slot as waiting to be freed and, if possible, free it + void free(const sfileno fileno); + 
+ /// open slot for reading, increments read level + const Slot *openForReading(const cache_key *const key, sfileno &fileno); + /// open slot for reading, increments read level + const Slot *openForReadingAt(const sfileno fileno); + /// close slot after reading, decrements read level + void closeForReading(const sfileno fileno); + + /// called by lock holder to terminate either slot writing or reading + void abortIo(const sfileno fileno); + + bool full() const; ///< there are no empty slots left + bool valid(const int n) const; ///< whether n is a valid slot coordinate + int entryCount() const; ///< number of used slots + int entryLimit() const; ///< maximum number of slots that can be used + + /// adds approximate current stats to the supplied ones + void updateStats(ReadWriteLockStats &stats) const; + + StoreMapCleaner *cleaner; ///< notified before a readable entry is freed + +protected: + static Owner *Init(const char *const path, const int limit, const size_t extrasSize); + + const String path; ///< cache_dir path, used for logging + Mem::Pointer shared; + +private: + int slotIndexByKey(const cache_key *const key) const; + Slot &slotByKey(const cache_key *const key); + + Slot *openForReading(Slot &s); + void abortWriting(const sfileno fileno); + void freeIfNeeded(Slot &s); + void freeLocked(Slot &s, bool keepLocked); +}; + +/// StoreMap with extra slot data +/// Note: ExtrasT must be POD, it is initialized with zeroes, no +/// constructors or destructors are called +template +class StoreMapWithExtras: public StoreMap +{ +public: + typedef ExtrasT Extras; + + /// initialize shared memory + static Owner *Init(const char *const path, const int limit); + + StoreMapWithExtras(const char *const path); + + /// write access to the extras; call openForWriting() first! + ExtrasT &extras(const sfileno fileno); + /// read-only access to the extras; call openForReading() first! 
+ const ExtrasT &extras(const sfileno fileno) const; + +protected: + + ExtrasT *sharedExtras; ///< pointer to extras in shared memory +}; + +/// API for adjusting external state when dirty map slot is being freed +class StoreMapCleaner +{ +public: + virtual ~StoreMapCleaner() {} + + /// adjust slot-linked state before a locked Readable slot is erased + virtual void cleanReadable(const sfileno fileno) = 0; +}; + +// StoreMapWithExtras implementation + +template +StoreMap::Owner * +StoreMapWithExtras::Init(const char *const path, const int limit) +{ + return StoreMap::Init(path, limit, sizeof(Extras)); +} + +template +StoreMapWithExtras::StoreMapWithExtras(const char *const path): + StoreMap(path) +{ + const size_t sharedSizeWithoutExtras = + Shared::SharedMemorySize(entryLimit(), 0); + sharedExtras = reinterpret_cast(reinterpret_cast(shared.getRaw()) + sharedSizeWithoutExtras); +} + +template +ExtrasT & +StoreMapWithExtras::extras(const sfileno fileno) +{ + return const_cast(const_cast(this)->extras(fileno)); +} + +template +const ExtrasT & +StoreMapWithExtras::extras(const sfileno fileno) const +{ + assert(sharedExtras); + assert(valid(fileno)); + return sharedExtras[fileno]; +} + + +} // namespace Ipc + +// We do not reuse struct _fileMap because we cannot control its size, +// resulting in sfilenos that are pointing beyond the database. 
+ +#endif /* SQUID_IPC_STORE_MAP_H */ === modified file 'src/ipc/Strand.cc' --- src/ipc/Strand.cc 2011-02-03 07:39:31 +0000 +++ src/ipc/Strand.cc 2011-04-14 16:58:28 +0000 @@ -11,10 +11,13 @@ #include "ipc/StrandCoord.h" #include "ipc/Messages.h" #include "ipc/SharedListen.h" +#include "ipc/StrandSearch.h" #include "ipc/Kids.h" #include "mgr/Request.h" #include "mgr/Response.h" #include "mgr/Forwarder.h" +#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */ +#include "SwapDir.h" /* XXX: scope boundary violation */ #include "CacheManager.h" #if SQUID_SNMP #include "snmp/Forwarder.h" @@ -41,8 +44,10 @@ { debugs(54, 6, HERE); Must(!isRegistered); + + HereIamMessage ann(StrandCoord(KidIdentifier, getpid())); TypedMsgHdr message; - StrandCoord(KidIdentifier, getpid()).pack(message); + ann.pack(message); === added directory 'src/ipc/mem' === added file 'src/ipc/mem/Page.cc' --- src/ipc/mem/Page.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/Page.cc 2011-04-06 13:23:03 +0000 @@ -0,0 +1,19 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "ipc/mem/Page.h" + +#if HAVE_IOSTREAM +#include +#endif + + +std::ostream &Ipc::Mem::operator <<(std::ostream &os, const PageId &page) +{ + return os << "sh_page" << page.pool << '.' 
<< page.number; +} === added file 'src/ipc/mem/Page.h' --- src/ipc/mem/Page.h 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/Page.h 2011-04-25 19:24:35 +0000 @@ -0,0 +1,36 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_MEM_PAGE_H +#define SQUID_IPC_MEM_PAGE_H + +#if HAVE_IOSFWD +#include +#endif + +namespace Ipc { + +namespace Mem { + +/// Shared memory page identifier, address, or handler +class PageId { +public: + PageId(): pool(0), number(0) {} + + operator bool() const { return pool && number; } + + uint32_t pool; ///< page pool ID within Squid + // uint32_t segment; ///< memory segment ID within the pool; unused for now + uint32_t number; ///< page number within the segment +}; + +/// writes page address (e.g., "sh_page5.3"), for debugging +std::ostream &operator <<(std::ostream &os, const PageId &page); + +} // namespace Mem + +} // namespace Ipc + +#endif // SQUID_IPC_MEM_PAGE_H === added file 'src/ipc/mem/PagePool.cc' --- src/ipc/mem/PagePool.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/PagePool.cc 2011-04-25 19:24:35 +0000 @@ -0,0 +1,38 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "ipc/mem/Page.h" +#include "ipc/mem/PagePool.h" + + +// Ipc::Mem::PagePool + +Ipc::Mem::PagePool::Owner * +Ipc::Mem::PagePool::Init(const char *const id, const unsigned int capacity, const size_t pageSize) +{ + static uint32_t LastPagePoolId = 0; + if (++LastPagePoolId == 0) + ++LastPagePoolId; // skip zero pool id + return shm_new(PageStack)(id, LastPagePoolId, capacity, pageSize); +} + +Ipc::Mem::PagePool::PagePool(const char *const id): + pageIndex(shm_old(PageStack)(id)) +{ + const size_t pagesDataOffset = + pageIndex->sharedMemorySize() - capacity() * pageSize(); + theBuf = reinterpret_cast(pageIndex.getRaw()) + pagesDataOffset; +} + +char * +Ipc::Mem::PagePool::pagePointer(const PageId &page) +{ + Must(pageIndex->pageIdIsValid(page)); + return theBuf + pageSize() * (page.number - 
1); +} === added file 'src/ipc/mem/PagePool.h' --- src/ipc/mem/PagePool.h 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/PagePool.h 2011-04-25 19:24:35 +0000 @@ -0,0 +1,48 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_MEM_PAGE_POOL_H +#define SQUID_IPC_MEM_PAGE_POOL_H + +#include "ipc/mem/PageStack.h" +#include "ipc/mem/Pointer.h" + +namespace Ipc { + +namespace Mem { + +/// Atomic container of shared memory pages. Implemented using a collection of +/// Segments, each with a PageStack index of free pages. All pools must be +/// created by a single process. +class PagePool { +public: + typedef Ipc::Mem::Owner Owner; + + static Owner *Init(const char *const id, const unsigned int capacity, const size_t pageSize); + + PagePool(const char *const id); + + unsigned int capacity() const { return pageIndex->capacity(); } + size_t pageSize() const { return pageIndex->pageSize(); } + /// lower bound for the number of free pages + unsigned int size() const { return pageIndex->size(); } + + /// sets page ID and returns true unless no free pages are found + bool get(PageId &page) { return pageIndex->pop(page); } + /// makes identified page available as a free page to future get() callers + void put(PageId &page) { return pageIndex->push(page); } + /// converts page handler into a temporary writeable shared memory pointer + char *pagePointer(const PageId &page); + +private: + Ipc::Mem::Pointer pageIndex; ///< free pages index + char *theBuf; ///< pages storage +}; + +} // namespace Mem + +} // namespace Ipc + +#endif // SQUID_IPC_MEM_PAGE_POOL_H === added file 'src/ipc/mem/PageStack.cc' --- src/ipc/mem/PageStack.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/PageStack.cc 2011-04-25 19:24:35 +0000 @@ -0,0 +1,117 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" + +#include "base/TextException.h" +#include "ipc/mem/Page.h" +#include "ipc/mem/PageStack.h" + +/// used to mark a stack slot available for storing free page offsets +const 
Ipc::Mem::PageStack::Value Writable = 0; + + +Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize): + thePoolId(aPoolId), theCapacity(aCapacity), thePageSize(aPageSize), + theSize(theCapacity), + theLastReadable(prev(theSize)), theFirstWritable(next(theLastReadable)) +{ + // initially, all pages are free + for (Offset i = 0; i < theSize; ++i) + theItems[i] = i + 1; // skip page number zero to keep numbers positive +} + +/* + * TODO: We currently rely on the theLastReadable hint during each + * loop iteration. We could also use hint just for the start position: + * (const Offset start = theLastReadable) and then scan the stack + * sequentially regardless of theLastReadable changes by others. Which + * approach is better? Same for push(). + */ +bool +Ipc::Mem::PageStack::pop(PageId &page) +{ + Must(!page); + + // we may fail to dequeue, but be conservative to prevent long searches + --theSize; + + // find a Readable slot, starting with theLastReadable and going left + while (theSize >= 0) { + const Offset idx = theLastReadable; + // mark the slot at ids Writable while extracting its current value + const Value value = theItems[idx].fetchAndAnd(0); // works if Writable is 0 + const bool popped = value != Writable; + // theItems[idx] is probably not Readable [any more] + + // Whether we popped a Readable value or not, we should try going left + // to maintain the index (and make progress). + // We may fail if others already updated the index, but that is OK. 
+ theLastReadable.swap_if(idx, prev(idx)); // may fail or lie + + if (popped) { + // the slot we emptied may already be filled, but that is OK + theFirstWritable = idx; // may lie + page.pool = thePoolId; + page.number = value; + return true; + } + // TODO: report suspiciously long loops + } + + ++theSize; + return false; +} + +void +Ipc::Mem::PageStack::push(PageId &page) +{ + if (!page) + return; + + Must(pageIdIsValid(page)); + // find a Writable slot, starting with theFirstWritable and going right + while (theSize < theCapacity) { + const Offset idx = theFirstWritable; + const bool pushed = theItems[idx].swap_if(Writable, page.number); + // theItems[idx] is probably not Writable [any more]; + + // Whether we pushed the page number or not, we should try going right + // to maintain the index (and make progress). + // We may fail if others already updated the index, but that is OK. + theFirstWritable.swap_if(idx, next(idx)); // may fail or lie + + if (pushed) { + // the enqueued value may already by gone, but that is OK + theLastReadable = idx; // may lie + ++theSize; + page = PageId(); + return; + } + // TODO: report suspiciously long loops + } + Must(false); // the number of pages cannot exceed theCapacity +} + +bool +Ipc::Mem::PageStack::pageIdIsValid(const PageId &page) const +{ + return page.pool == thePoolId && page.number != Writable && + page.number <= capacity(); +} + +size_t +Ipc::Mem::PageStack::sharedMemorySize() const +{ + return SharedMemorySize(thePoolId, theCapacity, thePageSize); +} + +size_t +Ipc::Mem::PageStack::SharedMemorySize(const uint32_t, const unsigned int capacity, const size_t pageSize) +{ + return sizeof(PageStack) + capacity * (sizeof(Item) + pageSize); +} === added file 'src/ipc/mem/PageStack.h' --- src/ipc/mem/PageStack.h 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/PageStack.h 2011-04-25 15:14:10 +0000 @@ -0,0 +1,69 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_MEM_PAGE_STACK_H +#define SQUID_IPC_MEM_PAGE_STACK_H + +#include 
"ipc/AtomicWord.h" + +namespace Ipc { + +namespace Mem { + +class PageId; + +/// Atomic container of "free" page numbers inside a single SharedMemory space. +/// Assumptions: all page numbers are unique, positive, have an known maximum, +/// and can be temporary unavailable as long as they are never trully lost. +class PageStack { +public: + typedef uint32_t Value; ///< stack item type (a free page number) + + PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize); + + unsigned int capacity() const { return theCapacity; } + size_t pageSize() const { return thePageSize; } + /// lower bound for the number of free pages + unsigned int size() const { return max(0, theSize.get()); } + + /// sets value and returns true unless no free page numbers are found + bool pop(PageId &page); + /// makes value available as a free page number to future pop() callers + void push(PageId &page); + + bool pageIdIsValid(const PageId &page) const; + + /// total shared memory size required to share + static size_t SharedMemorySize(const uint32_t aPoolId, const unsigned int capacity, const size_t pageSize); + size_t sharedMemorySize() const; + +private: + /// stack index and size type (may temporary go negative) + typedef int Offset; + + // these help iterate the stack in search of a free spot or a page + Offset next(const Offset idx) const { return (idx + 1) % theCapacity; } + Offset prev(const Offset idx) const { return (theCapacity + idx - 1) % theCapacity; } + + const uint32_t thePoolId; ///< pool ID + const Offset theCapacity; ///< stack capacity, i.e. theItems size + const size_t thePageSize; ///< page size, used to calculate shared memory size + /// lower bound for the number of free pages (may get negative!) 
+ AtomicWordT theSize; + + /// last readable item index; just a hint, not a guarantee + AtomicWordT theLastReadable; + /// first writable item index; just a hint, not a guarantee + AtomicWordT theFirstWritable; + + typedef AtomicWordT Item; + Item theItems[]; ///< page number storage +}; + +} // namespace Mem + +} // namespace Ipc + +#endif // SQUID_IPC_MEM_PAGE_STACK_H === added file 'src/ipc/mem/Pages.cc' --- src/ipc/mem/Pages.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/Pages.cc 2011-04-25 19:24:35 +0000 @@ -0,0 +1,119 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "base/RunnersRegistry.h" +#include "ipc/mem/PagePool.h" +#include "ipc/mem/Pages.h" +#include "structs.h" +#include "SwapDir.h" + +// Uses a single PagePool instance, for now. +// Eventually, we may have pools dedicated to memory caching, disk I/O, etc. + +// TODO: make pool id more unique so it does not conflict with other Squids? +static const char *PagePoolId = "squid-page-pool"; +static Ipc::Mem::PagePool *ThePagePool = 0; + +// TODO: make configurable to avoid waste when mem-cached objects are small/big +size_t +Ipc::Mem::PageSize() { + return 32*1024; +} + +bool +Ipc::Mem::GetPage(PageId &page) +{ + return ThePagePool ? ThePagePool->get(page) : false; +} + +void +Ipc::Mem::PutPage(PageId &page) +{ + Must(ThePagePool); + ThePagePool->put(page); +} + +char * +Ipc::Mem::PagePointer(const PageId &page) +{ + Must(ThePagePool); + return ThePagePool->pagePointer(page); +} + +size_t +Ipc::Mem::Limit() +{ + // TODO: adjust cache_mem description to say that in SMP mode, + // in-transit objects are not allocated using cache_mem. Eventually, + // they should not use cache_mem even if shared memory is not used: + // in-transit objects have nothing to do with caching. + return Config.memMaxSize; +} + +size_t +Ipc::Mem::Level() +{ + return ThePagePool ? 
+ (ThePagePool->capacity() - ThePagePool->size()) * PageSize() : 0; +} + +/// initializes shared memory pages +class SharedMemPagesRr: public RegisteredRunner +{ +public: + /* RegisteredRunner API */ + SharedMemPagesRr(): owner(NULL) {} + virtual void run(const RunnerRegistry &); + virtual ~SharedMemPagesRr(); + +private: + Ipc::Mem::PagePool::Owner *owner; +}; + +RunnerRegistrationEntry(rrAfterConfig, SharedMemPagesRr); + + +void SharedMemPagesRr::run(const RunnerRegistry &) +{ + if (!UsingSmp()) + return; + + // When cache_dirs start using shared memory pages, they would + // need to communicate their needs to us somehow. + if (!Ipc::Mem::Limit()) + return; + + if (Ipc::Mem::Limit() < Ipc::Mem::PageSize()) { + if (IamMasterProcess()) { + debugs(54, DBG_IMPORTANT, "WARNING: mem-cache size is too small (" + << (Ipc::Mem::Limit() / 1024.0) << " KB), should be >= " << + (Ipc::Mem::PageSize() / 1024.0) << " KB"); + } + return; + } + + if (IamMasterProcess()) { + Must(!owner); + const size_t capacity = Ipc::Mem::Limit() / Ipc::Mem::PageSize(); + owner = Ipc::Mem::PagePool::Init(PagePoolId, capacity, Ipc::Mem::PageSize()); + } + + Must(!ThePagePool); + ThePagePool = new Ipc::Mem::PagePool(PagePoolId); +} + +SharedMemPagesRr::~SharedMemPagesRr() +{ + if (!UsingSmp()) + return; + + delete ThePagePool; + ThePagePool = NULL; + delete owner; +} === added file 'src/ipc/mem/Pages.h' --- src/ipc/mem/Pages.h 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/Pages.h 2011-04-25 19:24:35 +0000 @@ -0,0 +1,45 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_MEM_PAGES_H +#define SQUID_IPC_MEM_PAGES_H + +namespace Ipc { + +namespace Mem { + +class PageId; + +/* Single page manipulation */ + +/// sets page ID and returns true unless no free pages are found +bool GetPage(PageId &page); + +/// makes identified page available as a free page to future GetPage() callers +void PutPage(PageId &page); + +/// converts page handler into a temporary writeable shared memory pointer +char *PagePointer(const 
PageId &page); + + +/* Limits and statistics */ + +/// the total number of shared memory bytes that can be in use at any time +size_t Limit(); + +/// approximate total number of shared memory bytes used now +size_t Level(); + +/// approximate total number of shared memory bytes we can allocate now +inline size_t Available() { return Limit() - Level(); } + +/// returns page size in bytes; all pages are assumed to be the same size +size_t PageSize(); + +} // namespace Mem + +} // namespace Ipc + +#endif // SQUID_IPC_MEM_PAGES_H === added file 'src/ipc/mem/Pointer.h' --- src/ipc/mem/Pointer.h 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/Pointer.h 2011-04-26 20:39:59 +0000 @@ -0,0 +1,178 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_MEM_POINTER_H +#define SQUID_IPC_MEM_POINTER_H + +#include "base/TextException.h" +#include "ipc/mem/Segment.h" +#include "RefCount.h" + +namespace Ipc { + +namespace Mem { + +/// allocates/deallocates shared memory; creates and later destroys a +/// Class object using that memory +template +class Owner +{ +public: + static Owner *New(const char *const id); + template + static Owner *New(const char *const id, const P1 &p1); + template + static Owner *New(const char *const id, const P1 &p1, const P2 &p2); + template + static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3); + template + static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4); + + ~Owner(); + +private: + Owner(const char *const id, const off_t sharedSize); + + // not implemented + Owner(const Owner &); + Owner &operator =(const Owner &); + + Segment theSegment; ///< shared memory segment that holds the object + Class *theObject; ///< shared object +}; + +template class Pointer; + +/// attaches to a shared memory segment with Class object owned by Owner +template +class Object: public RefCountable +{ +public: + static Pointer Old(const char *const id); + +private: + explicit Object(const char *const id); + + // not 
implemented + Object(const Object &); + Object &operator =(const Object &); + + Segment theSegment; ///< shared memory segment that holds the object + Class *theObject; ///< shared object + + friend class Pointer; +}; + +/// uses a refcounted pointer to Object as a parent, but +/// translates its API to return raw Class pointers +template +class Pointer: public RefCount< Object > +{ +private: + typedef RefCount< Object > Base; + +public: + explicit Pointer(Object *const anObject = NULL): Base(anObject) {} + + Class *operator ->() const { return Base::operator ->()->theObject; } + Class &operator *() const { return *Base::operator *().theObject; } + Class *const getRaw() const { return Base::getRaw()->theObject; } + Class *getRaw() { return Base::getRaw()->theObject; } +}; + +// Owner implementation + +template +Owner::Owner(const char *const id, const off_t sharedSize): + theSegment(id), theObject(NULL) +{ + theSegment.create(sharedSize); + Must(theSegment.mem()); +} + +template +Owner::~Owner() +{ + if (theObject) + theObject->~Class(); +} + +template +Owner * +Owner::New(const char *const id) +{ + const off_t sharedSize = Class::SharedMemorySize(); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class; + return owner; +} + +template template +Owner * +Owner::New(const char *const id, const P1 &p1) +{ + const off_t sharedSize = Class::SharedMemorySize(p1); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1); + return owner; +} + +template template +Owner * +Owner::New(const char *const id, const P1 &p1, const P2 &p2) +{ + const off_t sharedSize = Class::SharedMemorySize(p1, p2); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2); + return owner; +} + +template template +Owner * +Owner::New(const char *const id, const P1 &p1, const P2 &p2, const P3 
&p3) +{ + const off_t sharedSize = Class::SharedMemorySize(p1, p2, p3); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2, p3); + return owner; +} + +template template +Owner * +Owner::New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4) +{ + const off_t sharedSize = Class::SharedMemorySize(p1, p2, p3, p4); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2, p3, p4); + return owner; +} + +// Object implementation + +template +Object::Object(const char *const id): theSegment(id) +{ + theSegment.open(); + Must(theSegment.mem()); + theObject = reinterpret_cast(theSegment.mem()); + Must(static_cast(theObject->sharedMemorySize()) == theSegment.size()); +} + +template +Pointer +Object::Old(const char *const id) +{ + return Pointer(new Object(id)); +} + +// convenience macros for creating shared objects +#define shm_new(Class) Ipc::Mem::Owner::New +#define shm_old(Class) Ipc::Mem::Object::Old + +} // namespace Mem + +} // namespace Ipc + +#endif /* SQUID_IPC_MEM_POINTER_H */ === added file 'src/ipc/mem/Segment.cc' --- src/ipc/mem/Segment.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/mem/Segment.cc 2011-04-25 15:14:10 +0000 @@ -0,0 +1,174 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "ipc/mem/Segment.h" +#include "protos.h" + +#include +#include +#include +#include +#include + +Ipc::Mem::Segment::Segment(const char *const id): + theName(GenerateName(id)), theFD(-1), theMem(NULL), + theSize(0), theReserved(0), doUnlink(false) +{ +} + +Ipc::Mem::Segment::~Segment() { + if (theFD >= 0) { + detach(); + if (close(theFD) != 0) + debugs(54, 5, HERE << "close " << theName << ": " << xstrerror()); + } + if (doUnlink) + unlink(); +} + +void +Ipc::Mem::Segment::create(const off_t aSize) +{ + assert(aSize > 0); + 
assert(theFD < 0); + + theFD = shm_open(theName.termedBuf(), O_CREAT | O_RDWR | O_TRUNC, + S_IRUSR | S_IWUSR); + if (theFD < 0) { + debugs(54, 5, HERE << "shm_open " << theName << ": " << xstrerror()); + fatal("Ipc::Mem::Segment::create failed to shm_open"); + } + + if (ftruncate(theFD, aSize)) { + debugs(54, 5, HERE << "ftruncate " << theName << ": " << xstrerror()); + fatal("Ipc::Mem::Segment::create failed to ftruncate"); + } + + assert(statSize("Ipc::Mem::Segment::create") == aSize); // paranoid + + theSize = aSize; + theReserved = 0; + doUnlink = true; + + debugs(54, 3, HERE << "created " << theName << " segment: " << theSize); + + attach(); +} + +void +Ipc::Mem::Segment::open() +{ + assert(theFD < 0); + + theFD = shm_open(theName.termedBuf(), O_RDWR, 0); + if (theFD < 0) { + debugs(54, 5, HERE << "shm_open " << theName << ": " << xstrerror()); + String s = "Ipc::Mem::Segment::open failed to shm_open "; + s.append(theName); + fatal(s.termedBuf()); + } + + theSize = statSize("Ipc::Mem::Segment::open"); + + debugs(54, 3, HERE << "opened " << theName << " segment: " << theSize); + + attach(); +} + +/// Map the shared memory segment to the process memory space. +void +Ipc::Mem::Segment::attach() +{ + assert(theFD >= 0); + assert(!theMem); + + // mmap() accepts size_t for the size; we give it off_t which might + // be bigger; assert overflows until we support multiple mmap()s? + assert(theSize == static_cast(static_cast(theSize))); + + void *const p = + mmap(NULL, theSize, PROT_READ | PROT_WRITE, MAP_SHARED, theFD, 0); + if (p == MAP_FAILED) { + debugs(54, 5, HERE << "mmap " << theName << ": " << xstrerror()); + fatal("Ipc::Mem::Segment::attach failed to mmap"); + } + theMem = p; +} + +/// Unmap the shared memory segment from the process memory space. 
+void
+Ipc::Mem::Segment::detach()
+{
+    // nothing to do if the segment is not currently mapped
+    if (!theMem)
+        return;
+
+    if (munmap(theMem, theSize)) {
+        debugs(54, 5, HERE << "munmap " << theName << ": " << xstrerror());
+        fatal("Ipc::Mem::Segment::detach failed to munmap");
+    }
+    theMem = NULL;
+}
+
+/// Removes the segment name with shm_unlink(). A failure is only logged.
+void
+Ipc::Mem::Segment::unlink()
+{
+    if (shm_unlink(theName.termedBuf()) != 0)
+        debugs(54, 5, HERE << "shm_unlink(" << theName << "): " << xstrerror());
+    else
+        debugs(54, 3, HERE << "unlinked " << theName << " segment");
+}
+
+/// Returns a pointer to the first not yet reserved byte of the mapped
+/// segment and marks the next chunkSize bytes as reserved. Asserts that the
+/// chunk fits into what is left of the segment.
+void *
+Ipc::Mem::Segment::reserve(size_t chunkSize)
+{
+    Must(theMem);
+    // check for overflows: theSize and theReserved are off_t, so do the
+    // arithmetic in off_t after making sure chunkSize survives the conversion
+    assert(static_cast<off_t>(chunkSize) >= 0);
+    assert(static_cast<off_t>(chunkSize) <= theSize);
+    assert(theReserved <= theSize - static_cast<off_t>(chunkSize));
+    void *result = reinterpret_cast<char*>(theMem) + theReserved;
+    theReserved += chunkSize;
+    return result;
+}
+
+/// Determines the size of the underlying "file" using fstat(2).
+/// \param context caller name; it starts the fatal error message
+off_t
+Ipc::Mem::Segment::statSize(const char *context) const
+{
+    Must(theFD >= 0);
+
+    struct stat s;
+    memset(&s, 0, sizeof(s));
+
+    if (fstat(theFD, &s) != 0) {
+        debugs(54, 5, HERE << "fstat " << theName << ": " << xstrerror());
+        // Use a distinct name (do not shadow the stat buffer) and keep a
+        // space between the context and the rest of the message.
+        String msg = context;
+        msg.append(" failed to fstat(2) ");
+        msg.append(theName);
+        fatal(msg.termedBuf());
+    }
+
+    return s.st_size;
+}
+
+/// Generate name for shared memory segment: "/squid-" followed by id with
+/// slashes replaced by dots (leading and repeated slashes are dropped).
+String
+Ipc::Mem::Segment::GenerateName(const char *id)
+{
+    String name("/squid-");
+    // Copy id piece by piece: every non-empty piece that ends with a slash
+    // is appended followed by a dot; empty pieces (a leading slash or
+    // repeated slashes) add nothing to the name.
+    for (const char *slash = strchr(id, '/'); slash; slash = strchr(id, '/')) {
+        if (id != slash) {
+            name.append(id, slash - id);
+            name.append('.');
+        }
+        id = slash + 1;
+    }
+    name.append(id); // whatever follows the last slash (possibly nothing)
+    return name;
+}
=== added file 'src/ipc/mem/Segment.h'
--- src/ipc/mem/Segment.h 1970-01-01 00:00:00 +0000
+++ src/ipc/mem/Segment.h 2011-04-25 15:14:10 +0000
@@ -0,0 +1,59 @@
+/*
+ * $Id$
+ *
+ */
+
+#ifndef SQUID_IPC_MEM_SEGMENT_H
+#define SQUID_IPC_MEM_SEGMENT_H
+
+#include "SquidString.h"
+
+namespace Ipc {
+
+namespace Mem {
+
+/// POSIX shared memory segment
+class Segment {
+public:
+    /// Remembers the segment name derived from id. Does not create or map
+    /// anything: call create() or open() next.
+    Segment(const char *const id);
+    ~Segment();
+
+    /// Create and map a shared memory segment of aSize bytes. An existing
+    /// segment with the same name is reused and truncated, not rejected.
+    /// Unlinks the segment on destruction.
+    void create(const off_t aSize);
+    void open(); ///< Open and map an existing shared memory segment.
+
+    const String &name() const { return theName; } ///< shared memory segment name
+    off_t size() const { return theSize; } ///< shared memory segment size
+    void *mem() { return reserve(0); } ///< pointer to the next chunk
+    void *reserve(size_t chunkSize); ///< reserve and return the next chunk
+
+
+private:
+    void attach();
+    void detach();
+    void unlink(); ///< unlink the segment
+    off_t statSize(const char *context) const;
+
+    static String GenerateName(const char *id);
+
+    // not implemented
+    Segment(const Segment &);
+    Segment &operator =(const Segment &);
+
+    const String theName; ///< shared memory segment file name
+    int theFD; ///< shared memory segment file descriptor
+    void *theMem; ///< pointer to mmapped shared memory segment
+    off_t theSize; ///< shared memory segment size
+    off_t theReserved; ///< the total number of reserve()d bytes
+    bool doUnlink; ///< whether the segment should be unlinked on destruction
+};
+
+} // namespace Mem
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_MEM_SEGMENT_H */