Support more collapsed forwarding hit cases:

Allow STORE_MEM_CLIENTs to open disk files if needed and possible. The STORE_*_CLIENT designation is rather buggy (several known XXXs). Some collapsed clients are marked as STORE_MEM_CLIENTs (for lack of information at determination time), but their hit content may actually come from a disk cache.

Do not abandon writing a collapsed cache entry when we cannot cache the entry in RAM if the entry can be cached on disk instead. Both the shared memory cache and the disk cache have to refuse to cache the entry for it to become non-collapsible. This dual refusal is difficult to detect because each cache may make its caching decision at a different time. Added StoreEntry methods to track those decisions and react to them.

Recognize the disk cache as a potential source of the collapsed entry even when the memory cache is configured. While collapsed entries would normally be found in the shared memory cache, caching policies and other factors may prohibit memory caching but still allow disk caching. The memory cache is still preferred.

Do not use an unknown entry size in the StoreEntry::checkTooSmall() determination. The size of collapsed entries is often unknown, even when they are STORE_OK (because swap_hdr_sz is unknown when the other worker has created the cache entry). The same code has been using this ignore-unknowns logic for the Content-Length header value, so the rejection of an unknown entry size (added as part of the C++ conversion, without a dedicated message, in r5766) could have been a typo.
=== modified file 'src/MemStore.cc' --- src/MemStore.cc 2014-02-21 16:14:05 +0000 +++ src/MemStore.cc 2014-06-23 23:34:53 +0000 @@ -465,40 +465,41 @@ MemStore::shouldCache(const StoreEntry & return true; } /// locks map anchor and preps to store the entry in shared memory bool MemStore::startCaching(StoreEntry &e) { sfileno index = 0; Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast(e.key), index); if (!slot) { debugs(20, 5, HERE << "No room in mem-cache map to index " << e); return false; } assert(e.mem_obj); e.mem_obj->memCache.index = index; e.mem_obj->memCache.io = MemObject::ioWriting; slot->set(e); map->startAppending(index); + e.memOutDecision(true); return true; } /// copies all local data to shared memory void MemStore::copyToShm(StoreEntry &e) { // prevents remote readers from getting ENTRY_FWD_HDR_WAIT entries and // not knowing when the wait is over if (EBIT_TEST(e.flags, ENTRY_FWD_HDR_WAIT)) { debugs(20, 5, "postponing copying " << e << " for ENTRY_FWD_HDR_WAIT"); return; } assert(map); assert(e.mem_obj); const int32_t index = e.mem_obj->memCache.index; assert(index >= 0); Ipc::StoreMapAnchor &anchor = map->writeableEntry(index); @@ -629,41 +630,41 @@ MemStore::noteFreeMapSlice(const sfileno } else { *waitingFor.slot = slotId; *waitingFor.page = pageId; waitingFor.slot = NULL; waitingFor.page = NULL; pageId = Ipc::Mem::PageId(); } } void MemStore::write(StoreEntry &e) { assert(e.mem_obj); debugs(20, 7, "entry " << e); switch (e.mem_obj->memCache.io) { case MemObject::ioUndecided: if (!shouldCache(e) || !startCaching(e)) { e.mem_obj->memCache.io = MemObject::ioDone; - Store::Root().transientsAbandon(e); + e.memOutDecision(false); return; } break; case MemObject::ioDone: case MemObject::ioReading: return; // we should not write in all of the above cases case MemObject::ioWriting: break; // already decided to write and still writing } try { copyToShm(e); if (e.store_status == STORE_OK) // done receiving new content completeWriting(e); else 
CollapsedForwarding::Broadcast(e); return; } catch (const std::exception &x) { // TODO: should we catch ... as well? === modified file 'src/Store.h' --- src/Store.h 2014-02-21 10:46:19 +0000 +++ src/Store.h 2014-06-23 23:34:53 +0000 @@ -85,40 +85,46 @@ public: /** Check if the Store entry is emtpty * \retval true Store contains 0 bytes of data. * \retval false Store contains 1 or more bytes of data. * \retval false Store contains negative content !!!!!! */ virtual bool isEmpty() const { assert (mem_obj); return mem_obj->endOffset() == 0; } virtual bool isAccepting() const; virtual size_t bytesWanted(Range const aRange, bool ignoreDelayPool = false) const; virtual void complete(); virtual store_client_t storeClientType() const; virtual char const *getSerialisedMetaData(); void replaceHttpReply(HttpReply *, bool andStartWriting = true); void startWriting(); ///< pack and write reply headers and, maybe, body /// whether we may start writing to disk (now or in the future) virtual bool mayStartSwapOut(); virtual void trimMemory(const bool preserveSwappable); + + // called when a decision to cache in memory has been made + void memOutDecision(const bool willCacheInRam); + // called when a decision to cache on disk has been made + void swapOutDecision(const MemObject::SwapOut::Decision &decision); + void abort(); void unlink(); void makePublic(); void makePrivate(); void setPublicKey(); void setPrivateKey(); void expireNow(); void releaseRequest(); void negativeCache(); void cacheNegatively(); /** \todo argh, why both? 
*/ void invokeHandlers(); void purgeMem(); void cacheInMemory(); ///< start or continue storing in memory cache void swapOut(); /// whether we are in the process of writing this entry to disk bool swappingOut() const { return swap_status == SWAPOUT_WRITING; } void swapOutFileClose(int how); const char *url() const; int checkCachable(); int checkNegativeHit() const; @@ -214,40 +220,43 @@ public: /// disclaim shared ownership; may remove entry from store and delete it /// returns remaning lock level (zero for unlocked and possibly gone entry) int unlock(const char *context); /// returns a local concurrent use counter, for debugging int locks() const { return static_cast(lock_count); } /// update last reference timestamp and related Store metadata void touch(); virtual void release(); #if USE_ADAPTATION /// call back producer when more buffer space is available void deferProducer(const AsyncCall::Pointer &producer); /// calls back producer registered with deferProducer void kickProducer(); #endif +protected: + void transientsAbandonmentCheck(); + private: static MemAllocator *pool; unsigned short lock_count; /* Assume < 65536! 
*/ #if USE_ADAPTATION /// producer callback registered with deferProducer AsyncCall::Pointer deferredProducer; #endif bool validLength() const; bool hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) const; }; std::ostream &operator <<(std::ostream &os, const StoreEntry &e); /// \ingroup StoreAPI class NullStoreEntry:public StoreEntry { === modified file 'src/StoreClient.h' --- src/StoreClient.h 2013-08-15 22:09:07 +0000 +++ src/StoreClient.h 2014-06-23 23:34:53 +0000 @@ -85,41 +85,41 @@ public: bool store_copying; bool copy_event_pending; } flags; #if USE_DELAY_POOLS DelayId delayId; void setDelayId(DelayId delay_id); #endif dlink_node node; /* Below here is private - do no alter outside storeClient calls */ StoreIOBuffer copyInto; private: bool moreToSend() const; void fileRead(); void scheduleDiskRead(); void scheduleMemRead(); void scheduleRead(); - void startSwapin(); + bool startSwapin(); void unpackHeader(char const *buf, ssize_t len); int type; bool object_ok; /* Until we finish stuffing code into store_client */ public: struct Callback { Callback ():callback_handler(NULL), callback_data(NULL) {} Callback (STCB *, void *); bool pending() const; STCB *callback_handler; void *callback_data; } _callback; private: CBDATA_CLASS2(store_client); === modified file 'src/store.cc' --- src/store.cc 2014-06-05 08:28:20 +0000 +++ src/store.cc 2014-06-23 23:34:53 +0000 @@ -924,41 +924,41 @@ struct _store_check_cachable_hist { int storeTooManyDiskFilesOpen(void) { if (Config.max_open_disk_fds == 0) return 0; if (store_open_disk_fd > Config.max_open_disk_fds) return 1; return 0; } int StoreEntry::checkTooSmall() { if (EBIT_TEST(flags, ENTRY_SPECIAL)) return 0; if (STORE_OK == store_status) - if (mem_obj->object_sz < 0 || + if (mem_obj->object_sz >= 0 && mem_obj->object_sz < Config.Store.minObjectSize) return 1; if (getReply()->content_length > -1) if (getReply()->content_length < Config.Store.minObjectSize) return 1; return 0; } // TODO: remove checks already 
performed by swapoutPossible() // TODO: move "too many open..." checks outside -- we are called too early/late int StoreEntry::checkCachable() { #if CACHE_ALL_METHODS if (mem_obj->method != Http::METHOD_GET) { debugs(20, 2, "StoreEntry::checkCachable: NO: non-GET method"); ++store_check_cachable_hist.no.non_get; } else #endif @@ -1843,40 +1843,71 @@ StoreEntry::startWriting() mem_obj->markEndOfReplyHeaders(); EBIT_CLR(flags, ENTRY_FWD_HDR_WAIT); rep->body.packInto(&p); packerClean(&p); } char const * StoreEntry::getSerialisedMetaData() { StoreMeta *tlv_list = storeSwapMetaBuild(this); int swap_hdr_sz; char *result = storeSwapMetaPack(tlv_list, &swap_hdr_sz); storeSwapTLVFree(tlv_list); assert (swap_hdr_sz >= 0); mem_obj->swap_hdr_sz = (size_t) swap_hdr_sz; return result; } +/** + * Abandon the transient entry our worker has created if neither the shared + * memory cache nor the disk cache wants to store it. Collapsed requests, if + * any, should notice and use Plan B instead of getting stuck waiting for us + * to start swapping the entry out. + */ +void +StoreEntry::transientsAbandonmentCheck() { + if (mem_obj && !mem_obj->smpCollapsed && // this worker is responsible + mem_obj->xitTable.index >= 0 && // other workers may be interested + mem_obj->memCache.index < 0 && // rejected by the shared memory cache + mem_obj->swapout.decision == MemObject::SwapOut::swImpossible) { + debugs(20, 7, "cannot be shared: " << *this); + if (!shutting_down) // Store::Root() is FATALly missing during shutdown + Store::Root().transientsAbandon(*this); + } +} + +void +StoreEntry::memOutDecision(const bool willCacheInRam) { + transientsAbandonmentCheck(); +} + +void +StoreEntry::swapOutDecision(const MemObject::SwapOut::Decision &decision) { + // Abandon our transient entry if neither shared memory nor disk wants it. 
+ assert(mem_obj); + mem_obj->swapout.decision = decision; + transientsAbandonmentCheck(); +} + void StoreEntry::trimMemory(const bool preserveSwappable) { /* * DPW 2007-05-09 * Bug #1943. We must not let go any data for IN_MEMORY * objects. We have to wait until the mem_status changes. */ if (mem_status == IN_MEMORY) return; if (EBIT_TEST(flags, ENTRY_SPECIAL)) return; // cannot trim because we do not load them again if (preserveSwappable) mem_obj->trimSwappable(); else mem_obj->trimUnSwappable(); debugs(88, 7, *this << " inmem_lo=" << mem_obj->inmem_lo); === modified file 'src/store_client.cc' --- src/store_client.cc 2014-01-16 00:24:34 +0000 +++ src/store_client.cc 2014-06-23 23:34:53 +0000 @@ -260,55 +260,53 @@ store_client::copy(StoreEntry * anEntry, #if USE_ADAPTATION anEntry->kickProducer(); #endif anEntry->unlock("store_client::copy"); // Add no code here. This object may no longer exist. } /// Whether there is (or will be) more entry data for us. bool store_client::moreToSend() const { if (entry->store_status == STORE_PENDING) return true; // there may be more coming /* STORE_OK, including aborted entries: no more data is coming */ const int64_t len = entry->objectLen(); - // If we do not know the entry length, then we have to open the swap file, - // which is only possible if there is one AND if we are allowed to use it. - const bool canSwapIn = entry->swap_filen >= 0 && - getType() == STORE_DISK_CLIENT; + // If we do not know the entry length, then we have to open the swap file. + const bool canSwapIn = entry->swap_filen >= 0; if (len < 0) return canSwapIn; if (copyInto.offset >= len) return false; // sent everything there is if (canSwapIn) return true; // if we lack prefix, we can swap it in // If we cannot swap in, make sure we have what we want in RAM. Otherwise, - // scheduleRead calls scheduleDiskRead which asserts on STORE_MEM_CLIENTs. + // scheduleRead calls scheduleDiskRead which asserts without a swap file. 
const MemObject *mem = entry->mem_obj; return mem && mem->inmem_lo <= copyInto.offset && copyInto.offset < mem->endOffset(); } static void storeClientCopy2(StoreEntry * e, store_client * sc) { /* reentrancy not allowed - note this could lead to * dropped events */ if (sc->flags.copy_event_pending) { return; } if (EBIT_TEST(e->flags, ENTRY_FWD_HDR_WAIT)) { debugs(90, 5, "storeClientCopy2: returning because ENTRY_FWD_HDR_WAIT set"); return; } @@ -364,101 +362,105 @@ store_client::doCopy(StoreEntry *anEntry /* Check that we actually have data */ if (anEntry->store_status == STORE_PENDING && copyInto.offset >= mem->endOffset()) { debugs(90, 3, "store_client::doCopy: Waiting for more"); flags.store_copying = false; return; } /* * Slight weirdness here. We open a swapin file for any * STORE_DISK_CLIENT, even if we can copy the requested chunk * from memory in the next block. We must try to open the * swapin file before sending any data to the client side. If * we postpone the open, and then can not open the file later * on, the client loses big time. Its transfer just gets cut * off. Better to open it early (while the client side handler * is clientCacheHit) so that we can fall back to a cache miss * if needed. 
*/ - if (STORE_DISK_CLIENT == getType() && swapin_sio == NULL) - startSwapin(); - else - scheduleRead(); + if (STORE_DISK_CLIENT == getType() && swapin_sio == NULL) { + if (!startSwapin()) + return; // failure + } + scheduleRead(); } -void +/// opens the swapin "file" if possible; otherwise, fail()s and returns false +bool store_client::startSwapin() { debugs(90, 3, "store_client::doCopy: Need to open swap in file"); /* gotta open the swapin file */ if (storeTooManyDiskFilesOpen()) { /* yuck -- this causes a TCP_SWAPFAIL_MISS on the client side */ fail(); flags.store_copying = false; - return; + return false; } else if (!flags.disk_io_pending) { /* Don't set store_io_pending here */ storeSwapInStart(this); if (swapin_sio == NULL) { fail(); flags.store_copying = false; - return; + return false; } - /* - * If the open succeeds we either copy from memory, or - * schedule a disk read in the next block. - */ - scheduleRead(); - - return; + return true; } else { debugs(90, DBG_IMPORTANT, "WARNING: Averted multiple fd operation (1)"); flags.store_copying = false; - return; + return false; } } void store_client::scheduleRead() { MemObject *mem = entry->mem_obj; if (copyInto.offset >= mem->inmem_lo && copyInto.offset < mem->endOffset()) scheduleMemRead(); else scheduleDiskRead(); } void store_client::scheduleDiskRead() { /* What the client wants is not in memory. 
Schedule a disk read */ - assert(STORE_DISK_CLIENT == getType()); + if (getType() == STORE_DISK_CLIENT) { + // we should have called startSwapin() already + assert(swapin_sio != NULL); + } else + if (!swapin_sio && !startSwapin()) { + debugs(90, 3, "bailing after swapin start failure for " << *entry); + assert(!flags.store_copying); + return; + } assert(!flags.disk_io_pending); - debugs(90, 3, "store_client::doCopy: reading from STORE"); + debugs(90, 3, "reading " << *entry << " from disk"); fileRead(); flags.store_copying = false; } void store_client::scheduleMemRead() { /* What the client wants is in memory */ /* Old style */ debugs(90, 3, "store_client::doCopy: Copying normal from memory"); size_t sz = entry->mem_obj->data_hdr.copy(copyInto); callback(sz); flags.store_copying = false; } void store_client::fileRead() { === modified file 'src/store_dir.cc' --- src/store_dir.cc 2014-02-21 10:46:19 +0000 +++ src/store_dir.cc 2014-06-23 23:34:53 +0000 @@ -1043,41 +1043,41 @@ StoreController::syncCollapsed(const sfi debugs(20, 7, "waiting " << *collapsed); } } /// Called for in-transit entries that are not yet anchored to a cache. /// For cached entries, return true after synchronizing them with their cache /// (making inSync true on success). For not-yet-cached entries, return false. 
bool StoreController::anchorCollapsed(StoreEntry &collapsed, bool &inSync) { // this method is designed to work with collapsed transients only assert(collapsed.mem_obj); assert(collapsed.mem_obj->xitTable.index >= 0); assert(collapsed.mem_obj->smpCollapsed); debugs(20, 7, "anchoring " << collapsed); bool found = false; if (memStore) found = memStore->anchorCollapsed(collapsed, inSync); - else if (Config.cacheSwap.n_configured) + if (!found && Config.cacheSwap.n_configured) found = anchorCollapsedOnDisk(collapsed, inSync); if (found) { if (inSync) debugs(20, 7, "anchored " << collapsed); else debugs(20, 5, "failed to anchor " << collapsed); } else { debugs(20, 7, "skipping not yet cached " << collapsed); } return found; } StoreHashIndex::StoreHashIndex() { if (store_table) abort(); assert (store_table == NULL); } === modified file 'src/store_swapout.cc' --- src/store_swapout.cc 2013-12-31 18:49:41 +0000 +++ src/store_swapout.cc 2014-06-23 23:34:53 +0000 @@ -52,60 +52,60 @@ static StoreIOState::STFNCB storeSwapOut // wrapper to cross C/C++ ABI boundary. xfree is extern "C" for libraries. static void xfree_cppwrapper(void *x) { xfree(x); } /* start swapping object to disk */ static void storeSwapOutStart(StoreEntry * e) { MemObject *mem = e->mem_obj; StoreIOState::Pointer sio; assert(mem); /* Build the swap metadata, so the filesystem will know how much * metadata there is to store */ debugs(20, 5, "storeSwapOutStart: Begin SwapOut '" << e->url() << "' to dirno " << e->swap_dirn << ", fileno " << std::hex << std::setw(8) << std::setfill('0') << std::uppercase << e->swap_filen); e->swap_status = SWAPOUT_WRITING; - mem->swapout.decision = MemObject::SwapOut::swStarted; + e->swapOutDecision(MemObject::SwapOut::swStarted); /* If we start swapping out objects with OutOfBand Metadata, * then this code needs changing */ /* TODO: make some sort of data,size refcounted immutable buffer * and stop fooling ourselves with "const char*" buffers. 
*/ // Create metadata now, possibly in vain: storeCreate needs swap_hdr_sz. const char *buf = e->getSerialisedMetaData (); assert(buf); /* Create the swap file */ generic_cbdata *c = new generic_cbdata(e); sio = storeCreate(e, storeSwapOutFileNotify, storeSwapOutFileClosed, c); if (sio == NULL) { e->swap_status = SWAPOUT_NONE; - mem->swapout.decision = MemObject::SwapOut::swImpossible; + e->swapOutDecision(MemObject::SwapOut::swImpossible); delete c; xfree((char*)buf); storeLog(STORE_LOG_SWAPOUTFAIL, e); return; } mem->swapout.sio = sio; /* Don't lock until after create, or the replacement * code might get confused */ e->lock("storeSwapOutStart"); /* Pick up the file number if it was assigned immediately */ e->swap_filen = mem->swapout.sio->swap_filen; e->swap_dirn = mem->swapout.sio->swap_dirn; /* write out the swap metadata */ storeIOWrite(mem->swapout.sio, buf, mem->swap_hdr_sz, 0, xfree_cppwrapper); } @@ -354,117 +354,117 @@ storeSwapOutFileClosed(void *data, int e ++statCounter.swap.outs; } debugs(20, 3, "storeSwapOutFileClosed: " << __FILE__ << ":" << __LINE__); mem->swapout.sio = NULL; e->unlock("storeSwapOutFileClosed"); } bool StoreEntry::mayStartSwapOut() { // must be checked in the caller assert(!EBIT_TEST(flags, ENTRY_ABORTED)); assert(!swappingOut()); if (!Config.cacheSwap.n_configured) return false; assert(mem_obj); - MemObject::SwapOut::Decision &decision = mem_obj->swapout.decision; + const MemObject::SwapOut::Decision &decision = mem_obj->swapout.decision; // if we decided that starting is not possible, do not repeat same checks if (decision == MemObject::SwapOut::swImpossible) { debugs(20, 3, HERE << " already rejected"); return false; } // if we swapped out already, do not start over if (swap_status == SWAPOUT_DONE) { debugs(20, 3, "already did"); - decision = MemObject::SwapOut::swImpossible; + swapOutDecision(MemObject::SwapOut::swImpossible); return false; } // if we stared swapping out already, do not start over if (decision == 
MemObject::SwapOut::swStarted) { debugs(20, 3, "already started"); - decision = MemObject::SwapOut::swImpossible; + swapOutDecision(MemObject::SwapOut::swImpossible); return false; } // if we decided that swapout is possible, do not repeat same checks if (decision == MemObject::SwapOut::swPossible) { debugs(20, 3, "already allowed"); return true; } if (!checkCachable()) { debugs(20, 3, HERE << "not cachable"); - decision = MemObject::SwapOut::swImpossible; + swapOutDecision(MemObject::SwapOut::swImpossible); return false; } if (EBIT_TEST(flags, ENTRY_SPECIAL)) { debugs(20, 3, HERE << url() << " SPECIAL"); - decision = MemObject::SwapOut::swImpossible; + swapOutDecision(MemObject::SwapOut::swImpossible); return false; } if (mem_obj->inmem_lo > 0) { debugs(20, 3, "storeSwapOut: (inmem_lo > 0) imem_lo:" << mem_obj->inmem_lo); - decision = MemObject::SwapOut::swImpossible; + swapOutDecision(MemObject::SwapOut::swImpossible); return false; } if (!mem_obj->isContiguous()) { debugs(20, 3, "storeSwapOut: not Contiguous"); - decision = MemObject::SwapOut::swImpossible; + swapOutDecision(MemObject::SwapOut::swImpossible); return false; } // check cache_dir max-size limit if all cache_dirs have it if (store_maxobjsize >= 0) { // TODO: add estimated store metadata size to be conservative // use guaranteed maximum if it is known const int64_t expectedEnd = mem_obj->expectedReplySize(); debugs(20, 7, HERE << "expectedEnd = " << expectedEnd); if (expectedEnd > store_maxobjsize) { debugs(20, 3, HERE << "will not fit: " << expectedEnd << " > " << store_maxobjsize); - decision = MemObject::SwapOut::swImpossible; + swapOutDecision(MemObject::SwapOut::swImpossible); return false; // known to outgrow the limit eventually } // use current minimum (always known) const int64_t currentEnd = mem_obj->endOffset(); if (currentEnd > store_maxobjsize) { debugs(20, 3, HERE << "does not fit: " << currentEnd << " > " << store_maxobjsize); - decision = MemObject::SwapOut::swImpossible; + 
swapOutDecision(MemObject::SwapOut::swImpossible); return false; // already does not fit and may only get bigger } // prevent final default swPossible answer for yet unknown length if (expectedEnd < 0 && store_status != STORE_OK) { const int64_t maxKnownSize = mem_obj->availableForSwapOut(); debugs(20, 7, HERE << "maxKnownSize= " << maxKnownSize); /* * NOTE: the store_maxobjsize here is the global maximum * size of object cacheable in any of Squid cache stores * both disk and memory stores. * * However, I am worried that this * deferance may consume a lot of memory in some cases. * Should we add an option to limit this memory consumption? */ debugs(20, 5, HERE << "Deferring swapout start for " << (store_maxobjsize - maxKnownSize) << " bytes"); return true; // may still fit, but no final decision yet } } - decision = MemObject::SwapOut::swPossible; + swapOutDecision(MemObject::SwapOut::swPossible); return true; }