SMP Caching, part 2: Rock Store addition

Rock Store uses a single [large] database-style file per cache_dir to store cached responses and metadata. This part of the design is similar to COSS. Rock Store does not maintain or rely on a swap.state "log" for recovery. Instead, the database is scanned in the background to load entries when Squid starts. Rock Store maintains its own index of cached entries and avoids the global store_table. All entries must be max-size or smaller.

In SMP mode, each Rock cache_dir is given a dedicated kid process called a "disker". All SMP workers communicate with diskers to store misses and load hits, using shared memory pages and atomic shared memory queues. A disker blocks when doing disk I/O, but workers do not. Any diskers:workers ratio is supported, so the user can find and configure the optimal number of workers and diskers for a given number of disks and CPU cores.

In non-SMP mode, Rock Store uses good old blocking disk I/O, without any diskers.

TODO: Portability. Better coordination with the shared memory cache (preallocate and then avoid copying of shared memory pages when doing disk I/O). More stats. Seek optimization. Remove the known max-size requirement?

=== modified file 'configure.ac' --- configure.ac 2011-04-15 11:51:15 +0000 +++ configure.ac 2011-04-19 22:36:00 +0000 @@ -626,6 +626,20 @@ fi ;; + Mmapped) + AC_MSG_NOTICE([Enabling Mmapped DiskIO module]) + DISK_LIBS="$DISK_LIBS libMmapped.a" + DISK_MODULES="$DISK_MODULES Mmapped" + DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/Mmapped/MmappedDiskIOModule.o" + ;; + + IpcIo) + AC_MSG_NOTICE([Enabling IpcIo DiskIO module]) + DISK_LIBS="$DISK_LIBS libIpcIo.a" + DISK_MODULES="$DISK_MODULES IpcIo" + DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/IpcIo/IpcIoDiskIOModule.o" + ;; + Blocking) AC_MSG_NOTICE([Enabling Blocking DiskIO module]) DISK_LIBS="$DISK_LIBS libBlocking.a" @@ -713,6 +727,13 @@ # for STORE_TESTS substition STORE_TESTS="$STORE_TESTS tests/testCoss$EXEEXT" ;; + rock) + if !
test "x$squid_disk_module_candidates_IpcIo" = "xyes" -a \ + "x$squid_disk_module_candidates_Blocking" = "xyes"; then + AC_MSG_ERROR([Storage module Rock requires IpcIo and Blocking DiskIO modules]) + fi + STORE_TESTS="$STORE_TESTS tests/testRock$EXEEXT" + ;; ufs) STORE_TESTS="$STORE_TESTS tests/testUfs$EXEEXT" esac @@ -724,6 +745,7 @@ AH_TEMPLATE(HAVE_FS_AUFS, "Define to 1 if aufs filesystem module is build") AH_TEMPLATE(HAVE_FS_DISKD, "Define to 1 if diskd filesystem module is build") AH_TEMPLATE(HAVE_FS_COSS, "Define to 1 if coss filesystem module is build") +AH_TEMPLATE(HAVE_FS_ROCK, "Define to 1 if rock filesystem module is build") dnl got final squid_storeio_module_candidates, build library lists === added directory 'src/DiskIO/IpcIo' === added file 'src/DiskIO/IpcIo/DiskIOIpcIo.cc' === added file 'src/DiskIO/IpcIo/IpcIoDiskIOModule.cc' --- src/DiskIO/IpcIo/IpcIoDiskIOModule.cc 1970-01-01 00:00:00 +0000 +++ src/DiskIO/IpcIo/IpcIoDiskIOModule.cc 2011-02-01 05:01:43 +0000 @@ -0,0 +1,37 @@ +#include "squid.h" +#include "IpcIoDiskIOModule.h" +#include "IpcIoIOStrategy.h" + +IpcIoDiskIOModule::IpcIoDiskIOModule() +{ + ModuleAdd(*this); +} + +IpcIoDiskIOModule & +IpcIoDiskIOModule::GetInstance() +{ + return Instance; +} + +void +IpcIoDiskIOModule::init() +{} + +void +IpcIoDiskIOModule::shutdown() +{} + + +DiskIOStrategy* +IpcIoDiskIOModule::createStrategy() +{ + return new IpcIoIOStrategy(); +} + +IpcIoDiskIOModule IpcIoDiskIOModule::Instance; + +char const * +IpcIoDiskIOModule::type () const +{ + return "IpcIo"; +} === added file 'src/DiskIO/IpcIo/IpcIoDiskIOModule.h' --- src/DiskIO/IpcIo/IpcIoDiskIOModule.h 1970-01-01 00:00:00 +0000 +++ src/DiskIO/IpcIo/IpcIoDiskIOModule.h 2011-02-01 05:01:43 +0000 @@ -0,0 +1,21 @@ +#ifndef SQUID_IPC_IODISKIOMODULE_H +#define SQUID_IPC_IODISKIOMODULE_H + +#include "DiskIO/DiskIOModule.h" + +class IpcIoDiskIOModule : public DiskIOModule +{ + +public: + static IpcIoDiskIOModule &GetInstance(); + IpcIoDiskIOModule(); + virtual void init(); + virtual void shutdown(); + virtual char const *type () const; + virtual DiskIOStrategy* createStrategy(); + +private: + static IpcIoDiskIOModule Instance; +}; + +#endif /* SQUID_IPC_IODISKIOMODULE_H */ === added file 'src/DiskIO/IpcIo/IpcIoFile.cc' --- src/DiskIO/IpcIo/IpcIoFile.cc 1970-01-01 00:00:00 +0000 +++ src/DiskIO/IpcIo/IpcIoFile.cc 2011-04-26 20:39:59 +0000 @@ -0,0 +1,719 @@ +/* + * $Id$ + * + * DEBUG: section 47 Store Directory Routines + */ + +#include "config.h" +#include "base/RunnersRegistry.h" +#include "base/TextException.h" +#include "DiskIO/IORequestor.h" +#include "DiskIO/IpcIo/IpcIoFile.h" +#include "DiskIO/ReadRequest.h" +#include "DiskIO/WriteRequest.h" +#include "ipc/Messages.h" +#include "ipc/Port.h" +#include "ipc/Queue.h" +#include "ipc/StrandSearch.h" +#include "ipc/UdsOp.h" +#include "ipc/mem/Pages.h" + +CBDATA_CLASS_INIT(IpcIoFile); + +/// shared memory segment path to use for IpcIoFile maps +static const char *const ShmLabel = "io_file"; + +const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more +IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen; +IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles; +std::auto_ptr IpcIoFile::queue; + +static bool DiskerOpen(const String &path, int flags, mode_t mode); +static void DiskerClose(const String &path); + +/// IpcIo wrapper for debugs() streams; XXX: find a better class name +struct SipcIo { + SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker): + worker(aWorker), msg(aMsg), disker(aDisker) {} + + int worker; + const IpcIoMsg 
&msg; + int disker; +}; + +std::ostream & +operator <<(std::ostream &os, const SipcIo &sio) +{ + return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId << + (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker; +} + + +IpcIoFile::IpcIoFile(char const *aDb): + dbName(aDb), diskId(-1), error_(false), lastRequestId(0), + olderRequests(&requestMap1), newerRequests(&requestMap2), + timeoutCheckScheduled(false) +{ +} + +IpcIoFile::~IpcIoFile() +{ + if (diskId >= 0) { + const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId); + // XXX: warn and continue? + Must(i != IpcIoFiles.end()); + Must(i->second == this); + IpcIoFiles.erase(i); + } +} + +void +IpcIoFile::open(int flags, mode_t mode, RefCount callback) +{ + ioRequestor = callback; + Must(diskId < 0); // we do not know our disker yet + + if (!queue.get()) + queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier)); + + if (IamDiskProcess()) { + error_ = !DiskerOpen(dbName, flags, mode); + if (error_) + return; + + diskId = KidIdentifier; + const bool inserted = + IpcIoFiles.insert(std::make_pair(diskId, this)).second; + Must(inserted); + + Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid())); + ann.strand.tag = dbName; + Ipc::TypedMsgHdr message; + ann.pack(message); + SendMessage(Ipc::coordinatorAddr, message); + + ioRequestor->ioCompletedNotification(); + return; + } + + Ipc::StrandSearchRequest request; + request.requestorId = KidIdentifier; + request.tag = dbName; + + Ipc::TypedMsgHdr msg; + request.pack(msg); + Ipc::SendMessage(Ipc::coordinatorAddr, msg); + + WaitingForOpen.push_back(this); + + eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout, + this, Timeout, 0, false); // "this" pointer is used as id +} + +void +IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { + Must(diskId < 0); // we do not know our disker yet + + if (!response) { + debugs(79,1, HERE << "error: timeout"); + error_ = true; + } else { + diskId = response->strand.kidId; + if (diskId >= 0) { + const bool inserted = + IpcIoFiles.insert(std::make_pair(diskId, this)).second; + Must(inserted); + } else { + error_ = true; + debugs(79,1, HERE << "error: no disker claimed " << dbName); + } + } + + ioRequestor->ioCompletedNotification(); +} + +/** + * Alias for IpcIoFile::open(...) + \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount callback) + */ +void +IpcIoFile::create(int flags, mode_t mode, RefCount callback) +{ + assert(false); // check + /* We use the same logic path for open */ + open(flags, mode, callback); +} + +void +IpcIoFile::close() +{ + assert(ioRequestor != NULL); + + if (IamDiskProcess()) + DiskerClose(dbName); + // XXX: else nothing to do? 
+ + ioRequestor->closeCompleted(); +} + +bool +IpcIoFile::canRead() const +{ + return diskId >= 0; +} + +bool +IpcIoFile::canWrite() const +{ + return diskId >= 0; +} + +bool +IpcIoFile::error() const +{ + return error_; +} + +void +IpcIoFile::read(ReadRequest *readRequest) +{ + debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " << + readRequest->offset << ")"); + + assert(ioRequestor != NULL); + assert(readRequest->len >= 0); + assert(readRequest->offset >= 0); + Must(!error_); + + //assert(minOffset < 0 || minOffset <= readRequest->offset); + //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset); + + IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); + pending->readRequest = readRequest; + push(pending); +} + +void +IpcIoFile::readCompleted(ReadRequest *readRequest, + IpcIoMsg *const response) +{ + bool ioError = false; + if (!response) { + debugs(79,1, HERE << "error: timeout"); + ioError = true; // I/O timeout does not warrant setting error_? + } else + if (response->xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); + ioError = error_ = true; + } + else + if (!response->page) { + debugs(79,1, HERE << "error: run out of shared memory pages"); + ioError = true; + } else { + const char *const buf = Ipc::Mem::PagePointer(response->page); + memcpy(readRequest->buf, buf, response->len); + } + + Ipc::Mem::PutPage(response->page); + + const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len; + const int errflag = ioError ? DISK_ERROR : DISK_OK; + ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest); +} + +void +IpcIoFile::write(WriteRequest *writeRequest) +{ + debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " << + writeRequest->offset << ")"); + + assert(ioRequestor != NULL); + assert(writeRequest->len >= 0); + assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len? + assert(writeRequest->offset >= 0); + Must(!error_); + + //assert(minOffset < 0 || minOffset <= writeRequest->offset); + //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset); + + IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); + pending->writeRequest = writeRequest; + push(pending); +} + +void +IpcIoFile::writeCompleted(WriteRequest *writeRequest, + const IpcIoMsg *const response) +{ + bool ioError = false; + if (!response) { + debugs(79,1, HERE << "error: timeout"); + ioError = true; // I/O timeout does not warrant setting error_? + } else + if (response->xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); + ioError = error_ = true; + } else + if (response->len != writeRequest->len) { + debugs(79,1, HERE << "problem: " << response->len << " < " << writeRequest->len); + error_ = true; + } + + if (writeRequest->free_func) + (writeRequest->free_func)(const_cast(writeRequest->buf)); // broken API? + + if (!ioError) { + debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" << + diskId << " at " << writeRequest->offset); + } + + const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len; + const int errflag = ioError ? 
DISK_ERROR : DISK_OK; + ioRequestor->writeCompleted(errflag, rlen, writeRequest); +} + +bool +IpcIoFile::ioInProgress() const +{ + return !olderRequests->empty() || !newerRequests->empty(); +} + +/// track a new pending request +void +IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending) +{ + newerRequests->insert(std::make_pair(lastRequestId, pending)); + if (!timeoutCheckScheduled) + scheduleTimeoutCheck(); +} + +/// push an I/O request to disker +void +IpcIoFile::push(IpcIoPendingRequest *const pending) +{ + // prevent queue overflows: check for responses to earlier requests + HandleResponses("before push"); + + debugs(47, 7, HERE); + Must(diskId >= 0); + Must(pending); + Must(pending->readRequest || pending->writeRequest); + + IpcIoMsg ipcIo; + try { + ipcIo.requestId = lastRequestId; + if (pending->readRequest) { + ipcIo.command = IpcIo::cmdRead; + ipcIo.offset = pending->readRequest->offset; + ipcIo.len = pending->readRequest->len; + } else { // pending->writeRequest + Must(pending->writeRequest->len <= Ipc::Mem::PageSize()); + if (!Ipc::Mem::GetPage(ipcIo.page)) { + ipcIo.len = 0; + throw TexcHere("run out of shared memory pages"); + } + ipcIo.command = IpcIo::cmdWrite; + ipcIo.offset = pending->writeRequest->offset; + ipcIo.len = pending->writeRequest->len; + char *const buf = Ipc::Mem::PagePointer(ipcIo.page); + memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away + } + + debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId)); + + if (queue->push(diskId, ipcIo)) + Notify(diskId); // must notify disker + trackPendingRequest(pending); + } catch (const Queue::Full &) { + debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << + SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len + // TODO: grow queue size + + pending->completeIo(NULL); // XXX: should distinguish this from timeout + delete pending; + } catch (const TextException &e) { + debugs(47, DBG_IMPORTANT, HERE << e.what()); + pending->completeIo(NULL); // XXX: should distinguish this from timeout + delete pending; + } +} + +/// called when coordinator responds to worker open request +void +IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) +{ + debugs(47, 7, HERE << "coordinator response to open request"); + for (IpcIoFileList::iterator i = WaitingForOpen.begin(); + i != WaitingForOpen.end(); ++i) { + if (response.strand.tag == (*i)->dbName) { + (*i)->openCompleted(&response); + WaitingForOpen.erase(i); + return; + } + } + + debugs(47, 4, HERE << "LATE disker response to open for " << + response.strand.tag); + // nothing we can do about it; completeIo() has been called already +} + +void +IpcIoFile::HandleResponses(const char *const when) +{ + debugs(47, 4, HERE << "popping all " << when); + IpcIoMsg ipcIo; + // get all responses we can: since we are not pushing, this will stop + int diskId; + while (queue->pop(diskId, ipcIo)) { + const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); + Must(i != IpcIoFiles.end()); // TODO: warn but continue + i->second->handleResponse(ipcIo); + } +} + +void +IpcIoFile::handleResponse(IpcIoMsg &ipcIo) +{ + const int requestId = ipcIo.requestId; + debugs(47, 7, HERE << "popped disker response: " << + SipcIo(KidIdentifier, ipcIo, diskId)); + + Must(requestId); + if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) { + pending->completeIo(&ipcIo); + delete pending; // XXX: leaking if throwing + } else { + debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command << + "; 
ipcIo" << KidIdentifier << '.' << requestId); + // nothing we can do about it; completeIo() has been called already + } +} + +void +IpcIoFile::Notify(const int peerId) +{ + // TODO: Count and report the total number of notifications, pops, pushes. + debugs(47, 7, HERE << "kid" << peerId); + Ipc::TypedMsgHdr msg; + msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type? + msg.putInt(KidIdentifier); + const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId); + Ipc::SendMessage(addr, msg); +} + +void +IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) +{ + const int from = msg.getInt(); + debugs(47, 7, HERE << "from " << from); + queue->clearReaderSignal(from); + if (IamDiskProcess()) + DiskerHandleRequests(); + else + HandleResponses("after notification"); +} + +/// handles open request timeout +void +IpcIoFile::OpenTimeout(void *const param) +{ + Must(param); + // the pointer is used for comparison only and not dereferenced + const IpcIoFile *const ipcIoFile = + reinterpret_cast(param); + for (IpcIoFileList::iterator i = WaitingForOpen.begin(); + i != WaitingForOpen.end(); ++i) { + if (*i == ipcIoFile) { + (*i)->openCompleted(NULL); + WaitingForOpen.erase(i); + break; + } + } +} + +/// IpcIoFile::checkTimeouts wrapper +void +IpcIoFile::CheckTimeouts(void *const param) +{ + Must(param); + const int diskId = reinterpret_cast(param); + debugs(47, 7, HERE << "diskId=" << diskId); + const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); + if (i != IpcIoFiles.end()) + i->second->checkTimeouts(); +} + +void +IpcIoFile::checkTimeouts() +{ + timeoutCheckScheduled = false; + + // any old request would have timed out by now + typedef RequestMap::const_iterator RMCI; + for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) { + IpcIoPendingRequest *const pending = i->second; + + const unsigned int requestId = i->first; + debugs(47, 7, HERE << "disker timeout; ipcIo" << + KidIdentifier << '.' 
<< requestId); + + pending->completeIo(NULL); // no response + delete pending; // XXX: leaking if throwing + } + olderRequests->clear(); + + swap(olderRequests, newerRequests); // switches pointers around + if (!olderRequests->empty()) + scheduleTimeoutCheck(); +} + +/// prepare to check for timeouts in a little while +void +IpcIoFile::scheduleTimeoutCheck() +{ + // we check all older requests at once so some may be wait for 2*Timeout + eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts, + reinterpret_cast(diskId), Timeout, 0, false); + timeoutCheckScheduled = true; +} + +/// returns and forgets the right IpcIoFile pending request +IpcIoPendingRequest * +IpcIoFile::dequeueRequest(const unsigned int requestId) +{ + Must(requestId != 0); + + RequestMap *map = NULL; + RequestMap::iterator i = requestMap1.find(requestId); + + if (i != requestMap1.end()) + map = &requestMap1; + else { + i = requestMap2.find(requestId); + if (i != requestMap2.end()) + map = &requestMap2; + } + + if (!map) // not found in both maps + return NULL; + + IpcIoPendingRequest *pending = i->second; + map->erase(i); + return pending; +} + +int +IpcIoFile::getFD() const +{ + assert(false); // not supported; TODO: remove this method from API + return -1; +} + + +/* IpcIoMsg */ + +IpcIoMsg::IpcIoMsg(): + requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0) +{ +} + +/* IpcIoPendingRequest */ + +IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): + file(aFile), readRequest(NULL), writeRequest(NULL) +{ + Must(file != NULL); + if (++file->lastRequestId == 0) // don't use zero value as requestId + ++file->lastRequestId; +} + +void +IpcIoPendingRequest::completeIo(IpcIoMsg *const response) +{ + if (readRequest) + file->readCompleted(readRequest, response); + else + if (writeRequest) + file->writeCompleted(writeRequest, response); + else { + Must(!response); // only timeouts are handled here + file->openCompleted(NULL); + } +} + + + +/* XXX: disker code that should probably be moved elsewhere */ + +static int TheFile = -1; ///< db file descriptor + +static void +diskerRead(IpcIoMsg &ipcIo) +{ + if (!Ipc::Mem::GetPage(ipcIo.page)) { + ipcIo.len = 0; + debugs(47,5, HERE << "run out of shared memory pages"); + return; + } + + char *const buf = Ipc::Mem::PagePointer(ipcIo.page); + const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); + statCounter.syscalls.disk.reads++; + fd_bytes(TheFile, read, FD_READ); + + if (read >= 0) { + ipcIo.xerrno = 0; + const size_t len = static_cast(read); // safe because read > 0 + debugs(47,8, HERE << "disker" << KidIdentifier << " read " << + (len == ipcIo.len ? "all " : "just ") << read); + ipcIo.len = len; + } else { + ipcIo.xerrno = errno; + ipcIo.len = 0; + debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " << + ipcIo.xerrno); + } +} + +static void +diskerWrite(IpcIoMsg &ipcIo) +{ + const char *const buf = Ipc::Mem::PagePointer(ipcIo.page); + const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); + statCounter.syscalls.disk.writes++; + fd_bytes(TheFile, wrote, FD_WRITE); + + if (wrote >= 0) { + ipcIo.xerrno = 0; + const size_t len = static_cast(wrote); // safe because wrote > 0 + debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " << + (len == ipcIo.len ? 
"all " : "just ") << wrote); + ipcIo.len = len; + } else { + ipcIo.xerrno = errno; + ipcIo.len = 0; + debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " << + ipcIo.xerrno); + } + + Ipc::Mem::PutPage(ipcIo.page); +} + +void +IpcIoFile::DiskerHandleRequests() +{ + int workerId = 0; + IpcIoMsg ipcIo; + while (queue->pop(workerId, ipcIo)) + DiskerHandleRequest(workerId, ipcIo); + + // TODO: If the loop keeps on looping, we probably should take a break + // once in a while to update clock, read Coordinator messages, etc. + // This can be combined with "elevator" optimization where we get up to N + // requests first, then reorder the popped requests to optimize seek time, + // then do I/O, then take a break, and come back for the next set of I/O + // requests. +} + +/// called when disker receives an I/O request +void +IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) +{ + if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) { + debugs(0,0, HERE << "disker" << KidIdentifier << + " should not receive " << ipcIo.command << + " ipcIo" << workerId << '.' << ipcIo.requestId); + return; + } + + debugs(47,5, HERE << "disker" << KidIdentifier << + (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") << + ipcIo.len << " at " << ipcIo.offset << + " ipcIo" << workerId << '.' << ipcIo.requestId); + + if (ipcIo.command == IpcIo::cmdRead) + diskerRead(ipcIo); + else // ipcIo.command == IpcIo::cmdWrite + diskerWrite(ipcIo); + + debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier)); + + try { + if (queue->push(workerId, ipcIo)) + Notify(workerId); // must notify worker + } catch (const Queue::Full &) { + // The worker queue should not overflow because the worker should pop() + // before push()ing and because if disker pops N requests at a time, + // we should make sure the worker pop() queue length is the worker + // push queue length plus N+1. XXX: implement the N+1 difference. 
+ debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " << + SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len + + // the I/O request we could not push will timeout + } +} + +static bool +DiskerOpen(const String &path, int flags, mode_t mode) +{ + assert(TheFile < 0); + + TheFile = file_open(path.termedBuf(), flags); + + if (TheFile < 0) { + const int xerrno = errno; + debugs(47,0, HERE << "rock db error opening " << path << ": " << + xstrerr(xerrno)); + return false; + } + + store_open_disk_fd++; + debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile); + return true; +} + +static void +DiskerClose(const String &path) +{ + if (TheFile >= 0) { + file_close(TheFile); + debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile); + TheFile = -1; + store_open_disk_fd--; + } +} + + +/// initializes shared memory segments used by IpcIoFile +class IpcIoRr: public RegisteredRunner +{ +public: + /* RegisteredRunner API */ + IpcIoRr(): owner(NULL) {} + virtual void run(const RunnerRegistry &); + virtual ~IpcIoRr(); + +private: + Ipc::FewToFewBiQueue::Owner *owner; +}; + +RunnerRegistrationEntry(rrAfterConfig, IpcIoRr); + + +void IpcIoRr::run(const RunnerRegistry &) +{ + if (!UsingSmp()) + return; + + if (IamMasterProcess()) { + Must(!owner); + // XXX: make capacity configurable + owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, Config.cacheSwap.n_configured, 1 + Config.workers, sizeof(IpcIoMsg), 1024); + } +} + +IpcIoRr::~IpcIoRr() +{ + delete owner; +} === added file 'src/DiskIO/IpcIo/IpcIoFile.h' --- src/DiskIO/IpcIo/IpcIoFile.h 1970-01-01 00:00:00 +0000 +++ src/DiskIO/IpcIo/IpcIoFile.h 2011-04-26 20:39:59 +0000 @@ -0,0 +1,150 @@ +#ifndef SQUID_IPC_IOFILE_H +#define SQUID_IPC_IOFILE_H + +#include "base/AsyncCall.h" +#include "cbdata.h" +#include "DiskIO/DiskFile.h" +#include "DiskIO/IORequestor.h" +#include "ipc/forward.h" +#include "ipc/mem/Page.h" +#include +#include +#include + +namespace Ipc { +class FewToFewBiQueue; +} // Ipc + +// TODO: expand to all classes +namespace IpcIo { + +/// what kind of I/O the disker needs to do or have done +typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command; + +} // namespace IpcIo + + +/// converts DiskIO requests to IPC queue messages +class IpcIoMsg { +public: + IpcIoMsg(); + +public: + unsigned int requestId; ///< unique for requestor; matches request w/ response + + off_t offset; + size_t len; + Ipc::Mem::PageId page; + + IpcIo::Command command; ///< what disker is supposed to do or did + + int xerrno; ///< I/O error code or zero +}; + +class IpcIoPendingRequest; + +class IpcIoFile: public DiskFile +{ + +public: + typedef RefCount Pointer; + + IpcIoFile(char const *aDb); + virtual ~IpcIoFile(); + + /* DiskFile API */ + virtual void open(int flags, mode_t mode, RefCount callback); + virtual void create(int flags, mode_t mode, RefCount callback); + virtual void read(ReadRequest *); + virtual void write(WriteRequest *); + virtual void close(); + virtual bool error() const; + virtual int getFD() const; + virtual bool canRead() const; + virtual bool canWrite() const; + virtual bool ioInProgress() const; + + /// handle open response from coordinator + static void HandleOpenResponse(const Ipc::StrandSearchResponse &response); + + /// handle queue push notifications from worker or disker + static void HandleNotification(const Ipc::TypedMsgHdr &msg); + +protected: + friend class IpcIoPendingRequest; + void openCompleted(const Ipc::StrandSearchResponse *const response); + void 
readCompleted(ReadRequest *readRequest, IpcIoMsg *const response); + void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response); + +private: + void trackPendingRequest(IpcIoPendingRequest *const pending); + void push(IpcIoPendingRequest *const pending); + IpcIoPendingRequest *dequeueRequest(const unsigned int requestId); + + static void Notify(const int peerId); + + static void OpenTimeout(void *const param); + static void CheckTimeouts(void *const param); + void checkTimeouts(); + void scheduleTimeoutCheck(); + + static void HandleResponses(const char *const when); + void handleResponse(IpcIoMsg &ipcIo); + + static void DiskerHandleRequests(); + static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); + +private: + const String dbName; ///< the name of the file we are managing + int diskId; ///< the process ID of the disker we talk to + RefCount ioRequestor; + + bool error_; ///< whether we have seen at least one I/O error (XXX) + + unsigned int lastRequestId; ///< last requestId used + + /// maps requestId to the handleResponse callback + typedef std::map RequestMap; + RequestMap requestMap1; ///< older (or newer) pending requests + RequestMap requestMap2; ///< newer (or older) pending requests + RequestMap *olderRequests; ///< older requests (map1 or map2) + RequestMap *newerRequests; ///< newer requests (map2 or map1) + bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call + + static const double Timeout; ///< timeout value in seconds + + typedef std::list IpcIoFileList; + static IpcIoFileList WaitingForOpen; ///< pending open requests + + ///< maps diskerId to IpcIoFile, cleared in destructor + typedef std::map IpcIoFilesMap; + static IpcIoFilesMap IpcIoFiles; + + typedef Ipc::FewToFewBiQueue Queue; + static std::auto_ptr queue; ///< IPC queue + + CBDATA_CLASS2(IpcIoFile); +}; + + +/// keeps original I/O request parameters while disker is handling the request +class IpcIoPendingRequest +{ +public: + IpcIoPendingRequest(const IpcIoFile::Pointer &aFile); + + /// called when response is received and, with a nil response, on timeouts + void completeIo(IpcIoMsg *const response); + +public: + const IpcIoFile::Pointer file; ///< the file object waiting for the response + ReadRequest *readRequest; ///< set if this is a read requests + WriteRequest *writeRequest; ///< set if this is a write request + +private: + IpcIoPendingRequest(const IpcIoPendingRequest &d); // not implemented + IpcIoPendingRequest &operator =(const IpcIoPendingRequest &d); // ditto +}; + + +#endif /* SQUID_IPC_IOFILE_H */ === added file 'src/DiskIO/IpcIo/IpcIoIOStrategy.cc' --- src/DiskIO/IpcIo/IpcIoIOStrategy.cc 1970-01-01 00:00:00 +0000 +++ src/DiskIO/IpcIo/IpcIoIOStrategy.cc 2011-02-01 05:01:43 +0000 @@ -0,0 +1,37 @@ + +/* + * $Id$ + * + * DEBUG: section 47 Store Directory Routines + */ + +#include "IpcIoIOStrategy.h" +#include "IpcIoFile.h" +bool +IpcIoIOStrategy::shedLoad() +{ + return false; +} + +int +IpcIoIOStrategy::load() +{ + /* Return 999 (99.9%) constant load */ + return 999; +} + +DiskFile::Pointer +IpcIoIOStrategy::newFile (char const *path) +{ + return new IpcIoFile (path); +} + +void +IpcIoIOStrategy::unlinkFile(char const *path) +{ +#if USE_UNLINKD + unlinkdUnlink(path); +#else + ::unlink(path); +#endif +} === added file 'src/DiskIO/IpcIo/IpcIoIOStrategy.h' --- src/DiskIO/IpcIo/IpcIoIOStrategy.h 1970-01-01 00:00:00 +0000 +++ src/DiskIO/IpcIo/IpcIoIOStrategy.h 2011-02-01 05:01:43 +0000 @@ -0,0 +1,15 @@ +#ifndef SQUID_IPC_IOIOSTRATEGY_H +#define 
SQUID_IPC_IOIOSTRATEGY_H +#include "DiskIO/DiskIOStrategy.h" + +class IpcIoIOStrategy : public DiskIOStrategy +{ + +public: + virtual bool shedLoad(); + virtual int load(); + virtual RefCount newFile(char const *path); + virtual void unlinkFile (char const *); +}; + +#endif /* SQUID_IPC_IOIOSTRATEGY_H */ === added directory 'src/DiskIO/Mmapped' === added file 'src/DiskIO/Mmapped/DiskIOMmapped.cc' === added file 'src/DiskIO/Mmapped/MmappedDiskIOModule.cc' --- src/DiskIO/Mmapped/MmappedDiskIOModule.cc 1970-01-01 00:00:00 +0000 +++ src/DiskIO/Mmapped/MmappedDiskIOModule.cc 2011-01-27 21:14:56 +0000 @@ -0,0 +1,37 @@ +#include "squid.h" +#include "MmappedDiskIOModule.h" +#include "MmappedIOStrategy.h" + +MmappedDiskIOModule::MmappedDiskIOModule() +{ + ModuleAdd(*this); +} + +MmappedDiskIOModule & +MmappedDiskIOModule::GetInstance() +{ + return Instance; +} + +void +MmappedDiskIOModule::init() +{} + +void +MmappedDiskIOModule::shutdown() +{} + + +DiskIOStrategy* +MmappedDiskIOModule::createStrategy() +{ + return new MmappedIOStrategy(); +} + +MmappedDiskIOModule MmappedDiskIOModule::Instance; + +char const * +MmappedDiskIOModule::type () const +{ + return "Mmapped"; +} === added file 'src/DiskIO/Mmapped/MmappedDiskIOModule.h' --- src/DiskIO/Mmapped/MmappedDiskIOModule.h 1970-01-01 00:00:00 +0000 +++ src/DiskIO/Mmapped/MmappedDiskIOModule.h 2011-01-27 21:14:56 +0000 @@ -0,0 +1,21 @@ +#ifndef SQUID_MMAPPEDDISKIOMODULE_H +#define SQUID_MMAPPEDDISKIOMODULE_H + +#include "DiskIO/DiskIOModule.h" + +class MmappedDiskIOModule : public DiskIOModule +{ + +public: + static MmappedDiskIOModule &GetInstance(); + MmappedDiskIOModule(); + virtual void init(); + virtual void shutdown(); + virtual char const *type () const; + virtual DiskIOStrategy* createStrategy(); + +private: + static MmappedDiskIOModule Instance; +}; + +#endif /* SQUID_MMAPPEDDISKIOMODULE_H */ === added file 'src/DiskIO/Mmapped/MmappedFile.cc' --- src/DiskIO/Mmapped/MmappedFile.cc 1970-01-01 00:00:00 +0000 +++ src/DiskIO/Mmapped/MmappedFile.cc 2011-01-27 21:14:56 +0000 @@ -0,0 +1,271 @@ +/* + * $Id$ + * + * DEBUG: section 47 Store Directory Routines + */ + +#include "DiskIO/Mmapped/MmappedFile.h" +#include +#include "DiskIO/IORequestor.h" +#include "DiskIO/ReadRequest.h" +#include "DiskIO/WriteRequest.h" + +CBDATA_CLASS_INIT(MmappedFile); + +// helper class to deal with mmap(2) offset alignment and other low-level specs +class Mmapping { +public: + Mmapping(int fd, size_t length, int prot, int flags, off_t offset); + ~Mmapping(); + + void *map(); ///< calls mmap(2); returns usable buffer or nil on failure + bool unmap(); ///< unmaps previously mapped buffer, if any + +private: + const int fd; ///< descriptor of the mmapped file + const size_t length; ///< user-requested data length, needed for munmap + const int prot; ///< mmap(2) "protection" flags + const int flags; ///< other mmap(2) flags + const off_t offset; ///< user-requested data offset + + off_t delta; ///< mapped buffer increment to hit user offset + void *buf; ///< buffer returned by mmap, needed for munmap +}; + + +void * +MmappedFile::operator new(size_t sz) +{ + CBDATA_INIT_TYPE(MmappedFile); + MmappedFile *result = cbdataAlloc(MmappedFile); + /* Mark result as being owned - we want the refcounter to do the delete + * call */ + return result; +} + +void +MmappedFile::operator delete(void *address) +{ + MmappedFile *t = static_cast(address); + cbdataFree(t); +} + +MmappedFile::MmappedFile(char const *aPath): fd(-1), + minOffset(0), maxOffset(-1), error_(false) +{ + 
assert(aPath); + path_ = xstrdup(aPath); + debugs(79,5, HERE << this << ' ' << path_); +} + +MmappedFile::~MmappedFile() +{ + safe_free(path_); + doClose(); +} + +// XXX: almost a copy of BlockingFile::open +void +MmappedFile::open(int flags, mode_t mode, RefCount callback) +{ + assert(fd < 0); + + /* Simulate async calls */ + fd = file_open(path_ , flags); + ioRequestor = callback; + + if (fd < 0) { + debugs(79,3, HERE << "open error: " << xstrerror()); + error_ = true; + } else { + store_open_disk_fd++; + debugs(79,3, HERE << "FD " << fd); + + // setup mapping boundaries + struct stat sb; + if (fstat(fd, &sb) == 0) + maxOffset = sb.st_size; // we do not expect it to change + } + + callback->ioCompletedNotification(); +} + +/** + * Alias for MmappedFile::open(...) + \copydoc MmappedFile::open(int flags, mode_t mode, RefCount callback) + */ +void +MmappedFile::create(int flags, mode_t mode, RefCount callback) +{ + /* We use the same logic path for open */ + open(flags, mode, callback); +} + +void MmappedFile::doClose() +{ + if (fd >= 0) { + file_close(fd); + fd = -1; + store_open_disk_fd--; + } +} + +void +MmappedFile::close() +{ + debugs(79, 3, HERE << this << " closing for " << ioRequestor); + doClose(); + assert(ioRequestor != NULL); + ioRequestor->closeCompleted(); +} + +bool +MmappedFile::canRead() const +{ + return fd >= 0; +} + +bool +MmappedFile::canWrite() const +{ + return fd >= 0; +} + +bool +MmappedFile::error() const +{ + return error_; +} + +void +MmappedFile::read(ReadRequest *aRequest) +{ + debugs(79,3, HERE << "(FD " << fd << ", " << aRequest->len << ", " << + aRequest->offset << ")"); + + assert(fd >= 0); + assert(ioRequestor != NULL); + assert(aRequest->len >= 0); + assert(aRequest->len > 0); // TODO: work around mmap failures on zero-len? + assert(aRequest->offset >= 0); + assert(!error_); // TODO: propagate instead? + + assert(minOffset < 0 || minOffset <= aRequest->offset); + assert(maxOffset < 0 || aRequest->offset + aRequest->len <= (uint64_t)maxOffset); + + Mmapping mapping(fd, aRequest->len, PROT_READ, MAP_PRIVATE | MAP_NORESERVE, + aRequest->offset); + + bool done = false; + if (void *buf = mapping.map()) { + memcpy(aRequest->buf, buf, aRequest->len); + done = mapping.unmap(); + } + error_ = !done; + + const ssize_t rlen = error_ ? -1 : (ssize_t)aRequest->len; + const int errflag = error_ ? DISK_ERROR : DISK_OK; + ioRequestor->readCompleted(aRequest->buf, rlen, errflag, aRequest); +} + +void +MmappedFile::write(WriteRequest *aRequest) +{ + debugs(79,3, HERE << "(FD " << fd << ", " << aRequest->len << ", " << + aRequest->offset << ")"); + + assert(fd >= 0); + assert(ioRequestor != NULL); + assert(aRequest->len >= 0); + assert(aRequest->len > 0); // TODO: work around mmap failures on zero-len? + assert(aRequest->offset >= 0); + assert(!error_); // TODO: propagate instead? + + assert(minOffset < 0 || minOffset <= aRequest->offset); + assert(maxOffset < 0 || aRequest->offset + aRequest->len <= (uint64_t)maxOffset); + + const ssize_t written = + pwrite(fd, aRequest->buf, aRequest->len, aRequest->offset); + if (written < 0) { + debugs(79,1, HERE << "error: " << xstrerr(errno)); + error_ = true; + } else + if (static_cast(written) != aRequest->len) { + debugs(79,1, HERE << "problem: " << written << " < " << aRequest->len); + error_ = true; + } + + if (aRequest->free_func) + (aRequest->free_func)(const_cast(aRequest->buf)); // broken API? 
+ + if (!error_) { + debugs(79,5, HERE << "wrote " << aRequest->len << " to FD " << fd << " at " << aRequest->offset); + } else { + doClose(); + } + + const ssize_t rlen = error_ ? 0 : (ssize_t)aRequest->len; + const int errflag = error_ ? DISK_ERROR : DISK_OK; + ioRequestor->writeCompleted(errflag, rlen, aRequest); +} + +/// we only support blocking I/O +bool +MmappedFile::ioInProgress() const +{ + return false; +} + +Mmapping::Mmapping(int aFd, size_t aLength, int aProt, int aFlags, off_t anOffset): + fd(aFd), length(aLength), prot(aProt), flags(aFlags), offset(anOffset), + delta(-1), buf(NULL) +{ +} + +Mmapping::~Mmapping() +{ + if (buf) + unmap(); +} + +void * +Mmapping::map() +{ + // mmap(2) requires that offset is a multiple of the page size + static const int pageSize = getpagesize(); + delta = offset % pageSize; + + buf = mmap(NULL, length + delta, prot, flags, fd, offset - delta); + + if (buf == MAP_FAILED) { + const int errNo = errno; + debugs(79,3, HERE << "error FD " << fd << "mmap(" << length << '+' << + delta << ", " << offset << '-' << delta << "): " << xstrerr(errNo)); + buf = NULL; + return NULL; + } + + return static_cast(buf) + delta; +} + +bool +Mmapping::unmap() +{ + debugs(79,9, HERE << "FD " << fd << + " munmap(" << buf << ", " << length << '+' << delta << ')'); + + if (!buf) // forgot or failed to map + return false; + + const bool error = munmap(buf, length + delta) != 0; + if (error) { + const int errNo = errno; + debugs(79,3, HERE << "error FD " << fd << + " munmap(" << buf << ", " << length << '+' << delta << "): " << + "): " << xstrerr(errNo)); + } + buf = NULL; + return !error; +} + +// TODO: check MAP_NORESERVE, consider MAP_POPULATE and MAP_FIXED === added file 'src/DiskIO/Mmapped/MmappedFile.h' --- src/DiskIO/Mmapped/MmappedFile.h 1970-01-01 00:00:00 +0000 +++ src/DiskIO/Mmapped/MmappedFile.h 2011-01-27 21:14:56 +0000 @@ -0,0 +1,46 @@ +#ifndef SQUID_MMAPPEDFILE_H +#define SQUID_MMAPPEDFILE_H + +#include "cbdata.h" +#include "DiskIO/DiskFile.h" +#include "DiskIO/IORequestor.h" + +class MmappedFile : public DiskFile +{ + +public: + void *operator new(size_t); + void operator delete(void *); + MmappedFile(char const *path); + ~MmappedFile(); + virtual void open(int flags, mode_t mode, RefCount callback); + virtual void create(int flags, mode_t mode, RefCount callback); + virtual void read(ReadRequest *); + virtual void write(WriteRequest *); + virtual void close(); + virtual bool error() const; + virtual int getFD() const { return fd;} + + virtual bool canRead() const; + virtual bool canWrite() const; + virtual bool ioInProgress() const; + +private: + CBDATA_CLASS(MmappedFile); + + char const *path_; + RefCount ioRequestor; + //RefCount readRequest; + //RefCount writeRequest; + int fd; + + // mmapped memory leads to SEGV and bus errors if it maps beyond file + int64_t minOffset; ///< enforced if not negative (to preserve file headers) + int64_t maxOffset; ///< enforced if not negative (to avoid crashes) + + bool error_; + + void doClose(); +}; + +#endif /* SQUID_MMAPPEDFILE_H */ === added file 'src/DiskIO/Mmapped/MmappedIOStrategy.cc' --- src/DiskIO/Mmapped/MmappedIOStrategy.cc 1970-01-01 00:00:00 +0000 +++ src/DiskIO/Mmapped/MmappedIOStrategy.cc 2011-01-27 21:14:56 +0000 @@ -0,0 +1,37 @@ + +/* + * $Id$ + * + * DEBUG: section 47 Store Directory Routines + */ + +#include "MmappedIOStrategy.h" +#include "MmappedFile.h" +bool +MmappedIOStrategy::shedLoad() +{ + return false; +} + +int +MmappedIOStrategy::load() +{ + /* Return 999 (99.9%) constant load */ + return 
999; +} + +DiskFile::Pointer +MmappedIOStrategy::newFile (char const *path) +{ + return new MmappedFile (path); +} + +void +MmappedIOStrategy::unlinkFile(char const *path) +{ +#if USE_UNLINKD + unlinkdUnlink(path); +#else + ::unlink(path); +#endif +} === added file 'src/DiskIO/Mmapped/MmappedIOStrategy.h' --- src/DiskIO/Mmapped/MmappedIOStrategy.h 1970-01-01 00:00:00 +0000 +++ src/DiskIO/Mmapped/MmappedIOStrategy.h 2011-01-27 21:14:56 +0000 @@ -0,0 +1,15 @@ +#ifndef SQUID_MMAPPEDIOSTRATEGY_H +#define SQUID_MMAPPEDIOSTRATEGY_H +#include "DiskIO/DiskIOStrategy.h" + +class MmappedIOStrategy : public DiskIOStrategy +{ + +public: + virtual bool shedLoad(); + virtual int load(); + virtual RefCount newFile(char const *path); + virtual void unlinkFile (char const *); +}; + +#endif /* SQUID_MMAPPEDIOSTRATEGY_H */ === modified file 'src/Makefile.am' --- src/Makefile.am 2011-04-11 06:05:36 +0000 +++ src/Makefile.am 2011-04-25 15:14:10 +0000 @@ -174,7 +174,8 @@ AIOPS_SOURCE = DiskIO/DiskThreads/aiops.cc endif -EXTRA_LIBRARIES = libAIO.a libBlocking.a libDiskDaemon.a libDiskThreads.a +EXTRA_LIBRARIES = libAIO.a libBlocking.a libDiskDaemon.a libDiskThreads.a \ + libMmapped.a libIpcIo.a noinst_LIBRARIES = $(DISK_LIBS) noinst_LTLIBRARIES = libsquid.la @@ -185,6 +186,7 @@ recv-announce \ tests/testUfs \ tests/testCoss \ + tests/testRock \ tests/testNull \ ufsdump @@ -464,8 +466,8 @@ Server.h \ structs.h \ swap_log_op.h \ - SwapDir.cc \ - SwapDir.h \ + SwapDir.cc MemStore.cc \ + SwapDir.h MemStore.h \ time.cc \ tools.cc \ tunnel.cc \ @@ -543,6 +545,7 @@ eui/libeui.la \ acl/libstate.la \ $(AUTH_LIBS) \ + $(DISK_LIBS) \ acl/libapi.la \ base/libbase.la \ libsquid.la \ @@ -558,7 +561,6 @@ $(XTRA_OBJS) \ $(DISK_LINKOBJS) \ $(REPL_OBJS) \ - $(DISK_LIBS) \ $(DISK_OS_LIBS) \ $(CRYPTLIB) \ $(REGEXLIB) \ @@ -769,6 +771,22 @@ DiskIO/Blocking/BlockingDiskIOModule.cc \ DiskIO/Blocking/BlockingDiskIOModule.h +libMmapped_a_SOURCES = \ + DiskIO/Mmapped/MmappedFile.cc \ + DiskIO/Mmapped/MmappedFile.h \ + DiskIO/Mmapped/MmappedIOStrategy.cc \ + DiskIO/Mmapped/MmappedIOStrategy.h \ + DiskIO/Mmapped/MmappedDiskIOModule.cc \ + DiskIO/Mmapped/MmappedDiskIOModule.h + +libIpcIo_a_SOURCES = \ + DiskIO/IpcIo/IpcIoFile.cc \ + DiskIO/IpcIo/IpcIoFile.h \ + DiskIO/IpcIo/IpcIoIOStrategy.cc \ + DiskIO/IpcIo/IpcIoIOStrategy.h \ + DiskIO/IpcIo/IpcIoDiskIOModule.cc \ + DiskIO/IpcIo/IpcIoDiskIOModule.h + libDiskDaemon_a_SOURCES = \ DiskIO/DiskDaemon/DiskdFile.cc \ DiskIO/DiskDaemon/DiskdFile.h \ @@ -1350,7 +1368,7 @@ StoreSwapLogData.cc \ tools.cc \ tunnel.cc \ - SwapDir.cc \ + SwapDir.cc MemStore.cc \ url.cc \ URLScheme.cc \ urn.cc \ @@ -1657,6 +1675,7 @@ time.cc \ tools.cc \ tunnel.cc \ + SwapDir.cc MemStore.cc \ url.cc \ URLScheme.cc \ urn.cc \ @@ -1831,6 +1850,7 @@ time.cc \ tools.cc \ tunnel.cc \ + SwapDir.cc MemStore.cc \ url.cc \ URLScheme.cc \ urn.cc \ @@ -2172,7 +2192,7 @@ event.cc \ tools.cc \ tunnel.cc \ - SwapDir.cc \ + SwapDir.cc MemStore.cc \ url.cc \ URLScheme.cc \ urn.cc \ @@ -2509,6 +2529,39 @@ tests_testUfs_DEPENDENCIES = \ $(SWAP_TEST_DS) +tests_testRock_SOURCES = \ + tests/testRock.cc \ + tests/testMain.cc \ + tests/testRock.h \ + tests/stub_cache_manager.cc \ + tests/stub_HelperChildConfig.cc \ + tests/stub_Port.cc \ + tests/stub_TypedMsgHdr.cc \ + tests/stub_UdsOp.cc \ + $(SWAP_TEST_SOURCES) +nodist_tests_testRock_SOURCES = \ + swap_log_op.cc \ + $(SWAP_TEST_GEN_SOURCES) \ + SquidMath.cc \ + SquidMath.h +tests_testRock_LDADD = \ + $(COMMON_LIBS) \ + $(REPL_OBJS) \ + $(DISK_LIBS) \ + $(DISK_OS_LIBS) \ + acl/libapi.la \ + 
$(top_builddir)/lib/libmisccontainers.la \ + $(top_builddir)/lib/libmiscencoding.la \ + $(top_builddir)/lib/libmiscutil.la \ + $(REGEXLIB) \ + $(SQUID_CPPUNIT_LIBS) \ + $(SSLLIB) \ + $(COMPAT_LIB) \ + $(XTRA_LIBS) +tests_testRock_LDFLAGS = $(LIBADD_DL) +tests_testRock_DEPENDENCIES = \ + $(SWAP_TEST_DS) + tests_testCoss_SOURCES = \ tests/testCoss.cc \ tests/testMain.cc \ @@ -2878,7 +2931,7 @@ event.cc \ tools.cc \ tunnel.cc \ - SwapDir.cc \ + SwapDir.cc MemStore.cc \ urn.cc \ wccp2.cc \ whois.cc \ === modified file 'src/cf.data.pre' --- src/cf.data.pre 2011-04-05 20:57:57 +0000 +++ src/cf.data.pre 2011-05-11 22:38:35 +0000 @@ -2702,6 +2702,16 @@ higher hit ratio at the expense of an increase in response time. + The rock store type: + + cache_dir rock Directory-Name Mbytes + + The Rock Store type is a database-style storage. All cached + entries are stored in a "database" file, using fixed-size slots, + one entry per slot. The database size is specified in MB. The + slot size is specified in bytes using the max-size option. See + below for more info on the max-size option. + The coss store type: NP: COSS filesystem in Squid-3 has been deemed too unstable for @@ -2852,8 +2862,9 @@ ' output as-is - left aligned - width field width. If starting with 0 the - output is zero padded + width minimum and/or maximum field width: [min][.max] + When minimum starts with 0, the field is zero-padded. + String values exceeding maximum width are truncated. {arg} argument such as header name etc Format codes: @@ -4019,8 +4030,8 @@ DOC_END NAME: store_avg_object_size -COMMENT: (kbytes) -TYPE: kb_int64_t +COMMENT: (bytes) +TYPE: b_int64_t DEFAULT: 13 KB LOC: Config.Store.avgObjectSize DOC_START @@ -6516,11 +6527,12 @@ returning a chain of services to be used next. The services are specified using the X-Next-Services ICAP response header value, formatted as a comma-separated list of service names. - Each named service should be configured in squid.conf and - should have the same method and vectoring point as the current - ICAP transaction. Services violating these rules are ignored. - An empty X-Next-Services value results in an empty plan which - ends the current adaptation. + Each named service should be configured in squid.conf. Other + services are ignored. An empty X-Next-Services value results + in an empty plan which ends the current adaptation. + + Dynamic adaptation plan may cross or cover multiple supported + vectoring points in their natural processing order. Routing is not allowed by default: the ICAP X-Next-Services response header is ignored. 
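For illustration, a minimal squid.conf sketch of the rock cache_dir type documented above; the directory path, database size, slot size, and worker count are assumed example values, not taken from this patch:

  # assumed example: three workers; in SMP mode each rock cache_dir also gets its own disker kid
  workers 3
  # 4096 MB database file; max-size sets the fixed slot size in bytes (one entry per slot)
  cache_dir rock /var/cache/squid/rock 4096 max-size=32000

Because every entry must fit into one slot, max-size effectively caps what this cache_dir can store. As with other store types, the database file would need to be initialized first (squid -z); the rebuild failure hint further below refers to exactly that.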
=== modified file 'src/fs/Makefile.am' --- src/fs/Makefile.am 2010-08-20 16:15:46 +0000 +++ src/fs/Makefile.am 2011-04-25 15:14:10 +0000 @@ -1,6 +1,6 @@ include $(top_srcdir)/src/Common.am -EXTRA_LTLIBRARIES = libaufs.la libdiskd.la libcoss.la libufs.la +EXTRA_LTLIBRARIES = libaufs.la libdiskd.la libcoss.la libufs.la librock.la noinst_LTLIBRARIES = $(STORE_LIBS_TO_BUILD) libfs.la # aufs is a "fake" legacy store @@ -28,6 +28,22 @@ ufs/ufscommon.cc \ ufs/ufscommon.h +librock_la_SOURCES = \ + rock/RockCommon.cc \ + rock/RockCommon.h \ + rock/RockFile.cc \ + rock/RockFile.h \ + rock/RockIoState.cc \ + rock/RockIoState.h \ + rock/RockIoRequests.cc \ + rock/RockIoRequests.h \ + rock/RockRebuild.cc \ + rock/RockRebuild.h \ + rock/RockStoreFileSystem.cc \ + rock/RockStoreFileSystem.h \ + rock/RockSwapDir.cc \ + rock/RockSwapDir.h + libfs_la_SOURCES = Module.cc Module.h libfs_la_LIBADD = $(STORE_LIBS_TO_BUILD) libfs_la_DEPENDENCIES = $(STORE_LIBS_TO_BUILD) @@ -44,13 +60,15 @@ coss/clean: clean ufs/all: libufs.la ufs/clean: clean +rock/all: librock.la +rock/clean: clean TESTS += testHeaders ## Special Universal .h dependency test script ## aborts if error encountered -testHeaders: $(srcdir)/ufs/*.h $(srcdir)/coss/*.h +testHeaders: $(srcdir)/ufs/*.h $(srcdir)/coss/*.h $(srcdir)/rock/*.h $(SHELL) $(top_srcdir)/test-suite/testheaders.sh "$(CXXCOMPILE)" $^ || exit 1 ## diskd/ has no .h files ## aufs/ has no .h files === modified file 'src/fs/Module.cc' --- src/fs/Module.cc 2009-12-26 00:25:57 +0000 +++ src/fs/Module.cc 2011-01-27 21:14:56 +0000 @@ -22,6 +22,12 @@ static StoreFSufs *DiskdInstance; #endif +#ifdef HAVE_FS_ROCK +#include "fs/rock/RockStoreFileSystem.h" +static Rock::StoreFileSystem *RockInstance = NULL; +#endif + + /* TODO: Modify coss code to: * (a) remove the StoreFScoss::GetInstance method, * (b) declare the StoreFScoss::stats as static and @@ -48,6 +54,10 @@ DiskdInstance = new StoreFSufs("DiskDaemon", "diskd");; #endif +#ifdef HAVE_FS_ROCK + RockInstance = new Rock::StoreFileSystem(); +#endif + } @@ -66,4 +76,8 @@ delete DiskdInstance; #endif +#ifdef HAVE_FS_ROCK + delete RockInstance; +#endif + } === added directory 'src/fs/rock' === added file 'src/fs/rock/RockCommon.cc' === added file 'src/fs/rock/RockCommon.h' === added file 'src/fs/rock/RockFile.cc' === added file 'src/fs/rock/RockFile.h' --- src/fs/rock/RockFile.h 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockFile.h 2011-02-15 04:09:58 +0000 @@ -0,0 +1,24 @@ +#ifndef SQUID_FS_ROCK_DB_CELL_H +#define SQUID_FS_ROCK_DB_CELL_H + +// XXX: rename to fs/rock/RockDbCell.{cc,h} + +namespace Rock { + +/// \ingroup Rock +/// meta-information at the beginning of every db cell +class DbCellHeader +{ +public: + DbCellHeader(): payloadSize(0), reserved(0) {} + + /// whether the freshly loaded header fields make sense + bool sane() const { return payloadSize >= 0 && reserved == 0; } + + int64_t payloadSize; ///< cell contents size excluding this header + int64_t reserved; ///< reserved for future use (next cell pointer?) 
+}; + +} // namespace Rock + +#endif /* SQUID_FS_ROCK_DB_CELL_H */ === added file 'src/fs/rock/RockIoRequests.cc' --- src/fs/rock/RockIoRequests.cc 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockIoRequests.cc 2011-01-27 21:14:56 +0000 @@ -0,0 +1,24 @@ +/* + * $Id$ + * + * DEBUG: section 79 Disk IO Routines + */ + +#include "fs/rock/RockIoRequests.h" + +CBDATA_NAMESPACED_CLASS_INIT(Rock, ReadRequest); +CBDATA_NAMESPACED_CLASS_INIT(Rock, WriteRequest); + +Rock::ReadRequest::ReadRequest(const ::ReadRequest &base, + const IoState::Pointer &anSio): + ::ReadRequest(base), + sio(anSio) +{ +} + +Rock::WriteRequest::WriteRequest(const ::WriteRequest &base, + const IoState::Pointer &anSio): + ::WriteRequest(base), + sio(anSio) +{ +} === added file 'src/fs/rock/RockIoRequests.h' --- src/fs/rock/RockIoRequests.h 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockIoRequests.h 2011-01-27 21:14:56 +0000 @@ -0,0 +1,38 @@ +#ifndef SQUID_FS_ROCK_IO_REQUESTS_H +#define SQUID_FS_ROCK_IO_REQUESTS_H + +#include "DiskIO/ReadRequest.h" +#include "DiskIO/WriteRequest.h" +#include "fs/rock/RockIoState.h" + +class DiskFile; + +namespace Rock { + +/// \ingroup Rock +class ReadRequest: public ::ReadRequest +{ +public: + ReadRequest(const ::ReadRequest &base, const IoState::Pointer &anSio); + IoState::Pointer sio; + +private: + CBDATA_CLASS2(ReadRequest); +}; + + +/// \ingroup Rock +class WriteRequest: public ::WriteRequest +{ +public: + WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio); + IoState::Pointer sio; + +private: + CBDATA_CLASS2(WriteRequest); +}; + + +} // namespace Rock + +#endif /* SQUID_FS_ROCK_IO_REQUESTS_H */ === added file 'src/fs/rock/RockIoState.cc' --- src/fs/rock/RockIoState.cc 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockIoState.cc 2011-02-15 04:09:58 +0000 @@ -0,0 +1,212 @@ +/* + * $Id$ + * + * DEBUG: section 79 Disk IO Routines + */ + +#include "config.h" +#include "MemObject.h" +#include "Parsing.h" +#include "DiskIO/DiskIOModule.h" +#include "DiskIO/DiskIOStrategy.h" +#include "DiskIO/WriteRequest.h" +#include "fs/rock/RockIoState.h" +#include "fs/rock/RockIoRequests.h" +#include "fs/rock/RockSwapDir.h" + +Rock::IoState::IoState(SwapDir *dir, + StoreEntry *anEntry, + StoreIOState::STFNCB *cbFile, + StoreIOState::STIOCB *cbIo, + void *data): + slotSize(0), + diskOffset(-1), + payloadEnd(-1) +{ + e = anEntry; + // swap_filen, swap_dirn, diskOffset, and payloadEnd are set by the caller + slotSize = dir->max_objsize; + file_callback = cbFile; + callback = cbIo; + callback_data = cbdataReference(data); + ++store_open_disk_fd; // TODO: use a dedicated counter? 
+ //theFile is set by SwapDir because it depends on DiskIOStrategy +} + +Rock::IoState::~IoState() +{ + --store_open_disk_fd; + if (callback_data) + cbdataReferenceDone(callback_data); + theFile = NULL; +} + +void +Rock::IoState::file(const RefCount &aFile) +{ + assert(!theFile); + assert(aFile != NULL); + theFile = aFile; +} + +void +Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data) +{ + assert(theFile != NULL); + assert(theFile->canRead()); + assert(coreOff >= 0); + offset_ = coreOff; + + // we skip our cell header; it is only read when building the map + const int64_t cellOffset = sizeof(DbCellHeader) + + static_cast(coreOff); + assert(cellOffset <= payloadEnd); + + // Core specifies buffer length, but we must not exceed stored entry size + if (cellOffset + (int64_t)len > payloadEnd) + len = payloadEnd - cellOffset; + + assert(read.callback == NULL); + assert(read.callback_data == NULL); + read.callback = cb; + read.callback_data = cbdataReference(data); + + theFile->read(new ReadRequest( + ::ReadRequest(buf, diskOffset + cellOffset, len), this)); +} + +// We only buffer data here; we actually write when close() is called. +// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions +// of the slot when the write does not end at the page or sector boundary. +void +Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor) +{ + // TODO: move to create? + if (!coreOff) { + assert(theBuf.isNull()); + assert(payloadEnd <= slotSize); + theBuf.init(min(payloadEnd, slotSize), slotSize); + // start with our header; TODO: consider making it a trailer + DbCellHeader header; + assert(static_cast(sizeof(header)) <= payloadEnd); + header.payloadSize = payloadEnd - sizeof(header); + theBuf.append(reinterpret_cast(&header), sizeof(header)); + } else { + // Core uses -1 offset as "append". Sigh. + assert(coreOff == -1); + assert(!theBuf.isNull()); + } + + theBuf.append(buf, size); + offset_ += size; // so that Core thinks we wrote it + + if (dtor) + (dtor)(const_cast(buf)); // cast due to a broken API? +} + +// write what was buffered during write() calls +void +Rock::IoState::startWriting() +{ + assert(theFile != NULL); + assert(theFile->canWrite()); + assert(!theBuf.isNull()); + + // TODO: if DiskIO module is mmap-based, we should be writing whole pages + // to avoid triggering read-page;new_head+old_tail;write-page overheads + + debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' << + theBuf.contentSize()); + + assert(theBuf.contentSize() <= slotSize); + // theFile->write may call writeCompleted immediatelly + theFile->write(new WriteRequest(::WriteRequest(theBuf.content(), + diskOffset, theBuf.contentSize(), theBuf.freeFunc()), this)); +} + +// +void +Rock::IoState::finishedWriting(const int errFlag) +{ + // we incremented offset_ while accumulating data in write() + callBack(errFlag); +} + +void +Rock::IoState::close(int how) +{ + debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ << + " how=" << how); + if (how == wroteAll && !theBuf.isNull()) + startWriting(); + else + callBack(how == writerGone ? 
DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE +} + +/// close callback (STIOCB) dialer: breaks dependencies and +/// counts IOState concurrency level +class StoreIOStateCb: public CallDialer +{ +public: + StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio): + callback(NULL), + callback_data(NULL), + errflag(err), + sio(anSio) { + + callback = cb; + callback_data = cbdataReference(data); + } + + StoreIOStateCb(const StoreIOStateCb &cb): + callback(NULL), + callback_data(NULL), + errflag(cb.errflag), + sio(cb.sio) { + + callback = cb.callback; + callback_data = cbdataReference(cb.callback_data); + } + + virtual ~StoreIOStateCb() { + cbdataReferenceDone(callback_data); // may be nil already + } + + void dial(AsyncCall &call) { + void *cbd; + if (cbdataReferenceValidDone(callback_data, &cbd) && callback) + callback(cbd, errflag, sio.getRaw()); + } + + bool canDial(AsyncCall &call) const { + return cbdataReferenceValid(callback_data) && callback; + } + + virtual void print(std::ostream &os) const { + os << '(' << callback_data << ", err=" << errflag << ')'; + } + +private: + StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined + + StoreIOState::STIOCB *callback; + void *callback_data; + int errflag; + Rock::IoState::Pointer sio; +}; + + +void +Rock::IoState::callBack(int errflag) +{ + debugs(79,3, HERE << "errflag=" << errflag); + theFile = NULL; + + AsyncCall::Pointer call = asyncCall(79,3, "SomeIoStateCloseCb", + StoreIOStateCb(callback, callback_data, errflag, this)); + ScheduleCallHere(call); + + callback = NULL; + cbdataReferenceDone(callback_data); +} + === added file 'src/fs/rock/RockIoState.h' --- src/fs/rock/RockIoState.h 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockIoState.h 2011-02-15 04:09:58 +0000 @@ -0,0 +1,53 @@ +#ifndef SQUID_FS_ROCK_IO_STATE_H +#define SQUID_FS_ROCK_IO_STATE_H + +#include "MemBuf.h" +#include "SwapDir.h" + +class DiskFile; + +namespace Rock { + +class SwapDir; + +/// \ingroup Rock +class IoState: public ::StoreIOState +{ +public: + typedef RefCount Pointer; + + IoState(SwapDir *dir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data); + virtual ~IoState(); + + void file(const RefCount &aFile); + + // ::StoreIOState API + virtual void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data); + virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func); + virtual void close(int how); + + /// called by SwapDir when writing is done + void finishedWriting(int errFlag); + + int64_t slotSize; ///< db cell size + int64_t diskOffset; ///< the start of this cell inside the db file + + /// when reading: number of bytes previously written to the db cell; + /// when writing: maximum payload offset in a db cell + int64_t payloadEnd; + + MEMPROXY_CLASS(IoState); + +private: + void startWriting(); + void callBack(int errflag); + + RefCount theFile; // "file" responsible for this I/O + MemBuf theBuf; // use for write content accumulation only +}; + +MEMPROXY_CLASS_INLINE(IoState); + +} // namespace Rock + +#endif /* SQUID_FS_ROCK_IO_STATE_H */ === added file 'src/fs/rock/RockRebuild.cc' --- src/fs/rock/RockRebuild.cc 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockRebuild.cc 2011-02-15 04:09:58 +0000 @@ -0,0 +1,179 @@ +/* + * $Id$ + * + * DEBUG: section 79 Disk IO Routines + */ + +#include "config.h" +#include "fs/rock/RockRebuild.h" +#include "fs/rock/RockSwapDir.h" +#include "fs/rock/RockFile.h" + +CBDATA_NAMESPACED_CLASS_INIT(Rock, 
Rebuild); + +Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"), + sd(dir), + dbSize(0), + dbEntrySize(0), + dbEntryLimit(0), + fd(-1), + dbOffset(0), + fileno(0) +{ + assert(sd); + memset(&counts, 0, sizeof(counts)); + dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste + dbEntrySize = sd->max_objsize; + dbEntryLimit = sd->entryLimit(); +} + +Rock::Rebuild::~Rebuild() +{ + if (fd >= 0) + file_close(fd); +} + +/// prepares and initiates entry loading sequence +void +Rock::Rebuild::start() { + // in SMP mode, only the disker is responsible for populating the map + if (UsingSmp() && !IamDiskProcess()) { + debugs(47, 2, "Non-disker skips rebuilding of cache_dir #" << + sd->index << " from " << sd->filePath); + mustStop("non-disker"); + return; + } + + debugs(47, DBG_IMPORTANT, "Loading cache_dir #" << sd->index << + " from " << sd->filePath); + + fd = file_open(sd->filePath, O_RDONLY | O_BINARY); + if (fd < 0) + failure("cannot open db", errno); + + char buf[SwapDir::HeaderSize]; + if (read(fd, buf, sizeof(buf)) != SwapDir::HeaderSize) + failure("cannot read db header", errno); + + dbOffset = SwapDir::HeaderSize; + fileno = 0; + + checkpoint(); +} + +/// continues after a pause if not done +void +Rock::Rebuild::checkpoint() +{ + if (!done()) + eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true); +} + +bool +Rock::Rebuild::doneAll() const +{ + return dbOffset >= dbSize && AsyncJob::doneAll(); +} + +void +Rock::Rebuild::Steps(void *data) +{ + // use async call to enable job call protection that time events lack + CallJobHere(47, 5, static_cast(data), Rock::Rebuild, steps); +} + +void +Rock::Rebuild::steps() { + debugs(47,5, HERE << sd->index << " fileno " << fileno << " at " << + dbOffset << " <= " << dbSize); + + const int maxCount = dbEntryLimit; + const int wantedCount = opt_foreground_rebuild ? 
maxCount : 50; + const int stepCount = min(wantedCount, maxCount); + for (int i = 0; i < stepCount && dbOffset < dbSize; ++i, ++fileno) { + doOneEntry(); + dbOffset += dbEntrySize; + + if (counts.scancount % 1000 == 0) + storeRebuildProgress(sd->index, maxCount, counts.scancount); + } + + checkpoint(); +} + +void +Rock::Rebuild::doOneEntry() { + debugs(47,5, HERE << sd->index << " fileno " << fileno << " at " << + dbOffset << " <= " << dbSize); + + ++counts.scancount; + + if (lseek(fd, dbOffset, SEEK_SET) < 0) + failure("cannot seek to db entry", errno); + + MemBuf buf; + buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE); + + if (!storeRebuildLoadEntry(fd, sd->index, buf, counts)) + return; + + // get our header + DbCellHeader header; + if (buf.contentSize() < static_cast(sizeof(header))) { + debugs(47, 1, "cache_dir[" << sd->index << "]: " << + "truncated swap entry meta data at " << dbOffset); + counts.invalid++; + return; + } + memcpy(&header, buf.content(), sizeof(header)); + + if (!header.sane()) { + debugs(47, 1, "cache_dir[" << sd->index << "]: " << + "malformed rock db cell header at " << dbOffset); + counts.invalid++; + return; + } + buf.consume(sizeof(header)); // optimize to avoid memmove() + + cache_key key[SQUID_MD5_DIGEST_LENGTH]; + StoreEntry loadedE; + if (!storeRebuildParseEntry(buf, loadedE, key, counts, header.payloadSize)) { + // skip empty slots + if (loadedE.swap_filen > 0 || loadedE.swap_file_sz > 0) { + counts.invalid++; + //sd->unlink(fileno); leave garbage on disk, it should not hurt + } + return; + } + + assert(loadedE.swap_filen < dbEntryLimit); + if (!storeRebuildKeepEntry(loadedE, key, counts)) + return; + + counts.objcount++; + // loadedE->dump(5); + + sd->addEntry(fileno, header, loadedE); +} + +void +Rock::Rebuild::swanSong() { + debugs(47,3, HERE << "cache_dir #" << sd->index << " rebuild level: " << + StoreController::store_dirs_rebuilding); + --StoreController::store_dirs_rebuilding; + storeRebuildComplete(&counts); +} + +void +Rock::Rebuild::failure(const char *msg, int errNo) { + debugs(47,5, HERE << sd->index << " fileno " << fileno << " at " << + dbOffset << " <= " << dbSize); + + if (errNo) + debugs(47,0, "Rock cache_dir rebuild failure: " << xstrerr(errNo)); + debugs(47,0, "Do you need to run 'squid -z' to initialize storage?"); + + assert(sd); + fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.", + sd->index, sd->filePath, msg); +} === added file 'src/fs/rock/RockRebuild.h' --- src/fs/rock/RockRebuild.h 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockRebuild.h 2011-02-07 17:59:28 +0000 @@ -0,0 +1,50 @@ +#ifndef SQUID_FS_ROCK_REBUILD_H +#define SQUID_FS_ROCK_REBUILD_H + +#include "config.h" +#include "base/AsyncJob.h" +#include "structs.h" + +namespace Rock { + +class SwapDir; + +/// \ingroup Rock +/// manages store rebuild process: loading meta information from db on disk +class Rebuild: public AsyncJob { +public: + Rebuild(SwapDir *dir); + ~Rebuild(); + +protected: + /* AsyncJob API */ + virtual void start(); + virtual bool doneAll() const; + virtual void swanSong(); + +private: + void checkpoint(); + void steps(); + void doOneEntry(); + void failure(const char *msg, int errNo = 0); + + SwapDir *sd; + + int64_t dbSize; + int dbEntrySize; + int dbEntryLimit; + + int fd; // store db file descriptor + int64_t dbOffset; + int fileno; + + struct _store_rebuild_data counts; + + static void Steps(void *data); + + CBDATA_CLASS2(Rebuild); +}; + +} // namespace Rock + +#endif /* SQUID_FS_ROCK_REBUILD_H */ === added file 'src/fs/rock/RockStoreFileSystem.cc' --- 
src/fs/rock/RockStoreFileSystem.cc 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockStoreFileSystem.cc 2011-01-27 21:14:56 +0000 @@ -0,0 +1,53 @@ +/* + * $Id$ + * + * DEBUG: section 92 Storage File System + */ + +#include "fs/rock/RockStoreFileSystem.h" +#include "fs/rock/RockSwapDir.h" + + +Rock::StoreFileSystem::StoreFileSystem() +{ + FsAdd(*this); +} + +Rock::StoreFileSystem::~StoreFileSystem() +{ +} + +char const * +Rock::StoreFileSystem::type() const +{ + return "rock"; +} + +SwapDir * +Rock::StoreFileSystem::createSwapDir() +{ + return new SwapDir(); +} + +void +Rock::StoreFileSystem::done() +{ +} + +void +Rock::StoreFileSystem::registerWithCacheManager() +{ + assert(false); // XXX: implement +} + +void +Rock::StoreFileSystem::setup() +{ + debugs(92,2, HERE << "Will use Rock FS"); +} + +void +Rock::StoreFileSystem::Stats(StoreEntry *sentry) +{ + assert(false); // XXX: implement +} === added file 'src/fs/rock/RockStoreFileSystem.h' --- src/fs/rock/RockStoreFileSystem.h 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockStoreFileSystem.h 2011-01-27 21:14:56 +0000 @@ -0,0 +1,33 @@ +#ifndef SQUID_FS_ROCK_FS_H +#define SQUID_FS_ROCK_FS_H + +#include "StoreFileSystem.h" + +namespace Rock { + +/// \ingroup Rock, FileSystems +class StoreFileSystem: public ::StoreFileSystem +{ + +public: + static void Stats(StoreEntry * sentry); + + StoreFileSystem(); + virtual ~StoreFileSystem(); + + virtual char const *type() const; + virtual SwapDir *createSwapDir(); + virtual void done(); + virtual void registerWithCacheManager(); + virtual void setup(); + +private: + //static Stats Stats_; + + StoreFileSystem(const StoreFileSystem &); // not implemented + StoreFileSystem &operator=(const StoreFileSystem &); // not implemented +}; + +} // namespace Rock + +#endif /* SQUID_FS_ROCK_FS_H */ === added file 'src/fs/rock/RockSwapDir.cc' --- src/fs/rock/RockSwapDir.cc 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockSwapDir.cc 2011-05-12 03:58:16 +0000 @@ -0,0 +1,721 @@ +/* + * $Id$ + * + * DEBUG: section 47 Store Directory Routines + */ + +#include "config.h" +#include "Parsing.h" +#include +#include "MemObject.h" +#include "SquidMath.h" +#include "base/RunnersRegistry.h" +#include "DiskIO/DiskIOModule.h" +#include "DiskIO/DiskIOStrategy.h" +#include "DiskIO/ReadRequest.h" +#include "DiskIO/WriteRequest.h" +#include "fs/rock/RockSwapDir.h" +#include "fs/rock/RockIoState.h" +#include "fs/rock/RockIoRequests.h" +#include "fs/rock/RockRebuild.h" + +const int64_t Rock::SwapDir::HeaderSize = 16*1024; + +Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL), map(NULL) +{ +} + +Rock::SwapDir::~SwapDir() +{ + delete io; + delete map; + safe_free(filePath); +} + +StoreSearch * +Rock::SwapDir::search(String const url, HttpRequest *) +{ + assert(false); return NULL; // XXX: implement +} + +// called when Squid core needs a StoreEntry with a given key +StoreEntry * +Rock::SwapDir::get(const cache_key *key) +{ + if (!map) + return NULL; + + sfileno fileno; + const Ipc::StoreMapSlot *const slot = map->openForReading(key, fileno); + if (!slot) + return NULL; + + const Ipc::StoreMapSlot::Basics &basics = slot->basics; + + // create a brand new store entry and initialize it with stored basics + StoreEntry *e = new StoreEntry(); + e->lock_count = 0; + e->swap_dirn = index; + e->swap_filen = fileno; + e->swap_file_sz = basics.swap_file_sz; + e->lastref = basics.lastref; + e->timestamp = basics.timestamp; + e->expires = basics.expires; + e->lastmod = basics.lastmod; + e->refcount = basics.refcount; + e->flags = basics.flags; 
+ e->store_status = STORE_OK; + e->setMemStatus(NOT_IN_MEMORY); + e->swap_status = SWAPOUT_DONE; + e->ping_status = PING_NONE; + EBIT_SET(e->flags, ENTRY_CACHABLE); + EBIT_CLR(e->flags, RELEASE_REQUEST); + EBIT_CLR(e->flags, KEY_PRIVATE); + EBIT_SET(e->flags, ENTRY_VALIDATED); + e->hashInsert(key); + trackReferences(*e); + + return e; + // the disk entry remains open for reading, protected from modifications +} + +void Rock::SwapDir::disconnect(StoreEntry &e) +{ + assert(e.swap_dirn == index); + assert(e.swap_filen >= 0); + // cannot have SWAPOUT_NONE entry with swap_filen >= 0 + assert(e.swap_status != SWAPOUT_NONE); + + // do not rely on e.swap_status here because there is an async delay + // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE. + + // since e has swap_filen, its slot is locked for either reading or writing + map->abortIo(e.swap_filen); + e.swap_dirn = -1; + e.swap_filen = -1; + e.swap_status = SWAPOUT_NONE; +} + +uint64_t +Rock::SwapDir::currentSize() const +{ + return HeaderSize + max_objsize * currentCount(); +} + +uint64_t +Rock::SwapDir::currentCount() const +{ + return map ? map->entryCount() : 0; +} + +/// In SMP mode only the disker process reports stats to avoid +/// counting the same stats by multiple processes. +bool +Rock::SwapDir::doReportStat() const +{ + return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess()); +} + +void +Rock::SwapDir::swappedOut(const StoreEntry &) +{ + // stats are not stored but computed when needed +} + +int64_t +Rock::SwapDir::entryLimitAllowed() const +{ + const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported + const int64_t eWanted = (maxSize() - HeaderSize)/maxObjectSize(); + return min(max(eLimitLo, eWanted), entryLimitHigh()); +} + +// TODO: encapsulate as a tool; identical to CossSwapDir::create() +void +Rock::SwapDir::create() +{ + assert(path); + assert(filePath); + + if (UsingSmp() && !IamDiskProcess()) { + debugs (47,3, HERE << "disker will create in " << path); + return; + } + + debugs (47,3, HERE << "creating in " << path); + + struct stat swap_sb; + if (::stat(path, &swap_sb) < 0) { + debugs (47, 1, "Creating Rock db directory: " << path); +#ifdef _SQUID_MSWIN_ + const int res = mkdir(path); +#else + const int res = mkdir(path, 0700); +#endif + if (res != 0) { + debugs(47,0, "Failed to create Rock db dir " << path << + ": " << xstrerror()); + fatal("Rock Store db creation error"); + } + } + +#if SLOWLY_FILL_WITH_ZEROS + /* TODO just set the file size */ + char block[1024]; + Must(maxSize() % sizeof(block) == 0); + memset(block, '\0', sizeof(block)); + + const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600); + for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) { + if (write(swap, block, sizeof(block)) != sizeof(block)) { + debugs(47,0, "Failed to create Rock Store db in " << filePath << + ": " << xstrerror()); + fatal("Rock Store db creation error"); + } + } + close(swap); +#else + const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600); + if (swap < 0) { + debugs(47,0, "Failed to initialize Rock Store db in " << filePath << + "; create error: " << xstrerror()); + fatal("Rock Store db creation error"); + } + + if (ftruncate(swap, maxSize()) != 0) { + debugs(47,0, "Failed to initialize Rock Store db in " << filePath << + "; truncate error: " << xstrerror()); + fatal("Rock Store db creation error"); + } + + char header[HeaderSize]; + memset(header, '\0', sizeof(header)); + if (write(swap, header, sizeof(header)) != 
sizeof(header)) { + debugs(47,0, "Failed to initialize Rock Store db in " << filePath << + "; write error: " << xstrerror()); + fatal("Rock Store db initialization error"); + } + close(swap); +#endif + +} + +void +Rock::SwapDir::init() +{ + debugs(47,2, HERE); + + // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which + // are refcounted. We up our count once to avoid implicit delete's. + RefCountReference(); + + Must(!map); + map = new DirMap(path); + + const char *ioModule = UsingSmp() ? "IpcIo" : "Blocking"; + if (DiskIOModule *m = DiskIOModule::Find(ioModule)) { + debugs(47,2, HERE << "Using DiskIO module: " << ioModule); + io = m->createStrategy(); + io->init(); + } else { + debugs(47,1, "Rock store is missing DiskIO module: " << ioModule); + fatal("Rock Store missing a required DiskIO module"); + } + + theFile = io->newFile(filePath); + theFile->open(O_RDWR, 0644, this); + + // Increment early. Otherwise, if one SwapDir finishes rebuild before + // others start, storeRebuildComplete() will think the rebuild is over! + // TODO: move store_dirs_rebuilding hack to store modules that need it. + ++StoreController::store_dirs_rebuilding; +} + +bool +Rock::SwapDir::needsDiskStrand() const +{ + return true; +} + +void +Rock::SwapDir::parse(int anIndex, char *aPath) +{ + index = anIndex; + + path = xstrdup(aPath); + + // cache store is located at path/db + String fname(path); + fname.append("/rock"); + filePath = xstrdup(fname.termedBuf()); + + parseSize(); + parseOptions(0); + + // Current openForWriting() code overwrites the old slot if needed + // and possible, so proactively removing old slots is probably useless. + assert(!repl); // repl = createRemovalPolicy(Config.replPolicy); + + validateOptions(); +} + +void +Rock::SwapDir::reconfigure(int, char *) +{ + parseSize(); + parseOptions(1); + // TODO: can we reconfigure the replacement policy (repl)? + validateOptions(); +} + +/// parse maximum db disk size +void +Rock::SwapDir::parseSize() +{ + const int i = GetInteger(); + if (i < 0) + fatal("negative Rock cache_dir size value"); + max_size = i << 20; // MBytes to Bytes +} + +/// check the results of the configuration; only level-0 debugging works here +void +Rock::SwapDir::validateOptions() +{ + if (max_objsize <= 0) + fatal("Rock store requires a positive max-size"); + + /* XXX: should we support resize? + map->resize(entryLimitAllowed()); // the map may decide to use an even lower limit + */ + + /* XXX: misplaced, map is not yet created + // XXX: max_size is in Bytes now + // Note: We could try to shrink max_size now. It is stored in KB so we + // may not be able to make it match the end of the last entry exactly. 
+ const int64_t mapRoundWasteMx = max_objsize*sizeof(long)*8; + const int64_t sizeRoundWasteMx = 1024; // max_size stored in KB + const int64_t roundingWasteMx = max(mapRoundWasteMx, sizeRoundWasteMx); + const int64_t totalWaste = maxSize() - diskOffsetLimit(); + assert(diskOffsetLimit() <= maxSize()); + + // warn if maximum db size is not reachable due to sfileno limit + if (map->entryLimit() == entryLimitHigh() && totalWaste > roundingWasteMx) { + debugs(47, 0, "Rock store cache_dir[" << index << "]:"); + debugs(47, 0, "\tmaximum number of entries: " << map->entryLimit()); + debugs(47, 0, "\tmaximum entry size: " << max_objsize << " bytes"); + debugs(47, 0, "\tmaximum db size: " << maxSize() << " bytes"); + debugs(47, 0, "\tusable db size: " << diskOffsetLimit() << " bytes"); + debugs(47, 0, "\tdisk space waste: " << totalWaste << " bytes"); + debugs(47, 0, "WARNING: Rock store config wastes space."); + } + */ +} + +void +Rock::SwapDir::rebuild() { + //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init() + AsyncJob::Start(new Rebuild(this)); +} + +/* Add a new object to the cache with empty memory copy and pointer to disk + * use to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */ +bool +Rock::SwapDir::addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from) +{ + debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() << + ", fileno="<< std::setfill('0') << std::hex << std::uppercase << + std::setw(8) << fileno); + + sfileno newLocation = 0; + if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast(from.key), newLocation)) { + if (fileno == newLocation) { + slot->set(from); + map->extras(fileno) = header; + } // else some other, newer entry got into our cell + map->closeForWriting(newLocation, false); + return fileno == newLocation; + } + + return false; +} + + +bool +Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const +{ + if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load)) + return false; + + if (!theFile || !theFile->canWrite()) + return false; + + if (!map) + return false; + + if (io->shedLoad()) + return false; + + load = io->load(); + return true; +} + +StoreIOState::Pointer +Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data) +{ + if (!theFile || theFile->error()) { + debugs(47,4, HERE << theFile); + return NULL; + } + + // compute payload size for our cell header, using StoreEntry info + // careful: e.objectLen() may still be negative here + const int64_t expectedReplySize = e.mem_obj->expectedReplySize(); + assert(expectedReplySize >= 0); // must know to prevent cell overflows + assert(e.mem_obj->swap_hdr_sz > 0); + DbCellHeader header; + header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize; + const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize; + assert(payloadEnd <= max_objsize); + + sfileno fileno; + Ipc::StoreMapSlot *const slot = + map->openForWriting(reinterpret_cast(e.key), fileno); + if (!slot) { + debugs(47, 5, HERE << "Rock::SwapDir::createStoreIO: map->add failed"); + return NULL; + } + e.swap_file_sz = header.payloadSize; // and will be copied to the map + slot->set(e); + map->extras(fileno) = header; + + // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno. + // If that does not happen, the entry will not decrement the read level! 
+ + IoState *sio = new IoState(this, &e, cbFile, cbIo, data); + + sio->swap_dirn = index; + sio->swap_filen = fileno; + sio->payloadEnd = payloadEnd; + sio->diskOffset = diskOffset(sio->swap_filen); + + debugs(47,5, HERE << "dir " << index << " created new fileno " << + std::setfill('0') << std::hex << std::uppercase << std::setw(8) << + sio->swap_filen << std::dec << " at " << sio->diskOffset); + + assert(sio->diskOffset + payloadEnd <= diskOffsetLimit()); + + sio->file(theFile); + + trackReferences(e); + return sio; +} + +int64_t +Rock::SwapDir::diskOffset(int filen) const +{ + assert(filen >= 0); + return HeaderSize + max_objsize*filen; +} + +int64_t +Rock::SwapDir::diskOffsetLimit() const +{ + assert(map); + return diskOffset(map->entryLimit()); +} + +// tries to open an old or being-written-to entry with swap_filen for reading +StoreIOState::Pointer +Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data) +{ + if (!theFile || theFile->error()) { + debugs(47,4, HERE << theFile); + return NULL; + } + + if (e.swap_filen < 0) { + debugs(47,4, HERE << e); + return NULL; + } + + // There are two ways an entry can get swap_filen: our get() locked it for + // reading or our storeSwapOutStart() locked it for writing. Peeking at our + // locked entry is safe, but no support for reading a filling entry. + const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen); + if (!slot) + return NULL; // we were writing after all + + IoState *sio = new IoState(this, &e, cbFile, cbIo, data); + + sio->swap_dirn = index; + sio->swap_filen = e.swap_filen; + sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize; + assert(sio->payloadEnd <= max_objsize); // the payload fits the slot + + debugs(47,5, HERE << "dir " << index << " has old fileno: " << + std::setfill('0') << std::hex << std::uppercase << std::setw(8) << + sio->swap_filen); + + assert(slot->basics.swap_file_sz > 0); + assert(slot->basics.swap_file_sz == e.swap_file_sz); + + sio->diskOffset = diskOffset(sio->swap_filen); + assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit()); + + sio->file(theFile); + return sio; +} + +void +Rock::SwapDir::ioCompletedNotification() +{ + if (!theFile) { + debugs(47, 1, HERE << filePath << ": initialization failure or " << + "premature close of rock db file"); + fatalf("Rock cache_dir failed to initialize db file: %s", filePath); + } + + if (theFile->error()) { + debugs(47, 1, HERE << filePath << ": " << xstrerror()); + fatalf("Rock cache_dir failed to open db file: %s", filePath); + } + + // TODO: lower debugging level + debugs(47,1, "Rock cache_dir[" << index << "] limits: " << + std::setw(12) << maxSize() << " disk bytes and " << + std::setw(7) << map->entryLimit() << " entries"); + + rebuild(); +} + +void +Rock::SwapDir::closeCompleted() +{ + theFile = NULL; +} + +void +Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r) +{ + ReadRequest *request = dynamic_cast(r.getRaw()); + assert(request); + IoState::Pointer sio = request->sio; + + if (errflag == DISK_OK && rlen > 0) + sio->offset_ += rlen; + assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum + + StoreIOState::STRCB *callback = sio->read.callback; + assert(callback); + sio->read.callback = NULL; + void *cbdata; + if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata)) + callback(cbdata, r->buf, rlen, sio.getRaw()); +} + +void +Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount<
::WriteRequest> r) +{ + Rock::WriteRequest *request = dynamic_cast(r.getRaw()); + assert(request); + assert(request->sio != NULL); + IoState &sio = *request->sio; + + if (errflag == DISK_OK) { + // close, assuming we only write once; the entry gets the read lock + map->closeForWriting(sio.swap_filen, true); + // do not increment sio.offset_ because we do it in sio->write() + } else { + // Do not abortWriting here. The entry should keep the write lock + // instead of losing association with the store and confusing core. + map->free(sio.swap_filen); // will mark as unusable, just in case + } + + assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum + + sio.finishedWriting(errflag); +} + +bool +Rock::SwapDir::full() const +{ + return map && map->full(); +} + +// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT, +// but it should not happen for us +void +Rock::SwapDir::diskFull() { + debugs(20,1, "Internal ERROR: No space left error with rock cache_dir: " << + filePath); +} + +/// purge while full(); it should be sufficient to purge just one +void +Rock::SwapDir::maintain() +{ + debugs(47,3, HERE << "cache_dir[" << index << "] guards: " << + !repl << !map << !full() << StoreController::store_dirs_rebuilding); + + if (!repl) + return; // no means (cannot find a victim) + + if (!map) + return; // no victims (yet) + + if (!full()) + return; // no need (to find a victim) + + // XXX: UFSSwapDir::maintain says we must quit during rebuild + if (StoreController::store_dirs_rebuilding) + return; + + debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() << + ' ' << currentSize() << " < " << diskOffsetLimit()); + + // Hopefully, we find a removable entry much sooner (TODO: use time?) + const int maxProbed = 10000; + RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed); + + // It really should not take that long, but this will stop "infinite" loops + const int maxFreed = 1000; + int freed = 0; + // TODO: should we purge more than needed to minimize overheads? + for (; freed < maxFreed && full(); ++freed) { + if (StoreEntry *e = walker->Next(walker)) + e->release(); // will call our unlink() method + else + break; // no more objects + } + + debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed << + " scanned " << walker->scanned << '/' << walker->locked); + + walker->Done(walker); + + if (full()) { + debugs(47,0, "ERROR: Rock cache_dir[" << index << "] " << + "is still full after freeing " << freed << " entries.
A bug?"); + } +} + +void +Rock::SwapDir::reference(StoreEntry &e) +{ + debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen); + if (repl && repl->Referenced) + repl->Referenced(repl, &e, &e.repl); +} + +bool +Rock::SwapDir::dereference(StoreEntry &e) +{ + debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen); + if (repl && repl->Dereferenced) + repl->Dereferenced(repl, &e, &e.repl); + + // no need to keep e in the global store_table for us; we have our own map + return false; +} + +void +Rock::SwapDir::unlink(StoreEntry &e) +{ + debugs(47, 5, HERE << e); + ignoreReferences(e); + map->free(e.swap_filen); + disconnect(e); +} + +void +Rock::SwapDir::trackReferences(StoreEntry &e) +{ + debugs(47, 5, HERE << e); + if (repl) + repl->Add(repl, &e, &e.repl); +} + + +void +Rock::SwapDir::ignoreReferences(StoreEntry &e) +{ + debugs(47, 5, HERE << e); + if (repl) + repl->Remove(repl, &e, &e.repl); +} + +void +Rock::SwapDir::statfs(StoreEntry &e) const +{ + storeAppendPrintf(&e, "\n"); + storeAppendPrintf(&e, "Maximum Size: %"PRIu64" KB\n", maxSize() >> 10); + storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n", + currentSize() / 1024.0, + Math::doublePercent(currentSize(), maxSize())); + + if (map) { + const int limit = map->entryLimit(); + storeAppendPrintf(&e, "Maximum entries: %9d\n", limit); + if (limit > 0) { + const int entryCount = map->entryCount(); + storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n", + entryCount, (100.0 * entryCount / limit)); + + if (limit < 100) { // XXX: otherwise too expensive to count + Ipc::ReadWriteLockStats stats; + map->updateStats(stats); + stats.dump(e); + } + } + } + + storeAppendPrintf(&e, "Pending operations: %d out of %d\n", + store_open_disk_fd, Config.max_open_disk_fds); + + storeAppendPrintf(&e, "Flags:"); + + if (flags.selected) + storeAppendPrintf(&e, " SELECTED"); + + if (flags.read_only) + storeAppendPrintf(&e, " READ-ONLY"); + + storeAppendPrintf(&e, "\n"); + +} + + +/// initializes shared memory segments used by Rock::SwapDir +class RockSwapDirRr: public RegisteredRunner +{ +public: + /* RegisteredRunner API */ + virtual void run(const RunnerRegistry &); + virtual ~RockSwapDirRr(); + +private: + Vector owners; +}; + +RunnerRegistrationEntry(rrAfterConfig, RockSwapDirRr); + + +void RockSwapDirRr::run(const RunnerRegistry &) +{ + if (IamMasterProcess()) { + Must(owners.empty()); + for (int i = 0; i < Config.cacheSwap.n_configured; ++i) { + if (const Rock::SwapDir *const sd = dynamic_cast(INDEXSD(i))) { + Rock::SwapDir::DirMap::Owner *const owner = Rock::SwapDir::DirMap::Init(sd->path, sd->entryLimitAllowed()); + owners.push_back(owner); + } + } + } +} + +RockSwapDirRr::~RockSwapDirRr() +{ + for (size_t i = 0; i < owners.size(); ++i) + delete owners[i]; +} === added file 'src/fs/rock/RockSwapDir.h' --- src/fs/rock/RockSwapDir.h 1970-01-01 00:00:00 +0000 +++ src/fs/rock/RockSwapDir.h 2011-05-12 03:58:16 +0000 @@ -0,0 +1,90 @@ +#ifndef SQUID_FS_ROCK_SWAP_DIR_H +#define SQUID_FS_ROCK_SWAP_DIR_H + +#include "SwapDir.h" +#include "DiskIO/IORequestor.h" +#include "fs/rock/RockFile.h" +#include "ipc/StoreMap.h" + +class DiskIOStrategy; +class DiskFile; +class ReadRequest; +class WriteRequest; + +namespace Rock { + +class Rebuild; + +/// \ingroup Rock +class SwapDir: public ::SwapDir, public IORequestor +{ +public: + SwapDir(); + virtual ~SwapDir(); + + /* public ::SwapDir API */ + virtual void reconfigure(int, char *); + virtual StoreSearch *search(String const url, HttpRequest *); + virtual StoreEntry *get(const 
cache_key *key); + virtual void disconnect(StoreEntry &e); + virtual uint64_t currentSize() const; + virtual uint64_t currentCount() const; + virtual bool doReportStat() const; + virtual void swappedOut(const StoreEntry &e); + + int64_t entryLimitHigh() const { return 0xFFFFFF; } /// Core sfileno maximum + int64_t entryLimitAllowed() const; + + typedef Ipc::StoreMapWithExtras DirMap; + +protected: + /* protected ::SwapDir API */ + virtual bool needsDiskStrand() const; + virtual void create(); + virtual void init(); + virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const; + virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); + virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); + virtual void maintain(); + virtual void diskFull(); + virtual void reference(StoreEntry &e); + virtual bool dereference(StoreEntry &e); + virtual void unlink(StoreEntry &e); + virtual void statfs(StoreEntry &e) const; + + /* IORequestor API */ + virtual void ioCompletedNotification(); + virtual void closeCompleted(); + virtual void readCompleted(const char *buf, int len, int errflag, RefCount< ::ReadRequest>); + virtual void writeCompleted(int errflag, size_t len, RefCount< ::WriteRequest>); + + virtual void parse(int index, char *path); + void parseSize(); ///< parses anonymous cache_dir size option + void validateOptions(); ///< warns of configuration problems; may quit + + void rebuild(); ///< starts loading and validating stored entry metadata + ///< used to add entries successfully loaded during rebuild + bool addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from); + + bool full() const; ///< no more entries can be stored without purging + void trackReferences(StoreEntry &e); ///< add to replacement policy scope + void ignoreReferences(StoreEntry &e); ///< delete from repl policy scope + + int64_t diskOffset(int filen) const; + int64_t diskOffsetLimit() const; + int entryLimit() const { return map->entryLimit(); } + + friend class Rebuild; + const char *filePath; ///< location of cache storage file inside path/ + +private: + DiskIOStrategy *io; + RefCount theFile; ///< cache storage for this cache_dir + DirMap *map; + + static const int64_t HeaderSize; ///< on-disk db header size +}; + +} // namespace Rock + +#endif /* SQUID_FS_ROCK_SWAP_DIR_H */
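
For anyone trying the branch, here is a minimal cache_dir line that exercises the parsing code above. Rock::SwapDir::parse() takes the directory path and a size in megabytes (parseSize() converts MBytes to Bytes), the "rock" type name comes from Rock::StoreFileSystem::type(), and validateOptions() refuses to start without a positive max-size because every db cell is max-size bytes long. The path and numbers below are illustrative only, not tuned recommendations:

    cache_dir rock /var/spool/squid/rock 512 max-size=32768

With these example values the db file is 512 MB. After the 16 KB header (SwapDir::HeaderSize), diskOffset() places cell N at 16384 + 32768*N bytes, so entryLimitAllowed() yields (512 MB - 16 KB) / 32 KB = 16383 slots, well under the 0xFFFFFF sfileno cap imposed by entryLimitHigh().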