Changes revolving around making Store work with SMP-shared max-size
cache_dirs:

* Added MemObject::expectedReplySize() and used it instead of object_sz.

  When deciding whether an object with a known content length can be
  swapped out, do not wait until the object is completely received and
  its size (mem_obj->object_sz) becomes known (while asking the store to
  recheck in vain with every incoming chunk). Instead, use the known
  content length, if any, to make the decision.

  This optimizes the common case where the complete object is eventually
  received and swapped out, and avoids accumulating potentially large
  objects in RAM while waiting for the end of the response. Should not
  affect objects with unknown content length.

  Side-effect 1: Probably fixes several cases of unknowingly using
  negative (unknown) mem_obj->object_sz in calculations. I added a few
  assertions to double check some of the remaining object_sz/objectLen()
  uses.

  Side-effect 2: When expectedReplySize() is stored on disk as StoreEntry
  metadata, it may help to detect truncated entries when the writer
  process dies before completing the swapout.

* Removed mem->swapout.memnode in favor of mem->swapout.queue_offset.

  The code used the swapout.memnode pointer to keep track of the last
  page that was swapped out. The code was semi-buggy because it could
  reset the pointer to NULL if no new data came in before the call to
  doPages(). Perhaps the code relied on the assumption that the caller
  will never call doPages() if there is no new data, but I am not sure
  that assumption was correct in all cases (it could be that I broke the
  calling code, of course).

  Moreover, the page pointer was kept without any protection from the
  page disappearing during asynchronous swapout. There were "Evil hack
  time" comments discussing how the page might disappear.

  Fortunately, we already have mem->swapout.queue_offset that can be fed
  to getBlockContainingLocation to find the page that needs to be swapped
  out. There is no need to keep the page pointer around. The
  queue_offset-based math is the same, so we are not adding any overheads
  by using that offset (in fact, we are removing some minor
  computations).

* Added "close how?" parameter to storeClose() and friends.

  The old code would follow the same path when closing swapout activity
  for an aborted entry and when completing a perfectly healthy swapout.
  In the non-shared case, that could have been OK because the abort code
  would then release the entry, removing any half-written entry from the
  index and the disk (but I am not sure that release happened fast enough
  in 100% of cases).

  When the index and disk storage are shared among workers, such
  "temporary" inconsistencies result in truncated responses being
  delivered by other workers to the user because once the swapout
  activity is closed, other workers can start using the entry.

  By adding the "close how?" parameter to closing methods, we allow the
  core and SwapDir-specific code to handle aborted swapouts
  appropriately.

  Since swapin code is "read only", we do not currently distinguish
  between aborted and fully satisfied readers: The readerDone enum value
  applies to both cases. If needed, the SwapDir reading code can make
  that distinction by analyzing how much was actually swapped in.

* Moved "can you store this entry?" code to virtual SwapDir::canStore().

  The old code had some of the tests in SwapDir-specific canStore()
  methods and some in storeDirSelect*() methods. This resulted in
  inconsistencies, code duplication, and extra calculation overheads.
  Making this call virtual allows individual cache_dir types to do custom
  access controls.

  The same method is used for cache_dir load reporting (if it returns
  true). Load management needs more work, but the current code is no
  worse than the old one in this aspect, and further improvements are
  outside the scope of this change.

* Minimized from-disk StoreEntry loading/unpacking code duplication.

  Moved common (and often rather complex!) code from store modules into
  storeRebuildLoadEntry, storeRebuildParseEntry, and
  storeRebuildKeepEntry.

* Do not set object_sz when the entry is aborted because the true object
  size (HTTP reply headers + body) is not known in this case. Setting
  object_sz may fool client-side code into believing that the object is
  complete.

  This addresses an old complaint by RBC.

* When swapout initiation fails, release StoreEntry. This prevents the
  caller code from trying to swap out again and again because swap_status
  becomes SWAPOUT_NONE.

  TODO: Consider adding SWAPOUT_ERROR, STORE_ERROR, and similar states.
  It may solve several problems where the code sees _NONE or _OK and
  thinks everything is peachy when in fact there was an error.

* Always call StoreEntry::abort() instead of setting ENTRY_ABORTED
  manually.

* Rely on entry->abort() side-effects if ENTRY_ABORTED was set.

* Added or updated comments to better document current code.

* Added operator << for dumping StoreEntry summary into the debugging
  log. Needs more work to report more info (and not report yet-unknown
  info).

Based on 3p2-rock r11228.

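
The size-based swapout decision described in the first item can be illustrated with a small standalone sketch. It is not the patched Squid code: ReplySizeInfo and fitsMaxSize() are hypothetical names invented for this illustration, and the struct only stands in for the few MemObject/HttpReply values that the decision reads. The logic follows the expectedReplySize() and swapoutPossible() hunks in the diff, under the assumption that a negative size means "unknown" and a negative limit means "no max-size limit".

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the values the decision needs (not a Squid type).
struct ReplySizeInfo {
    int64_t objectSz;   // exact headers+body size once complete; negative if unknown
    int64_t bodySize;   // body size derived from Content-Length; negative if unknown
    int64_t headerSize; // parsed reply header size; 0 until headers are parsed
    int64_t endOffset;  // bytes received so far (always known)
};

// Same idea as MemObject::expectedReplySize(): return the expected
// headers+body size, or a negative value if it cannot be predicted yet.
int64_t expectedReplySize(const ReplySizeInfo &m)
{
    if (m.objectSz >= 0) // the object is complete; the answer is exact
        return m.objectSz;

    if (m.bodySize >= 0 && m.headerSize > 0)
        return m.bodySize + m.headerSize;

    return -1; // not enough information to predict
}

// Hypothetical helper covering only the max-size portion of
// StoreEntry::swapoutPossible(); maxObjSize < 0 means "no limit".
bool fitsMaxSize(const ReplySizeInfo &m, int64_t maxObjSize)
{
    if (maxObjSize < 0)
        return true;

    const int64_t expectedEnd = expectedReplySize(m);
    if (expectedEnd > maxObjSize)
        return false; // known to outgrow the limit eventually

    if (expectedEnd < 0)
        return false; // size unknown: may fit later, rejected for now

    return m.endOffset <= maxObjSize; // what we already have must fit too
}
```

With a limit in place, a reply whose Content-Length and headers are known is accepted or rejected right away, without waiting for the last byte, while a reply of unknown length is refused until more information is available.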
=== modified file 'src/MemObject.cc' --- src/MemObject.cc 2010-12-13 11:31:14 +0000 +++ src/MemObject.cc 2011-02-14 04:10:48 +0000 @@ -223,40 +223,56 @@ StoreClientStats statsVisitor(mb); for_each(clients, statsVisitor); } int64_t MemObject::endOffset () const { return data_hdr.endOffset(); } int64_t MemObject::size() const { if (object_sz < 0) return endOffset(); return object_sz; } +int64_t +MemObject::expectedReplySize() const { + debugs(20, 7, HERE << "object_sz: " << object_sz); + if (object_sz >= 0) // complete() has been called; we know the exact answer + return object_sz; + + if (_reply) { + const int64_t clen = _reply->bodySize(method); + debugs(20, 7, HERE << "clen: " << clen); + if (clen >= 0 && _reply->hdr_sz > 0) // yuck: HttpMsg sets hdr_sz to 0 + return clen + _reply->hdr_sz; + } + + return -1; // not enough information to predict +} + void MemObject::reset() { assert(swapout.sio == NULL); data_hdr.freeContent(); inmem_lo = 0; /* Should we check for clients? */ } int64_t MemObject::lowestMemReaderOffset() const { LowestMemReader lowest (endOffset() + 1); for_each (clients, lowest); return lowest.current; } @@ -322,41 +338,41 @@ int64_t lowest_offset = lowestMemReaderOffset(); if (endOffset() < lowest_offset || endOffset() - inmem_lo > (int64_t)Config.Store.maxInMemObjSize || (swap && !Config.onoff.memory_cache_first)) return lowest_offset; return inmem_lo; } void MemObject::trimSwappable() { int64_t new_mem_lo = policyLowestOffsetToKeep(1); /* * We should only free up to what we know has been written * to disk, not what has been queued for writing. Otherwise * there will be a chunk of the data which is not in memory * and is not yet on disk. * The -1 makes sure the page isn't freed until storeSwapOut has - * walked to the next page. (mem->swapout.memnode) + * walked to the next page. 
*/ int64_t on_disk; if ((on_disk = objectBytesOnDisk()) - 1 < new_mem_lo) new_mem_lo = on_disk - 1; if (new_mem_lo == -1) new_mem_lo = 0; /* the above might become -1 */ data_hdr.freeDataUpto(new_mem_lo); inmem_lo = new_mem_lo; } void MemObject::trimUnSwappable() { int64_t new_mem_lo = policyLowestOffsetToKeep(0); assert (new_mem_lo > 0); === modified file 'src/MemObject.h' --- src/MemObject.h 2010-11-27 06:44:33 +0000 +++ src/MemObject.h 2011-02-14 04:10:42 +0000 @@ -47,40 +47,43 @@ #include "DelayId.h" #endif class MemObject { public: static size_t inUseCount(); MEMPROXY_CLASS(MemObject); void dump() const; MemObject(char const *, char const *); ~MemObject(); void write(StoreIOBuffer, STMCB *, void *); void unlinkRequest(); HttpReply const *getReply() const; void replaceHttpReply(HttpReply *newrep); void stat (MemBuf * mb) const; int64_t endOffset () const; + /// negative if unknown; otherwise, expected object_sz, expected endOffset + /// maximum, and stored reply headers+body size (all three are the same) + int64_t expectedReplySize() const; int64_t size() const; void reset(); int64_t lowestMemReaderOffset() const; bool readAheadPolicyCanRead() const; void addClient(store_client *); /* XXX belongs in MemObject::swapout, once swaphdrsz is managed * better */ int64_t objectBytesOnDisk() const; int64_t policyLowestOffsetToKeep(bool swap) const; void trimSwappable(); void trimUnSwappable(); bool isContiguous() const; int mostBytesWanted(int max) const; void setNoDelay(bool const newValue); #if USE_DELAY_POOLS DelayId mostBytesAllowed() const; #endif @@ -89,42 +92,41 @@ void checkUrlChecksum() const; #endif HttpRequestMethod method; char *url; mem_hdr data_hdr; int64_t inmem_lo; dlink_list clients; /** \todo move into .cc or .cci */ size_t clientCount() const {return nclients;} bool clientIsFirst(void *sc) const {return (clients.head && sc == clients.head->data);} int nclients; class SwapOut { public: - int64_t queue_offset; /* relative to in-mem data */ - mem_node 
*memnode; /* which node we're currently paging out */ + int64_t queue_offset; ///< number of bytes sent to SwapDir for writing StoreIOState::Pointer sio; }; SwapOut swapout; /* Read only - this reply must be preserved by store clients */ /* The original reply. possibly with updated metadata. */ HttpRequest *request; struct timeval start_ping; IRCB *ping_reply_callback; void *ircb_data; struct { STABH *callback; void *data; } abort; char *log_url; RemovalPolicyNode repl; int id; === modified file 'src/Store.h' --- src/Store.h 2011-02-01 04:59:26 +0000 +++ src/Store.h 2011-02-13 23:04:18 +0000 @@ -95,41 +95,41 @@ virtual void complete(); virtual store_client_t storeClientType() const; virtual char const *getSerialisedMetaData(); virtual void replaceHttpReply(HttpReply *); virtual bool swapoutPossible(); virtual void trimMemory(); void abort(); void unlink(); void makePublic(); void makePrivate(); void setPublicKey(); void setPrivateKey(); void expireNow(); void releaseRequest(); void negativeCache(); void cacheNegatively(); /** \todo argh, why both? */ void invokeHandlers(); void purgeMem(); void swapOut(); bool swapOutAble() const; - void swapOutFileClose(); + void swapOutFileClose(int how); const char *url() const; int checkCachable(); int checkNegativeHit() const; int locked() const; int validToSend() const; int keepInMemory() const; void createMemObject(const char *, const char *); void dump(int debug_lvl) const; void hashDelete(); void hashInsert(const cache_key *); void registerAbort(STABH * cb, void *); void reset(); void setMemStatus(mem_status_t); void timestampsSet(); void unregisterAbort(); void destroyMemObject(); int checkTooSmall(); void delayAwareRead(int fd, char *buf, int len, AsyncCall::Pointer callback); === modified file 'src/StoreIOState.h' --- src/StoreIOState.h 2009-01-21 03:47:47 +0000 +++ src/StoreIOState.h 2011-02-13 22:58:40 +0000 @@ -67,48 +67,54 @@ * STIOCB is the "store close callback" for store files. 
It * is called when the store file is closed. STIOCB functions * are passed to storeCreate() and storeOpen(). Examples of * STIOCB callbacks are: * storeSwapOutFileClosed * storeSwapInFileClosed */ typedef void STIOCB(void *their_data, int errflag, StoreIOState::Pointer self); /* StoreIOState does not get mempooled - it's children do */ void *operator new (size_t amount); void operator delete (void *address); virtual ~StoreIOState(); StoreIOState(); off_t offset() const; virtual void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data) = 0; virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func) = 0; - virtual void close() = 0; + + typedef enum { + wroteAll, ///< success: caller supplied all data it wanted to swap out + writerGone, ///< failure: caller left before swapping out everything + readerDone ///< success or failure: either way, stop swapping in + } CloseHow; + virtual void close(int how) = 0; ///< finish or abort swapping per CloseHow sdirno swap_dirn; sfileno swap_filen; StoreEntry *e; /* Need this so the FS layers can play god */ mode_t mode; - off_t offset_; /* current on-disk offset pointer */ + off_t offset_; ///< number of bytes written or read for this entry so far STFNCB *file_callback; /* called on delayed sfileno assignments */ STIOCB *callback; void *callback_data; struct { STRCB *callback; void *callback_data; } read; struct { unsigned int closing:1; /* debugging aid */ } flags; }; StoreIOState::Pointer storeCreate(StoreEntry *, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); StoreIOState::Pointer storeOpen(StoreEntry *, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); -SQUIDCEXTERN void storeClose(StoreIOState::Pointer); +SQUIDCEXTERN void storeClose(StoreIOState::Pointer, int how); SQUIDCEXTERN void storeRead(StoreIOState::Pointer, char *, size_t, off_t, StoreIOState::STRCB *, void *); SQUIDCEXTERN void storeIOWrite(StoreIOState::Pointer, char const *, size_t, off_t, FREE 
*); #endif /* SQUID_STOREIOSTATE_H */ === modified file 'src/StoreMeta.h' --- src/StoreMeta.h 2011-01-27 21:14:56 +0000 +++ src/StoreMeta.h 2011-02-12 04:52:14 +0000 @@ -138,27 +138,25 @@ static bool validType(char); static int const MaximumTLVLength; static int const MinimumTLVLength; static StoreMeta *Factory(char type, size_t len, void const *value); static StoreMeta **Add(StoreMeta **tail, StoreMeta *aNode); static void FreeList(StoreMeta **head); virtual char getType() const = 0; virtual bool validLength(int) const; virtual bool checkConsistency(StoreEntry *) const; virtual ~StoreMeta() {} int length; void *value; tlv *next; }; /// \ingroup SwapStoreAPI SQUIDCEXTERN char *storeSwapMetaPack(tlv * tlv_list, int *length); /// \ingroup SwapStoreAPI -SQUIDCEXTERN size_t storeSwapMetaSize(const StoreEntry * e); -/// \ingroup SwapStoreAPI SQUIDCEXTERN tlv *storeSwapMetaBuild(StoreEntry * e); /// \ingroup SwapStoreAPI SQUIDCEXTERN void storeSwapTLVFree(tlv * n); #endif /* SQUID_TYPELENGTHVALUE_H */ === modified file 'src/SwapDir.cc' --- src/SwapDir.cc 2011-02-02 01:49:34 +0000 +++ src/SwapDir.cc 2011-02-11 18:32:53 +0000 @@ -92,40 +92,64 @@ SwapDir::maintain() {} uint64_t SwapDir::minSize() const { return ((maxSize() * Config.Swap.lowWaterMark) / 100); } void SwapDir::reference(StoreEntry &) {} void SwapDir::dereference(StoreEntry &) {} int SwapDir::callback() { return 0; } +bool +SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const +{ + debugs(47,8, HERE << "cache_dir[" << index << "]: needs " << + diskSpaceNeeded << " max_size) + return false; // already overflowing + + /* Return 999 (99.9%) constant load; TODO: add a named constant for this */ + load = 999; + return true; // kids may provide more tests and should report true load +} + + void SwapDir::sync() {} /* Move to StoreEntry ? 
*/ bool SwapDir::canLog(StoreEntry const &e)const { if (e.swap_filen < 0) return false; if (e.swap_status != SWAPOUT_DONE) return false; if (e.swap_file_sz <= 0) return false; if (EBIT_TEST(e.flags, RELEASE_REQUEST)) return false; if (EBIT_TEST(e.flags, KEY_PRIVATE)) === modified file 'src/SwapDir.h' --- src/SwapDir.h 2011-02-04 22:18:41 +0000 +++ src/SwapDir.h 2011-02-11 18:21:24 +0000 @@ -159,42 +159,42 @@ uint64_t max_size; ///< maximum allocatable size of the storage area char *path; int index; /* This entry's index into the swapDirs array */ int64_t min_objsize; int64_t max_objsize; RemovalPolicy *repl; int removals; int scanned; struct Flags { Flags() : selected(0), read_only(0) {} unsigned int selected:1; unsigned int read_only:1; } flags; virtual void init() = 0; /* Initialise the fs */ virtual void create(); /* Create a new fs */ virtual void dump(StoreEntry &)const; /* Dump fs config snippet */ virtual bool doubleCheck(StoreEntry &); /* Double check the obj integrity */ virtual void statfs(StoreEntry &) const; /* Dump fs statistics */ virtual void maintain(); /* Replacement maintainence */ - /* <0 == error. 
> 1000 == error */ - virtual int canStore(StoreEntry const &)const = 0; /* Check if the fs will store an object */ + /// check whether we can store the entry; if we can, report current load + virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const = 0; /* These two are notifications */ virtual void reference(StoreEntry &); /* Reference this object */ virtual void dereference(StoreEntry &); /* Unreference this object */ virtual int callback(); /* Handle pending callbacks */ virtual void sync(); /* Sync the store prior to shutdown */ virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *) = 0; virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *) = 0; virtual void unlink (StoreEntry &); bool canLog(StoreEntry const &e)const; virtual void openLog(); virtual void closeLog(); virtual void logEntry(const StoreEntry & e, int op) const; class CleanLog { public: virtual ~CleanLog() {} virtual const StoreEntry *nextEntry() = 0; === modified file 'src/client_side_reply.cc' --- src/client_side_reply.cc 2010-12-13 11:31:14 +0000 +++ src/client_side_reply.cc 2011-02-14 02:43:01 +0000 @@ -1019,40 +1019,42 @@ // last-chunk was not sent return 0; } /* * Handle STORE_OK objects. * objectLen(entry) will be set proprely. * RC: Does objectLen(entry) include the Headers? * RC: Yes. 
*/ if (entry->store_status == STORE_OK) { return storeOKTransferDone(); } else { return storeNotOKTransferDone(); } } int clientReplyContext::storeOKTransferDone() const { + assert(http->storeEntry()->objectLen() >= 0); + assert(http->storeEntry()->objectLen() >= headers_sz); if (http->out.offset >= http->storeEntry()->objectLen() - headers_sz) { debugs(88,3,HERE << "storeOKTransferDone " << " out.offset=" << http->out.offset << " objectLen()=" << http->storeEntry()->objectLen() << " headers_sz=" << headers_sz); return 1; } return 0; } int clientReplyContext::storeNotOKTransferDone() const { /* * Now, handle STORE_PENDING objects */ MemObject *mem = http->storeEntry()->mem_obj; assert(mem != NULL); assert(http->request != NULL); === modified file 'src/fs/coss/CossSwapDir.h' --- src/fs/coss/CossSwapDir.h 2008-11-11 10:38:40 +0000 +++ src/fs/coss/CossSwapDir.h 2011-02-11 17:55:01 +0000 @@ -20,41 +20,41 @@ /* Note that swap_filen in sio/e are actually disk offsets too! */ /* What we're doing in storeCossAllocate() */ #define COSS_ALLOC_ALLOCATE 1 #define COSS_ALLOC_REALLOC 2 /// \ingroup COSS class CossSwapDir : public SwapDir, public IORequestor { public: CossSwapDir(); virtual void init(); virtual void create(); virtual void dump(StoreEntry &)const; ~CossSwapDir(); virtual StoreSearch *search(String const url, HttpRequest *); virtual void unlink (StoreEntry &); virtual void statfs (StoreEntry &)const; - virtual int canStore(StoreEntry const &)const; + virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const; virtual int callback(); virtual void sync(); virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); virtual void openLog(); virtual void closeLog(); virtual int writeCleanStart(); virtual void writeCleanDone(); virtual void logEntry(const StoreEntry & e, int op) 
const; virtual void parse (int index, char *path); virtual void reconfigure (int, char *); /* internals */ virtual off_t storeCossFilenoToDiskOffset(sfileno); virtual sfileno storeCossDiskOffsetToFileno(off_t); virtual CossMemBuf *storeCossFilenoToMembuf(sfileno f); /* IORequestor routines */ virtual void ioCompletedNotification(); virtual void closeCompleted(); virtual void readCompleted(const char *buf, int len, int errflag, RefCount); virtual void writeCompleted(int errflag, size_t len, RefCount); === modified file 'src/fs/coss/store_dir_coss.cc' --- src/fs/coss/store_dir_coss.cc 2010-12-14 01:12:24 +0000 +++ src/fs/coss/store_dir_coss.cc 2011-02-11 17:54:01 +0000 @@ -943,60 +943,48 @@ /* we are shutting down, flush all membufs to disk */ CossSwapDir::~CossSwapDir() { io->sync(); if (theFile != NULL) theFile->close(); delete io; closeLog(); n_coss_dirs--; safe_free(ioModule); safe_free(stripe_path); } -/* - * storeCossDirCheckObj - * - * This routine is called by storeDirSelectSwapDir to see if the given - * object is able to be stored on this filesystem. COSS filesystems will - * not store everything. We don't check for maxobjsize here since its - * done by the upper layers. 
- */ -int -CossSwapDir::canStore(StoreEntry const &e)const +bool +CossSwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const { + if (!SwapDir::canStore(e, diskSpaceNeeded, load)) + return false; - /* Check if the object is a special object, we can't cache these */ - - if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) - return -1; - - /* Otherwise, we're ok */ - /* Return load, cs->aq.aq_numpending out of MAX_ASYNCOP */ - return io->load(); + load = io->load(); + return true; } /* * storeCossDirCallback - do the IO completions */ int CossSwapDir::callback() { return io->callback(); } /* ========== LOCAL FUNCTIONS ABOVE, GLOBAL FUNCTIONS BELOW ========== */ void CossSwapDir::statfs(StoreEntry & sentry) const { storeAppendPrintf(&sentry, "\n"); storeAppendPrintf(&sentry, "Maximum Size: %lu KB\n", max_size); storeAppendPrintf(&sentry, "Current Size: %lu KB\n", cur_size); storeAppendPrintf(&sentry, "Percent Used: %0.2f%%\n", === modified file 'src/fs/ufs/store_dir_ufs.cc' --- src/fs/ufs/store_dir_ufs.cc 2010-12-13 11:31:14 +0000 +++ src/fs/ufs/store_dir_ufs.cc 2011-02-11 18:03:26 +0000 @@ -40,47 +40,51 @@ #include "StoreSwapLogData.h" #include "ConfigOption.h" #include "DiskIO/DiskIOStrategy.h" #include "DiskIO/DiskIOModule.h" #include "Parsing.h" #include "SquidMath.h" #include "SquidTime.h" #include "SwapDir.h" #include "swap_log_op.h" int UFSSwapDir::NumberOfUFSDirs = 0; int *UFSSwapDir::UFSDirToGlobalDirMapping = NULL; /* * storeUfsDirCheckObj * * This routine is called by storeDirSelectSwapDir to see if the given * object is able to be stored on this filesystem. UFS filesystems will * happily store anything as long as the LRU time isn't too small. 
*/ -int -UFSSwapDir::canStore(StoreEntry const &e)const +bool +UFSSwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const { + if (!SwapDir::canStore(e, diskSpaceNeeded, load)) + return false; + if (IO->shedLoad()) - return -1; + return false; - return IO->load(); + load = IO->load(); + return true; } /* ========== LOCAL FUNCTIONS ABOVE, GLOBAL FUNCTIONS BELOW ========== */ void UFSSwapDir::parseSizeL1L2() { int i = GetInteger(); if (i <= 0) fatal("UFSSwapDir::parseSizeL1L2: invalid size value"); size_t size = i << 10; /* Mbytes to kbytes */ /* just reconfigure it */ if (reconfiguring) { if (size == max_size) debugs(3, 2, "Cache dir '" << path << "' size remains unchanged at " << size << " KB"); else debugs(3, 1, "Cache dir '" << path << "' size changed to " << size << " KB"); === modified file 'src/fs/ufs/store_io_ufs.cc' --- src/fs/ufs/store_io_ufs.cc 2010-08-29 00:12:52 +0000 +++ src/fs/ufs/store_io_ufs.cc 2011-02-13 23:09:58 +0000 @@ -173,45 +173,45 @@ if (theFile->error()) { debugs(79,3,HERE<< "theFile->error() ret " << theFile->error()); doCloseCallback(DISK_ERROR); } else { doCloseCallback(DISK_OK); } closing = false; } /* * DPW 2006-05-24 * This close function is called by the higher layer when it has finished * reading/writing everything, or otherwise wants to close the swap * file. In the case of writing and using aufs storage, close() might * be called before any/all data is written, and even before the open * callback occurs. Thus, we use our tryClosing() method, which knows * when it is safe to actually signal the lower layer for closing. 
*/ void -UFSStoreState::close() +UFSStoreState::close(int) { debugs(79, 3, "UFSStoreState::close: dirno " << swap_dirn << ", fileno "<< std::setfill('0') << std::hex << std::uppercase << std::setw(8) << swap_filen); - tryClosing(); + tryClosing(); // UFS does not distinguish different closure types } void UFSStoreState::read_(char *buf, size_t size, off_t aOffset, STRCB * aCallback, void *aCallbackData) { assert(read.callback == NULL); assert(read.callback_data == NULL); assert(!reading); assert(!closing); assert (aCallback); if (!theFile->canRead()) { debugs(79, 3, "UFSStoreState::read_: queueing read because theFile can't read"); queueRead (buf, size, aOffset, aCallback, aCallbackData); return; } read.callback = aCallback; read.callback_data = cbdataReference(aCallbackData); debugs(79, 3, "UFSStoreState::read_: dirno " << swap_dirn << ", fileno "<< === modified file 'src/fs/ufs/ufscommon.h' --- src/fs/ufs/ufscommon.h 2010-07-06 12:44:06 +0000 +++ src/fs/ufs/ufscommon.h 2011-02-13 23:08:00 +0000 @@ -45,41 +45,41 @@ /// \ingroup UFS class UFSSwapDir : public SwapDir { public: static int IsUFSDir(SwapDir* sd); static int DirClean(int swap_index); static int FilenoBelongsHere(int fn, int F0, int F1, int F2); UFSSwapDir(char const *aType, const char *aModuleType); virtual void init(); virtual void create(); virtual void dump(StoreEntry &) const; ~UFSSwapDir(); virtual StoreSearch *search(String const url, HttpRequest *); virtual bool doubleCheck(StoreEntry &); virtual void unlink(StoreEntry &); virtual void statfs(StoreEntry &)const; virtual void maintain(); - virtual int canStore(StoreEntry const &)const; + virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const; virtual void reference(StoreEntry &); virtual void dereference(StoreEntry &); virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, 
StoreIOState::STIOCB *, void *); virtual void openLog(); virtual void closeLog(); virtual int writeCleanStart(); virtual void writeCleanDone(); virtual void logEntry(const StoreEntry & e, int op) const; virtual void parse(int index, char *path); virtual void reconfigure(int, char *); virtual int callback(); virtual void sync(); void unlinkFile(sfileno f); // move down when unlink is a virtual method //protected: UFSStrategy *IO; char *fullPath(sfileno, char *) const; /* temp */ @@ -189,41 +189,41 @@ /** The io strategy in use */ DiskIOStrategy *io; protected: friend class UFSSwapDir; }; /** Common ufs-store-dir logic */ class ReadRequest; /// \ingroup UFS class UFSStoreState : public StoreIOState, public IORequestor { public: void * operator new (size_t); void operator delete (void *); UFSStoreState(SwapDir * SD, StoreEntry * anEntry, STIOCB * callback_, void *callback_data_); ~UFSStoreState(); - virtual void close(); + virtual void close(int how); virtual void closeCompleted(); // protected: virtual void ioCompletedNotification(); virtual void readCompleted(const char *buf, int len, int errflag, RefCount); virtual void writeCompleted(int errflag, size_t len, RefCount); RefCount theFile; bool opening; bool creating; bool closing; bool reading; bool writing; void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data); void write(char const *buf, size_t size, off_t offset, FREE * free_func); protected: virtual void doCloseCallback (int errflag); class _queued_read { === modified file 'src/store.cc' --- src/store.cc 2011-02-08 04:04:57 +0000 +++ src/store.cc 2011-02-15 01:47:43 +0000 @@ -29,40 +29,41 @@ * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. 
* */ #include "squid.h" #include "event.h" #include "fde.h" #include "Store.h" #include "mgr/Registration.h" #include "StoreClient.h" #include "stmem.h" #include "HttpReply.h" #include "HttpRequest.h" #include "MemObject.h" #include "mem_node.h" #include "StoreMeta.h" #include "SwapDir.h" +#include "StoreIOState.h" #if USE_DELAY_POOLS #include "DelayPools.h" #endif #include "Stack.h" #include "SquidTime.h" #include "swap_log_op.h" #include "mgr/StoreIoAction.h" static STMCB storeWriteComplete; #define REBUILD_TIMESTAMP_DELTA_MAX 2 #define STORE_IN_MEM_BUCKETS (229) /** \todo Convert these string constants to enum string-arrays generated */ const char *memStatusStr[] = { "NOT_IN_MEMORY", "IN_MEMORY" @@ -1096,75 +1097,65 @@ */ void StoreEntry::abort() { statCounter.aborted_requests++; assert(store_status == STORE_PENDING); assert(mem_obj != NULL); debugs(20, 6, "storeAbort: " << getMD5Text()); lock(); /* lock while aborting */ negativeCache(); releaseRequest(); EBIT_SET(flags, ENTRY_ABORTED); setMemStatus(NOT_IN_MEMORY); store_status = STORE_OK; - /* - * We assign an object length here. The only other place we assign - * the object length is in storeComplete() - */ - /* RBC: What do we need an object length for? we've just aborted the - * request, the request is private and negatively cached. Surely - * the object length is inappropriate to set. - */ - mem_obj->object_sz = mem_obj->endOffset(); - /* Notify the server side */ /* * DPW 2007-05-07 * Should we check abort.data for validity? */ if (mem_obj->abort.callback) { if (!cbdataReferenceValid(mem_obj->abort.data)) debugs(20,1,HERE << "queueing event when abort.data is not valid"); eventAdd("mem_obj->abort.callback", mem_obj->abort.callback, mem_obj->abort.data, 0.0, true); unregisterAbort(); } /* XXX Should we reverse these two, so that there is no * unneeded disk swapping triggered? 
*/ /* Notify the client side */ invokeHandlers(); - /* Close any swapout file */ - swapOutFileClose(); + // abort swap out, invalidating what was created so far (release follows) + swapOutFileClose(StoreIOState::writerGone); unlock(); /* unlock */ } /** * Clear Memory storage to accommodate the given object len */ void storeGetMemSpace(int size) { PROF_start(storeGetMemSpace); StoreEntry *e = NULL; int released = 0; static time_t last_check = 0; size_t pages_needed; RemovalPurgeWalker *walker; if (squid_curtime == last_check) { PROF_stop(storeGetMemSpace); return; @@ -1821,101 +1812,103 @@ /* TODO: when we store headers serparately remove the header portion */ /* TODO: mark the length of the headers ? */ /* We ONLY want the headers */ packerToStoreInit(&p, this); assert (isEmpty()); getReply()->packHeadersInto(&p); rep->hdr_sz = mem_obj->endOffset(); httpBodyPackInto(&getReply()->body, &p); packerClean(&p); } char const * StoreEntry::getSerialisedMetaData() { - const size_t swap_hdr_sz0 = storeSwapMetaSize(this); - assert (swap_hdr_sz0 >= 0); - mem_obj->swap_hdr_sz = (size_t) swap_hdr_sz0; - // now we can use swap_hdr_sz to calculate swap_file_sz - // so that storeSwapMetaBuild/Pack can pack corrent swap_file_sz - swap_file_sz = objectLen() + mem_obj->swap_hdr_sz; - StoreMeta *tlv_list = storeSwapMetaBuild(this); int swap_hdr_sz; char *result = storeSwapMetaPack(tlv_list, &swap_hdr_sz); - assert(static_cast(swap_hdr_sz0) == swap_hdr_sz); storeSwapTLVFree(tlv_list); + assert (swap_hdr_sz >= 0); + mem_obj->swap_hdr_sz = (size_t) swap_hdr_sz; return result; } -/* - * Calculate TLV list size for a StoreEntry - * XXX: Must match the actual storeSwapMetaBuild result size - */ -size_t -storeSwapMetaSize(const StoreEntry * e) -{ - size_t size = 0; - ++size; // STORE_META_OK - size += sizeof(int); // size of header to follow - - const size_t pfx = sizeof(char) + sizeof(int); // in the start of list entries - - size += pfx + SQUID_MD5_DIGEST_LENGTH; - size += pfx + 
STORE_HDR_METASIZE; - size += pfx + strlen(e->url()) + 1; - - // STORE_META_OBJSIZE - if (e->objectLen() >= 0) - size += pfx + sizeof(int64_t); - - if (const char *vary = e->mem_obj->vary_headers) - size += pfx + strlen(vary) + 1; - - debugs(20, 3, "storeSwapMetaSize(" << e->url() << "): " << size); - return size; -} - bool StoreEntry::swapoutPossible() { /* should we swap something out to disk? */ debugs(20, 7, "storeSwapOut: " << url()); debugs(20, 7, "storeSwapOut: store_status = " << storeStatusStr[store_status]); if (EBIT_TEST(flags, ENTRY_ABORTED)) { assert(EBIT_TEST(flags, RELEASE_REQUEST)); - swapOutFileClose(); + // StoreEntry::abort() already closed the swap out file, if any return false; } + // if we decided that swapout is possible, do not repeat same checks + // TODO: do not repeat any checks if we decided that swapout is impossible + if (swap_status != SWAPOUT_NONE) { + debugs(20, 3, "storeSwapOut: already started"); + return true; + } + if (EBIT_TEST(flags, ENTRY_SPECIAL)) { debugs(20, 3, "storeSwapOut: " << url() << " SPECIAL"); return false; } + // check cache_dir max-size limit if all cache_dirs have it + if (store_maxobjsize >= 0) { + assert(mem_obj); + + // TODO: add estimated store metadata size to be conservative + + // use guaranteed maximum if it is known + const int64_t expectedEnd = mem_obj->expectedReplySize(); + debugs(20, 7, "storeSwapOut: expectedEnd = " << expectedEnd); + if (expectedEnd > store_maxobjsize) { + debugs(20, 3, "storeSwapOut: will not fit: " << expectedEnd << + " > " << store_maxobjsize); + return false; // known to outgrow the limit eventually + } + if (expectedEnd < 0) { + debugs(20, 3, "storeSwapOut: wait for more info: " << + store_maxobjsize); + return false; // may fit later, but will be rejected now + } + + // use current minimum (always known) + const int64_t currentEnd = mem_obj->endOffset(); + if (currentEnd > store_maxobjsize) { + debugs(20, 3, "storeSwapOut: does not fit: " << currentEnd << + " > " << 
store_maxobjsize); + return false; // already does not fit and may only get bigger + } + } + return true; } void StoreEntry::trimMemory() { /* * DPW 2007-05-09 * Bug #1943. We must not let go any data for IN_MEMORY * objects. We have to wait until the mem_status changes. */ if (mem_status == IN_MEMORY) return; if (!swapOutAble()) { if (mem_obj->policyLowestOffsetToKeep(0) == 0) { /* Nothing to do */ return; } /* === modified file 'src/store_client.cc' --- src/store_client.cc 2010-12-13 11:31:14 +0000 +++ src/store_client.cc 2011-02-14 00:48:32 +0000 @@ -574,43 +574,48 @@ if (tlv_list == NULL) { debugs(90, 1, "WARNING: failed to unpack meta data"); fail(); return; } /* * Check the meta data and make sure we got the right object. */ for (tlv *t = tlv_list; t; t = t->next) { if (!t->checkConsistency(entry)) { storeSwapTLVFree(tlv_list); fail(); return; } } storeSwapTLVFree(tlv_list); + assert(swap_hdr_sz >= 0); + assert(entry->swap_file_sz > 0); + assert(entry->swap_file_sz >= static_cast<uint64_t>(swap_hdr_sz)); entry->mem_obj->swap_hdr_sz = swap_hdr_sz; entry->mem_obj->object_sz = entry->swap_file_sz - swap_hdr_sz; - + debugs(90, 5, "store_client::unpackHeader: swap_file_sz=" << + entry->swap_file_sz << "( " << swap_hdr_sz << " + " << + entry->mem_obj->object_sz << ")"); } void store_client::readHeader(char const *buf, ssize_t len) { MemObject *const mem = entry->mem_obj; assert(flags.disk_io_pending); flags.disk_io_pending = 0; assert(_callback.pending()); unpackHeader (buf, len); if (!object_ok) return; /* * If our last read got some data the client wants, then give * it to them, otherwise schedule another read. 
*/ @@ -679,41 +684,41 @@ debugs(90, 3, "storeUnregister: called for '" << e->getMD5Text() << "'"); if (sc == NULL) { debugs(90, 3, "storeUnregister: No matching client for '" << e->getMD5Text() << "'"); return 0; } if (mem->clientCount() == 0) { debugs(90, 3, "storeUnregister: Consistency failure - store client being unregistered is not in the mem object's list for '" << e->getMD5Text() << "'"); return 0; } dlinkDelete(&sc->node, &mem->clients); mem->nclients--; if (e->store_status == STORE_OK && e->swap_status != SWAPOUT_DONE) e->swapOut(); if (sc->swapin_sio != NULL) { - storeClose(sc->swapin_sio); + storeClose(sc->swapin_sio, StoreIOState::readerDone); sc->swapin_sio = NULL; statCounter.swap.ins++; } if (sc->_callback.pending()) { /* callback with ssize = -1 to indicate unexpected termination */ debugs(90, 3, "storeUnregister: store_client for " << mem->url << " has a callback"); sc->fail(); } #if STORE_CLIENT_LIST_DEBUG cbdataReferenceDone(sc->owner); #endif delete sc; assert(e->lock_count > 0); if (mem->nclients == 0) === modified file 'src/store_dir.cc' --- src/store_dir.cc 2011-02-02 01:49:34 +0000 +++ src/store_dir.cc 2011-02-15 03:45:03 +0000 @@ -169,119 +169,102 @@ return false; // Else, make sure that the object size will fit. return min_objsize <= objsize && max_objsize > objsize; } /* * This new selection scheme simply does round-robin on all SwapDirs. * A SwapDir is skipped if it is over the max_size (100%) limit, or * overloaded. 
*/ static int storeDirSelectSwapDirRoundRobin(const StoreEntry * e) { static int dirn = 0; int i; int load; RefCount<SwapDir> sd; - ssize_t objsize = e->objectLen(); + // e->objectLen() is negative at this point when we are still STORE_PENDING + ssize_t objsize = e->mem_obj->expectedReplySize(); if (objsize != -1) objsize += e->mem_obj->swap_hdr_sz; for (i = 0; i <= Config.cacheSwap.n_configured; i++) { if (++dirn >= Config.cacheSwap.n_configured) dirn = 0; sd = dynamic_cast<SwapDir *>(INDEXSD(dirn)); - if (sd->flags.read_only) + if (!sd->canStore(*e, objsize, load)) continue; - if (sd->cur_size > sd->max_size) - continue; - - if (!sd->objectSizeIsAcceptable(objsize)) - continue; - - /* check for error or overload condition */ - load = sd->canStore(*e); - if (load < 0 || load > 1000) { continue; } return dirn; } return -1; } /* * Spread load across all of the store directories * * Note: We should modify this later on to prefer sticking objects * in the *tightest fit* swapdir to conserve space, along with the * actual swapdir usage. But for now, this hack will do while * testing, so you should order your swapdirs in the config file * from smallest maxobjsize to unlimited (-1) maxobjsize. * * We also have to choose nleast == nconf since we need to consider * ALL swapdirs, regardless of state. Again, this is a hack while * we sort out the real usefulness of this algorithm. 
*/ static int storeDirSelectSwapDirLeastLoad(const StoreEntry * e) { - ssize_t objsize; ssize_t most_free = 0, cur_free; ssize_t least_objsize = -1; int least_load = INT_MAX; int load; int dirn = -1; int i; RefCount<SwapDir> SD; - /* Calculate the object size */ - objsize = e->objectLen(); + // e->objectLen() is negative at this point when we are still STORE_PENDING + ssize_t objsize = e->mem_obj->expectedReplySize(); if (objsize != -1) objsize += e->mem_obj->swap_hdr_sz; for (i = 0; i < Config.cacheSwap.n_configured; i++) { SD = dynamic_cast<SwapDir *>(INDEXSD(i)); SD->flags.selected = 0; - load = SD->canStore(*e); - - if (load < 0 || load > 1000) { - continue; - } - - if (!SD->objectSizeIsAcceptable(objsize)) - continue; - - if (SD->flags.read_only) + if (!SD->canStore(*e, objsize, load)) continue; - if (SD->cur_size > SD->max_size) + if (load < 0 || load > 1000) continue; if (load > least_load) continue; cur_free = SD->max_size - SD->cur_size; /* If the load is equal, then look in more details */ if (load == least_load) { /* closest max_objsize fit */ if (least_objsize != -1) if (SD->max_objsize > least_objsize || SD->max_objsize == -1) continue; /* most free */ if (cur_free < most_free) continue; } === modified file 'src/store_io.cc' --- src/store_io.cc 2010-10-28 18:52:59 +0000 +++ src/store_io.cc 2011-02-13 22:51:03 +0000 @@ -1,89 +1,81 @@ #include "squid.h" #include "Store.h" #include "MemObject.h" #include "SwapDir.h" StoreIoStats store_io_stats; /* * submit a request to create a cache object for writing. * The StoreEntry structure is sent as a hint to the filesystem * to what will be stored in this object, to allow the filesystem * to select different polices depending on object size or type. 
*/ StoreIOState::Pointer storeCreate(StoreEntry * e, StoreIOState::STFNCB * file_callback, StoreIOState::STIOCB * close_callback, void *callback_data) { assert (e); - ssize_t objsize; - sdirno dirn; - RefCount<SwapDir> SD; store_io_stats.create.calls++; - /* This is just done for logging purposes */ - objsize = e->objectLen(); - - if (objsize != -1) - objsize += e->mem_obj->swap_hdr_sz; /* * Pick the swapdir * We assume that the header has been packed by now .. */ - dirn = storeDirSelectSwapDir(e); + const sdirno dirn = storeDirSelectSwapDir(e); if (dirn == -1) { - debugs(20, 2, "storeCreate: no valid swapdirs for this object"); + debugs(20, 2, "storeCreate: no swapdirs for " << *e); store_io_stats.create.select_fail++; return NULL; } - debugs(20, 2, "storeCreate: Selected dir '" << dirn << "' for obj size '" << objsize << "'"); - SD = dynamic_cast<SwapDir *>(INDEXSD(dirn)); + debugs(20, 2, "storeCreate: Selected dir " << dirn << " for " << *e); + SwapDir *SD = dynamic_cast<SwapDir *>(INDEXSD(dirn)); /* Now that we have a fs to use, call its storeCreate function */ StoreIOState::Pointer sio = SD->createStoreIO(*e, file_callback, close_callback, callback_data); if (sio == NULL) store_io_stats.create.create_fail++; else store_io_stats.create.success++; return sio; } /* * storeOpen() is purely for reading .. 
*/ StoreIOState::Pointer storeOpen(StoreEntry * e, StoreIOState::STFNCB * file_callback, StoreIOState::STIOCB * callback, void *callback_data) { return dynamic_cast<SwapDir *>(e->store().getRaw())->openStoreIO(*e, file_callback, callback, callback_data); } void -storeClose(StoreIOState::Pointer sio) +storeClose(StoreIOState::Pointer sio, int how) { if (sio->flags.closing) { debugs(20,3,HERE << "storeClose: flags.closing already set, bailing"); return; } sio->flags.closing = 1; - debugs(20,3,HERE << "storeClose: calling sio->close()"); - sio->close(); + debugs(20,3,HERE << "storeClose: calling sio->close(" << how << ")"); + sio->close(how); } void storeRead(StoreIOState::Pointer sio, char *buf, size_t size, off_t offset, StoreIOState::STRCB * callback, void *callback_data) { sio->read_(buf, size, offset, callback, callback_data); } void storeIOWrite(StoreIOState::Pointer sio, char const *buf, size_t size, off_t offset, FREE * free_func) { sio->write(buf,size,offset,free_func); } === modified file 'src/store_swapmeta.cc' --- src/store_swapmeta.cc 2010-12-13 11:31:14 +0000 +++ src/store_swapmeta.cc 2011-02-14 05:08:07 +0000 @@ -44,42 +44,42 @@ { tlv *t; while ((t = n) != NULL) { n = t->next; xfree(t->value); delete t; } } /* * Build a TLV list for a StoreEntry */ tlv * storeSwapMetaBuild(StoreEntry * e) { tlv *TLV = NULL; /* we'll return this */ tlv **T = &TLV; const char *url; const char *vary; - const int64_t objsize = e->objectLen(); assert(e->mem_obj != NULL); + const int64_t objsize = e->mem_obj->expectedReplySize(); assert(e->swap_status == SWAPOUT_WRITING); url = e->url(); debugs(20, 3, "storeSwapMetaBuild: " << url ); tlv *t = StoreMeta::Factory (STORE_META_KEY,SQUID_MD5_DIGEST_LENGTH, e->key); if (!t) { storeSwapTLVFree(TLV); return NULL; } T = StoreMeta::Add(T, t); t = StoreMeta::Factory(STORE_META_STD_LFS,STORE_HDR_METASIZE,&e->timestamp); if (!t) { storeSwapTLVFree(TLV); return NULL; } T = StoreMeta::Add(T, t); t = StoreMeta::Factory(STORE_META_URL, strlen(url) + 1, 
url); === modified file 'src/store_swapout.cc' --- src/store_swapout.cc 2011-02-07 01:36:27 +0000 +++ src/store_swapout.cc 2011-02-15 01:34:56 +0000 @@ -68,40 +68,43 @@ std::uppercase << e->swap_filen); e->swap_status = SWAPOUT_WRITING; /* If we start swapping out objects with OutOfBand Metadata, * then this code needs changing */ /* TODO: make some sort of data,size refcounted immutable buffer * and stop fooling ourselves with "const char*" buffers. */ // Create metadata now, possibly in vain: storeCreate needs swap_hdr_sz. const char *buf = e->getSerialisedMetaData (); assert(buf); /* Create the swap file */ generic_cbdata *c = new generic_cbdata(e); sio = storeCreate(e, storeSwapOutFileNotify, storeSwapOutFileClosed, c); if (sio == NULL) { e->swap_status = SWAPOUT_NONE; + // our caller thinks SWAPOUT_NONE means swapping out has not started + // yet so we better release here to avoid being called again and again + e->releaseRequest(); delete c; xfree((char*)buf); storeLog(STORE_LOG_SWAPOUTFAIL, e); return; } mem->swapout.sio = sio; /* Don't lock until after create, or the replacement * code might get confused */ e->lock(); /* Pick up the file number if it was assigned immediately */ e->swap_filen = mem->swapout.sio->swap_filen; e->swap_dirn = mem->swapout.sio->swap_dirn; /* write out the swap metadata */ storeIOWrite(mem->swapout.sio, buf, mem->swap_hdr_sz, 0, xfree_cppwrapper); } @@ -109,166 +112,170 @@ storeSwapOutFileNotify(void *data, int errflag, StoreIOState::Pointer self) { generic_cbdata *c = (generic_cbdata *)data; StoreEntry *e = (StoreEntry *)c->data; MemObject *mem = e->mem_obj; assert(e->swap_status == SWAPOUT_WRITING); assert(mem); assert(mem->swapout.sio == self); assert(errflag == 0); assert(e->swap_filen < 0); // if this fails, call SwapDir::disconnect(e) e->swap_filen = mem->swapout.sio->swap_filen; e->swap_dirn = mem->swapout.sio->swap_dirn; } static void doPages(StoreEntry *anEntry) { MemObject *mem = anEntry->mem_obj; do { - /* - * Evil hack 
time. - * We are paging out to disk in page size chunks. however, later on when - * we update the queue position, we might not have a page (I *think*), - * so we do the actual page update here. - */ - - if (mem->swapout.memnode == NULL) { - /* We need to swap out the first page */ - mem->swapout.memnode = const_cast<mem_node *>(mem->data_hdr.start()); - } else { - /* We need to swap out the next page */ - /* 20030636 RBC - we don't have ->next anymore. - * But we do have the next location */ - mem->swapout.memnode = mem->data_hdr.getBlockContainingLocation (mem->swapout.memnode->end()); - } + // find the page containing the first byte we have not swapped out yet + mem_node *page = + mem->data_hdr.getBlockContainingLocation(mem->swapout.queue_offset); + + if (!page) + return; // wait for more data to become available + + // memNodeWriteComplete() and absence of buffer offset math below + // imply that we always write from the very beginning of the page + assert(page->start() == mem->swapout.queue_offset); /* * Get the length of this buffer. We are assuming(!) that the buffer * length won't change on this buffer, or things are going to be very * strange. I think that after the copy to a buffer is done, the buffer * size should stay fixed regardless so that this code isn't confused, * but we can look at this at a later date or whenever the code results * in bad swapouts, whichever happens first. 
:-) */ - ssize_t swap_buf_len = mem->swapout.memnode->nodeBuffer.length; + ssize_t swap_buf_len = page->nodeBuffer.length; debugs(20, 3, "storeSwapOut: swap_buf_len = " << swap_buf_len); assert(swap_buf_len > 0); debugs(20, 3, "storeSwapOut: swapping out " << swap_buf_len << " bytes from " << mem->swapout.queue_offset); mem->swapout.queue_offset += swap_buf_len; storeIOWrite(mem->swapout.sio, - mem->data_hdr.NodeGet(mem->swapout.memnode), + mem->data_hdr.NodeGet(page), swap_buf_len, -1, memNodeWriteComplete); /* the storeWrite() call might generate an error */ if (anEntry->swap_status != SWAPOUT_WRITING) break; int64_t swapout_size = mem->endOffset() - mem->swapout.queue_offset; if (anEntry->store_status == STORE_PENDING) if (swapout_size < SM_PAGE_SIZE) break; if (swapout_size <= 0) return; } while (true); } /* This routine is called every time data is sent to the client side. * It's overhead is therefor, significant. */ void StoreEntry::swapOut() { if (!mem_obj) return; if (!swapoutPossible()) return; + // Aborted entries have STORE_OK, but swapoutPossible rejects them. Thus, + // store_status == STORE_OK below means we got everything we wanted. 
+ debugs(20, 7, HERE << "storeSwapOut: mem->inmem_lo = " << mem_obj->inmem_lo); debugs(20, 7, HERE << "storeSwapOut: mem->endOffset() = " << mem_obj->endOffset()); debugs(20, 7, HERE << "storeSwapOut: swapout.queue_offset = " << mem_obj->swapout.queue_offset); if (mem_obj->swapout.sio != NULL) debugs(20, 7, "storeSwapOut: storeOffset() = " << mem_obj->swapout.sio->offset() ); + // buffered bytes we have not swapped out yet int64_t swapout_maxsize = mem_obj->endOffset() - mem_obj->swapout.queue_offset; assert(swapout_maxsize >= 0); int64_t const lowest_offset = mem_obj->lowestMemReaderOffset(); debugs(20, 7, HERE << "storeSwapOut: lowest_offset = " << lowest_offset); - /* - * Grab the swapout_size and check to see whether we're going to defer - * the swapout based upon size - */ - if ((store_status != STORE_OK) && (swapout_maxsize < store_maxobjsize)) { - /* - * NOTE: the store_maxobjsize here is the max of optional - * max-size values from 'cache_dir' lines. It is not the - * same as 'maximum_object_size'. By default, store_maxobjsize - * will be set to -1. However, I am worried that this - * deferance may consume a lot of memory in some cases. - * It would be good to make this decision based on reply - * content-length, rather than wait to accumulate huge - * amounts of object data in memory. - */ - debugs(20, 5, "storeSwapOut: Deferring starting swapping out"); - return; + // Check to see whether we're going to defer the swapout based upon size + if (store_status != STORE_OK) { + const int64_t expectedSize = mem_obj->expectedReplySize(); + const int64_t maxKnownSize = expectedSize < 0 ? + swapout_maxsize : expectedSize; + debugs(20, 7, HERE << "storeSwapOut: maxKnownSize= " << maxKnownSize); + + if (maxKnownSize < store_maxobjsize) { + /* + * NOTE: the store_maxobjsize here is the max of optional + * max-size values from 'cache_dir' lines. It is not the + * same as 'maximum_object_size'. By default, store_maxobjsize + * will be set to -1. 
However, I am worried that this + * deferance may consume a lot of memory in some cases. + * Should we add an option to limit this memory consumption? + */ + debugs(20, 5, "storeSwapOut: Deferring swapout start for " << + (store_maxobjsize - maxKnownSize) << " bytes"); + return; + } } +// TODO: it is better to trim as soon as we swap something out, not before trimMemory(); #if SIZEOF_OFF_T <= 4 if (mem_obj->endOffset() > 0x7FFF0000) { debugs(20, 0, "WARNING: preventing off_t overflow for " << url()); abort(); return; } #endif if (swap_status == SWAPOUT_WRITING) assert(mem_obj->inmem_lo <= mem_obj->objectBytesOnDisk() ); if (!swapOutAble()) return; debugs(20, 7, "storeSwapOut: swapout_size = " << swapout_maxsize); - if (swapout_maxsize == 0) { - if (store_status == STORE_OK) - swapOutFileClose(); - - return; /* Nevermore! */ + if (swapout_maxsize == 0) { // swapped everything we got + if (store_status == STORE_OK) { // got everything we wanted + assert(mem_obj->object_sz >= 0); + swapOutFileClose(StoreIOState::wroteAll); + } + // else need more data to swap out + return; } if (store_status == STORE_PENDING) { /* wait for a full block to write */ if (swapout_maxsize < SM_PAGE_SIZE) return; /* * Wait until we are below the disk FD limit, only if the * next server-side read won't be deferred. */ if (storeTooManyDiskFilesOpen() && !checkDeferRead(-1)) return; } /* Ok, we have stuff to swap out. Is there a swapout.sio open? */ if (swap_status == SWAPOUT_NONE) { assert(mem_obj->swapout.sio == NULL); assert(mem_obj->inmem_lo == 0); @@ -279,95 +286,97 @@ return; /* ENTRY_CACHABLE will be cleared and we'll never get here again */ } if (mem_obj->swapout.sio == NULL) return; doPages(this); if (mem_obj->swapout.sio == NULL) /* oops, we're not swapping out any more */ return; if (store_status == STORE_OK) { /* * If the state is STORE_OK, then all data must have been given * to the filesystem at this point because storeSwapOut() is * not going to be called again for this entry. 
*/ + assert(mem_obj->object_sz >= 0); assert(mem_obj->endOffset() == mem_obj->swapout.queue_offset); - swapOutFileClose(); + swapOutFileClose(StoreIOState::wroteAll); } } void -StoreEntry::swapOutFileClose() +StoreEntry::swapOutFileClose(int how) { assert(mem_obj != NULL); - debugs(20, 3, "storeSwapOutFileClose: " << getMD5Text()); + debugs(20, 3, "storeSwapOutFileClose: " << getMD5Text() << " how=" << how); debugs(20, 3, "storeSwapOutFileClose: sio = " << mem_obj->swapout.sio.getRaw()); if (mem_obj->swapout.sio == NULL) return; - storeClose(mem_obj->swapout.sio); + storeClose(mem_obj->swapout.sio, how); } static void storeSwapOutFileClosed(void *data, int errflag, StoreIOState::Pointer self) { generic_cbdata *c = (generic_cbdata *)data; StoreEntry *e = (StoreEntry *)c->data; MemObject *mem = e->mem_obj; assert(mem->swapout.sio == self); assert(e->swap_status == SWAPOUT_WRITING); cbdataFree(c); - if (errflag) { + // if object_size is still unknown, the entry was probably aborted + if (errflag || e->objectLen() < 0) { debugs(20, 2, "storeSwapOutFileClosed: dirno " << e->swap_dirn << ", swapfile " << std::hex << std::setw(8) << std::setfill('0') << std::uppercase << e->swap_filen << ", errflag=" << errflag); if (errflag == DISK_NO_SPACE_LEFT) { /* FIXME: this should be handle by the link from store IO to * Store, rather than being a top level API call. 
*/ e->store()->diskFull(); storeConfigure(); } if (e->swap_filen >= 0) e->unlink(); assert(e->swap_status == SWAPOUT_NONE); e->releaseRequest(); } else { /* swapping complete */ debugs(20, 3, "storeSwapOutFileClosed: SwapOut complete: '" << e->url() << "' to " << e->swap_dirn << ", " << std::hex << std::setw(8) << std::setfill('0') << std::uppercase << e->swap_filen); - debugs(20, 3, "storeSwapOutFileClosed: Should be:" << - e->swap_file_sz << " = " << e->objectLen() << " + " << - mem->swap_hdr_sz); + debugs(20, 5, HERE << "swap_file_sz = " << + e->objectLen() << " + " << mem->swap_hdr_sz); + assert(e->objectLen() >= 0); // we checked that above e->swap_file_sz = e->objectLen() + mem->swap_hdr_sz; e->swap_status = SWAPOUT_DONE; e->store()->updateSize(e->swap_file_sz, 1); if (e->checkCachable()) { storeLog(STORE_LOG_SWAPOUT, e); storeDirSwapLog(e, SWAP_LOG_ADD); } statCounter.swap.outs++; } debugs(20, 3, "storeSwapOutFileClosed: " << __FILE__ << ":" << __LINE__); mem->swapout.sio = NULL; e->unlock(); } /* * Is this entry a candidate for writing to disk? 
*/ === modified file 'src/tests/TestSwapDir.cc' --- src/tests/TestSwapDir.cc 2010-12-04 01:41:43 +0000 +++ src/tests/TestSwapDir.cc 2011-02-11 18:24:45 +0000 @@ -6,43 +6,44 @@ uint64_t TestSwapDir::maxSize() const { return 3; } void TestSwapDir::stat(StoreEntry &) const { const_cast<TestSwapDir *>(this)->statsCalled = true; } void TestSwapDir::reconfigure(int, char*) {} void TestSwapDir::init() {} -int -TestSwapDir::canStore(const StoreEntry&) const +bool +TestSwapDir::canStore(const StoreEntry &, int64_t, int &load) const { + load = 0; return true; } StoreIOState::Pointer TestSwapDir::createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *) { return NULL; } StoreIOState::Pointer TestSwapDir::openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *) { return NULL; } void TestSwapDir::parse(int, char*) {} StoreSearch * === modified file 'src/tests/TestSwapDir.h' --- src/tests/TestSwapDir.h 2010-12-04 01:41:43 +0000 +++ src/tests/TestSwapDir.h 2011-02-11 17:57:09 +0000 @@ -1,29 +1,29 @@ #ifndef TEST_TESTSWAPDIR #define TEST_TESTSWAPDIR #include "squid.h" #include "SwapDir.h" class TestSwapDir : public SwapDir { public: TestSwapDir() : SwapDir("test"), statsCalled (false) {} bool statsCalled; virtual uint64_t maxSize() const; virtual void stat(StoreEntry &) const; /* output stats to the provided store entry */ virtual void reconfigure(int, char*); virtual void init(); - virtual int canStore(const StoreEntry&) const; + virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const; virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); virtual void parse(int, char*); virtual StoreSearch *search(String, HttpRequest *); }; typedef RefCount<TestSwapDir> TestSwapDirPointer; #endif /* TEST_TESTSWAPDIR */ === modified file 'src/tests/stub_store_swapout.cc' --- 
src/tests/stub_store_swapout.cc 2010-10-28 18:52:59 +0000 +++ src/tests/stub_store_swapout.cc 2011-02-13 23:04:18 +0000 @@ -21,41 +21,41 @@ * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "Store.h" StoreIoStats store_io_stats; void -StoreEntry::swapOutFileClose() +StoreEntry::swapOutFileClose(int) { fatal ("Not implemented"); } bool StoreEntry::swapOutAble() const { fatal ("Not implemented"); return false; } void StoreEntry::swapOut() { fatal ("Not implemented"); } /* wrong stub file... */ void storeUnlink(StoreEntry * e)