diff --git src/DiskIO/IpcIo/IpcIoFile.cc src/DiskIO/IpcIo/IpcIoFile.cc index aec7b40..08269c8 100644 --- src/DiskIO/IpcIo/IpcIoFile.cc +++ src/DiskIO/IpcIo/IpcIoFile.cc @@ -6,40 +6,45 @@ #include "config.h" #include "base/RunnersRegistry.h" #include "base/TextException.h" #include "DiskIO/IORequestor.h" #include "DiskIO/IpcIo/IpcIoFile.h" #include "DiskIO/ReadRequest.h" #include "DiskIO/WriteRequest.h" #include "ipc/Messages.h" #include "ipc/Port.h" #include "ipc/Queue.h" #include "ipc/StrandSearch.h" #include "ipc/UdsOp.h" #include "ipc/mem/Pages.h" #include "SquidTime.h" CBDATA_CLASS_INIT(IpcIoFile); /// shared memory segment path to use for IpcIoFile maps static const char *const ShmLabel = "io_file"; +/// a single worker-to-disker or disker-to-worker queue capacity; up +/// to 2*QueueCapacity I/O requests queued between a single worker and +/// a single disker +// TODO: make configurable or compute from squid.conf settings if possible +static const int QueueCapacity = 1024; const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen; IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles; std::auto_ptr IpcIoFile::queue; bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false; static bool DiskerOpen(const String &path, int flags, mode_t mode); static void DiskerClose(const String &path); /// IpcIo wrapper for debugs() streams; XXX: find a better class name struct SipcIo { SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker): worker(aWorker), msg(aMsg), disker(aDisker) {} int worker; const IpcIoMsg &msg; int disker; }; @@ -813,55 +818,78 @@ DiskerOpen(const String &path, int flags, mode_t mode) return false; } store_open_disk_fd++; debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile); return true; } static void DiskerClose(const String &path) { if (TheFile >= 0) { file_close(TheFile); debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile); TheFile = -1; store_open_disk_fd--; } } +/// reports our needs for shared memory pages to Ipc::Mem::Pages +class IpcIoClaimMemoryNeedsRr: public RegisteredRunner +{ +public: + /* RegisteredRunner API */ + virtual void run(const RunnerRegistry &r); +}; + +RunnerRegistrationEntry(rrClaimMemoryNeeds, IpcIoClaimMemoryNeedsRr); + + +void +IpcIoClaimMemoryNeedsRr::run(const RunnerRegistry &) +{ + const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount( + ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity); + // the maximum number of shared I/O pages is approximately the + // number of queue slots, we add a fudge factor to that to account + // for corner cases where I/O pages are created before queue + // limits are checked or destroyed long after the I/O is dequeued + Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage, itemsCount * 1.1); +} + + /// initializes shared memory segments used by IpcIoFile class IpcIoRr: public Ipc::Mem::RegisteredRunner { public: /* RegisteredRunner API */ IpcIoRr(): owner(NULL) {} virtual ~IpcIoRr(); protected: virtual void create(const RunnerRegistry &); private: Ipc::FewToFewBiQueue::Owner *owner; }; RunnerRegistrationEntry(rrAfterConfig, IpcIoRr); void IpcIoRr::create(const RunnerRegistry &) { if (!UsingSmp()) return; Must(!owner); - // XXX: make capacity configurable owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, Config.cacheSwap.n_strands, 1 + Config.workers, sizeof(IpcIoMsg), - 1024); + QueueCapacity); } IpcIoRr::~IpcIoRr() { delete owner; } diff --git src/MemStore.cc src/MemStore.cc index aece423..8048b56 100644 --- src/MemStore.cc +++ src/MemStore.cc @@ -325,40 +325,58 @@ MemStore::copyToShm(StoreEntry &e, MemStoreMap::Extras &extras) void MemStore::cleanReadable(const sfileno fileno) { Ipc::Mem::PutPage(map->extras(fileno).page); theCurrentSize -= Ipc::Mem::PageSize(); } /// calculates maximum number of entries we need to store and map int64_t MemStore::EntryLimit() { if (!Config.memMaxSize) return 0; // no memory cache configured const int64_t entrySize = Ipc::Mem::PageSize(); // for now const int64_t entryLimit = Config.memMaxSize / entrySize; return entryLimit; } +/// reports our needs for shared memory pages to Ipc::Mem::Pages +class MemStoreClaimMemoryNeedsRr: public RegisteredRunner +{ +public: + /* RegisteredRunner API */ + virtual void run(const RunnerRegistry &r); +}; + +RunnerRegistrationEntry(rrClaimMemoryNeeds, MemStoreClaimMemoryNeedsRr); + + +void +MemStoreClaimMemoryNeedsRr::run(const RunnerRegistry &) +{ + Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::cachePage, MemStore::EntryLimit()); +} + + /// initializes shared memory segments used by MemStore class MemStoreRr: public Ipc::Mem::RegisteredRunner { public: /* RegisteredRunner API */ MemStoreRr(): owner(NULL) {} virtual void run(const RunnerRegistry &); virtual ~MemStoreRr(); protected: virtual void create(const RunnerRegistry &); private: MemStoreMap::Owner *owner; }; RunnerRegistrationEntry(rrAfterConfig, MemStoreRr); void MemStoreRr::run(const RunnerRegistry &r) @@ -371,29 +389,35 @@ void MemStoreRr::run(const RunnerRegistry &r) } else if (Config.memShared && !AtomicOperationsSupported) { // bail if the user wants shared memory cache but we cannot support it fatal("memory_cache_shared is on, but no support for atomic operations detected"); } else if (Config.memShared && !Ipc::Mem::Segment::Enabled()) { fatal("memory_cache_shared is on, but no support for shared memory detected"); } else if (Config.memShared && !UsingSmp()) { debugs(20, DBG_IMPORTANT, "WARNING: memory_cache_shared is on, but only" " a single worker is running"); } Ipc::Mem::RegisteredRunner::run(r); } void MemStoreRr::create(const RunnerRegistry &) { if (!Config.memShared) return; Must(!owner); const int64_t entryLimit = MemStore::EntryLimit(); - if (entryLimit <= 0) + if (entryLimit <= 0) { + if (Config.memMaxSize > 0) { + debugs(20, DBG_IMPORTANT, "WARNING: mem-cache size is too small (" + << (Config.memMaxSize / 1024.0) << " KB), should be >= " << + (Ipc::Mem::PageSize() / 1024.0) << " KB"); + } return; // no memory cache configured or a misconfiguration + } owner = MemStoreMap::Init(ShmLabel, entryLimit); } MemStoreRr::~MemStoreRr() { delete owner; } diff --git src/base/RunnersRegistry.h src/base/RunnersRegistry.h index 06e68e6..2d5b840 100644 --- src/base/RunnersRegistry.h +++ src/base/RunnersRegistry.h @@ -4,40 +4,45 @@ /** * This API allows virtually any module to register with a well-known registry, * be activated by some central processor at some registry-specific time, and * be deactiveated by some central processor at some registry-specific time. * * For example, main.cc may activate registered I/O modules after parsing * squid.conf and deactivate them before exiting. * * A module in this context is code providing a functionality or service to the * rest of Squid, such as src/DiskIO/Blocking, src/fs/ufs, or Cache Manager. A * module must declare a RegisteredRunner child class to implement activation and * deactivation logic using the run() method and destructor, respectively. * * This API allows the registry to determine the right [de]activation time for * each group of similar modules, without knowing any module specifics. * */ /// well-known registries typedef enum { + /// managed by main.cc; activated after parsing squid.conf but + /// before rrAfterConfig, deactivated after rrAfterConfig but + /// before freeing configuration-related memory or exit()-ing + rrClaimMemoryNeeds, + /// managed by main.cc; activated after parsing squid.conf and /// deactivated before freeing configuration-related memory or exit()-ing rrAfterConfig, rrEnd ///< not a real registry, just a label to mark the end of enum } RunnerRegistry; /// a runnable registrant API class RegisteredRunner { public: // called when this runner's registry is deactivated virtual ~RegisteredRunner() {} // called when this runner's registry is activated virtual void run(const RunnerRegistry &r) = 0; }; /// registers a given runner with the given registry and returns registry count diff --git src/ipc/Queue.cc src/ipc/Queue.cc index 24e6706..8afca32 100644 --- src/ipc/Queue.cc +++ src/ipc/Queue.cc @@ -132,40 +132,46 @@ Ipc::OneToOneUniQueues::operator [](const int index) const Ipc::FewToFewBiQueue::Owner * Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity) { return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity); } Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId): metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())), theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId), theLastPopProcessId(readers->theCapacity) { Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2); Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize); const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id); } +int +Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity) +{ + return capacity * groupASize * groupBSize * 2; +} + bool Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const { switch (group) { case groupA: return metadata->theGroupAIdOffset <= processId && processId < metadata->theGroupAIdOffset + metadata->theGroupASize; case groupB: return metadata->theGroupBIdOffset <= processId && processId < metadata->theGroupBIdOffset + metadata->theGroupBSize; } return false; } int Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const { Must(fromGroup != toGroup); assert(validProcessId(fromGroup, fromProcessId)); assert(validProcessId(toGroup, toProcessId)); diff --git src/ipc/Queue.h src/ipc/Queue.h index 2098610..a5f1618 100644 --- src/ipc/Queue.h +++ src/ipc/Queue.h @@ -169,40 +169,43 @@ private: }; public: class Owner { public: Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); ~Owner(); private: Mem::Owner *const metadataOwner; Mem::Owner *const queuesOwner; Mem::Owner *const readersOwner; }; static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); enum Group { groupA = 0, groupB = 1 }; FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); + /// maximum number of items in the queue + static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity); + Group localGroup() const { return theLocalGroup; } Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } /// clears the reader notification received by the local process from the remote process void clearReaderSignal(const int remoteProcessId); /// picks a process and calls OneToOneUniQueue::pop() using its queue template bool pop(int &remoteProcessId, Value &value); /// calls OneToOneUniQueue::push() using the given process queue template bool push(const int remoteProcessId, const Value &value); // TODO: rename to findOldest() or some such /// calls OneToOneUniQueue::peek() using the given process queue template bool peek(const int remoteProcessId, Value &value) const; /// returns true if pop() would have probably succeeded but does not pop() bool popReady() const; /// returns local reader's balance diff --git src/ipc/mem/Pages.cc src/ipc/mem/Pages.cc index 054938b..c97487d 100644 --- src/ipc/mem/Pages.cc +++ src/ipc/mem/Pages.cc @@ -2,40 +2,41 @@ * $Id$ * * DEBUG: section 54 Interprocess Communication * */ #include "config.h" #include "base/TextException.h" #include "base/RunnersRegistry.h" #include "ipc/mem/PagePool.h" #include "ipc/mem/Pages.h" #include "structs.h" #include "SwapDir.h" // Uses a single PagePool instance, for now. // Eventually, we may have pools dedicated to memory caching, disk I/O, etc. // TODO: make pool id more unique so it does not conflict with other Squids? static const char *PagePoolId = "squid-page-pool"; static Ipc::Mem::PagePool *ThePagePool = 0; +static int TheLimits[Ipc::Mem::PageId::maxPurpose]; // TODO: make configurable to avoid waste when mem-cached objects are small/big size_t Ipc::Mem::PageSize() { return 32*1024; } bool Ipc::Mem::GetPage(const PageId::Purpose purpose, PageId &page) { return ThePagePool && PagesAvailable(purpose) > 0 ? ThePagePool->get(purpose, page) : false; } void Ipc::Mem::PutPage(PageId &page) { Must(ThePagePool); ThePagePool->put(page); @@ -43,101 +44,88 @@ Ipc::Mem::PutPage(PageId &page) char * Ipc::Mem::PagePointer(const PageId &page) { Must(ThePagePool); return ThePagePool->pagePointer(page); } size_t Ipc::Mem::PageLimit() { size_t limit = 0; for (int i = 0; i < PageId::maxPurpose; ++i) limit += PageLimit(i); return limit; } size_t Ipc::Mem::PageLimit(const int purpose) { - switch (purpose) { - case PageId::cachePage: - return Config.memMaxSize > 0 ? Config.memMaxSize / PageSize() : 0; - case PageId::ioPage: - // XXX: this should be independent from memory cache pages - return PageLimit(PageId::cachePage)/2; - default: - Must(false); - } - return 0; + Must(0 <= purpose && purpose <= PageId::maxPurpose); + return TheLimits[purpose]; +} + +// note: adjust this if we start recording needs during reconfigure +void +Ipc::Mem::NotePageNeed(const int purpose, const int count) +{ + Must(0 <= purpose && purpose <= PageId::maxPurpose); + Must(count >= 0); + TheLimits[purpose] += count; } size_t Ipc::Mem::PageLevel() { return ThePagePool ? ThePagePool->level() : 0; } size_t Ipc::Mem::PageLevel(const int purpose) { return ThePagePool ? ThePagePool->level(purpose) : 0; } /// initializes shared memory pages class SharedMemPagesRr: public Ipc::Mem::RegisteredRunner { public: /* RegisteredRunner API */ SharedMemPagesRr(): owner(NULL) {} virtual void run(const RunnerRegistry &); virtual void create(const RunnerRegistry &); virtual void open(const RunnerRegistry &); virtual ~SharedMemPagesRr(); private: Ipc::Mem::PagePool::Owner *owner; }; RunnerRegistrationEntry(rrAfterConfig, SharedMemPagesRr); void SharedMemPagesRr::run(const RunnerRegistry &r) { - if (!UsingSmp()) - return; - - // When cache_dirs start using shared memory pages, they would - // need to communicate their needs to us somehow. - if (Config.memMaxSize <= 0) - return; - - if (Ipc::Mem::PageLimit() <= 0) { - if (IamMasterProcess()) { - debugs(54, DBG_IMPORTANT, "WARNING: mem-cache size is too small (" - << (Config.memMaxSize / 1024.0) << " KB), should be >= " << - (Ipc::Mem::PageSize() / 1024.0) << " KB"); - } + if (Ipc::Mem::PageLimit() <= 0) return; - } Ipc::Mem::RegisteredRunner::run(r); } void SharedMemPagesRr::create(const RunnerRegistry &) { Must(!owner); owner = Ipc::Mem::PagePool::Init(PagePoolId, Ipc::Mem::PageLimit(), Ipc::Mem::PageSize()); } void SharedMemPagesRr::open(const RunnerRegistry &) { Must(!ThePagePool); ThePagePool = new Ipc::Mem::PagePool(PagePoolId); } SharedMemPagesRr::~SharedMemPagesRr() diff --git src/ipc/mem/Pages.h src/ipc/mem/Pages.h index aeea189..bb626d1 100644 --- src/ipc/mem/Pages.h +++ src/ipc/mem/Pages.h @@ -34,25 +34,28 @@ size_t PageLimit(); /// the total number of shared memory pages that can be in use at any /// time for given purpose size_t PageLimit(const int purpose); /// approximate total number of shared memory pages used now size_t PageLevel(); /// approximate total number of shared memory pages used now for given purpose size_t PageLevel(const int purpose); /// approximate total number of shared memory pages we can allocate now inline size_t PagesAvailable() { return PageLimit() - PageLevel(); } /// approximate total number of shared memory pages we can allocate /// now for given purpose inline size_t PagesAvailable(const int purpose) { return PageLimit(purpose) - PageLevel(purpose); } /// returns page size in bytes; all pages are assumed to be the same size size_t PageSize(); +/// claim the need for a number of pages for a given purpose +void NotePageNeed(const int purpose, const int count); + } // namespace Mem } // namespace Ipc #endif // SQUID_IPC_MEM_PAGES_H diff --git src/main.cc src/main.cc index 546eb92..21e45a2 100644 --- src/main.cc +++ src/main.cc @@ -1425,40 +1425,41 @@ SquidMain(int argc, char **argv) /* send signal to running copy and exit */ if (opt_send_signal != -1) { /* chroot if configured to run inside chroot */ if (Config.chroot_dir) { if (chroot(Config.chroot_dir)) fatal("failed to chroot"); no_suid(); } else { leave_suid(); } sendSignal(); /* NOTREACHED */ } debugs(1,2, HERE << "Doing post-config initialization\n"); leave_suid(); + ActivateRegistered(rrClaimMemoryNeeds); ActivateRegistered(rrAfterConfig); enter_suid(); if (!opt_no_daemon && Config.workers > 0) watch_child(argv); if (opt_create_swap_dirs) { /* chroot if configured to run inside chroot */ if (Config.chroot_dir && chroot(Config.chroot_dir)) { fatal("failed to chroot"); } setEffectiveUser(); debugs(0, 0, "Creating Swap Directories"); Store::Root().create(); return 0; } @@ -1794,40 +1795,41 @@ watch_child(char *argv[]) kid->name().termedBuf(), kid->getPid()); } if (kid->hopeless()) { syslog(LOG_NOTICE, "Squid Parent: %s process %d will not" " be restarted due to repeated, frequent failures", kid->name().termedBuf(), kid->getPid()); } } else { syslog(LOG_NOTICE, "Squid Parent: unknown child process %d exited", pid); } #if _SQUID_NEXT_ } while ((pid = wait3(&status, WNOHANG, NULL)) > 0); #else } while ((pid = waitpid(-1, &status, WNOHANG)) > 0); #endif if (!TheKids.someRunning() && !TheKids.shouldRestartSome()) { leave_suid(); DeactivateRegistered(rrAfterConfig); + DeactivateRegistered(rrClaimMemoryNeeds); enter_suid(); if (TheKids.someSignaled(SIGINT) || TheKids.someSignaled(SIGTERM)) { syslog(LOG_ALERT, "Exiting due to unexpected forced shutdown"); exit(1); } if (TheKids.allHopeless()) { syslog(LOG_ALERT, "Exiting due to repeated, frequent failures"); exit(1); } exit(0); } squid_signal(SIGINT, SIG_DFL, SA_RESTART); sleep(3); } /* NOTREACHED */ @@ -1896,40 +1898,41 @@ SquidShutdown() WIN32_svcstatusupdate(SERVICE_STOP_PENDING, 10000); #endif Store::Root().sync(); /* Flush pending object writes/unlinks */ #if USE_UNLINKD unlinkdClose(); /* after sync/flush */ #endif storeDirWriteCleanLogs(0); PrintRusage(); dumpMallocStats(); Store::Root().sync(); /* Flush log writes */ storeLogClose(); accessLogClose(); Store::Root().sync(); /* Flush log close */ StoreFileSystem::FreeAllFs(); DiskIOModule::FreeAllModules(); DeactivateRegistered(rrAfterConfig); + DeactivateRegistered(rrClaimMemoryNeeds); #if LEAK_CHECK_MODE && 0 /* doesn't work at the moment */ configFreeMemory(); storeFreeMemory(); /*stmemFreeMemory(); */ netdbFreeMemory(); ipcacheFreeMemory(); fqdncacheFreeMemory(); asnFreeMemory(); clientdbFreeMemory(); httpHeaderCleanModule(); statFreeMemory(); eventFreeMemory(); mimeFreeMemory(); errorClean(); #endif #if !XMALLOC_TRACE if (opt_no_daemon) { file_close(0);