author: Martin Huter, Alex Rousskov, Christos Tsantilas Bug 2619: Excessive RAM growth due to unlimited adapted body data consumption. If the client does not read from the open connection (e.g. the user does not confirm the browser's download message box in Microsoft's IE), Squid keeps reading data from the ICAP server into the store entry even though no more data can be delivered to the client. Thus the in-memory store entry keeps growing and Squid may - in the worst case - consume memory up to the size of the user's download. This patch adds an API to StoreEntry to call the producer back when memory/space is released from the StoreEntry, and adds code to the ICAP client code so that it does not consume body data coming from the ICAP server when there is no available space in the store entry. === modified file 'src/Server.cc' --- src/Server.cc 2011-10-23 00:16:42 +0000 +++ src/Server.cc 2011-11-11 17:03:03 +0000 @@ -33,54 +33,57 @@ */ #include "squid.h" #include "acl/Gadgets.h" #include "base/TextException.h" #include "comm/Connection.h" #include "comm/forward.h" #include "comm/Write.h" #include "Server.h" #include "Store.h" #include "HttpRequest.h" #include "HttpReply.h" #include "errorpage.h" #include "err_detail_type.h" #include "SquidTime.h" #if USE_ADAPTATION #include "adaptation/AccessCheck.h" #include "adaptation/Answer.h" #include "adaptation/Iterator.h" +#include "base/AsyncCall.h" #endif // implemented in client_side_reply.cc until sides have a common parent extern void purgeEntriesByUrl(HttpRequest * req, const char *url); ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"), requestSender(NULL), #if USE_ADAPTATION adaptedHeadSource(NULL), adaptationAccessCheckPending(false), startedAdaptation(false), #endif - receivedWholeRequestBody(false) + receivedWholeRequestBody(false), + theVirginReply(NULL), + theFinalReply(NULL) { fwd = theFwdState; entry = fwd->entry; entry->lock(); request = HTTPMSGLOCK(fwd->request); } ServerStateData::~ServerStateData() { // 
paranoid: check that swanSong has been called assert(!requestBodySource); #if USE_ADAPTATION assert(!virginBodyDestination); assert(!adaptedBodySource); #endif entry->unlock(); @@ -256,67 +259,70 @@ ServerStateData::abortOnBadEntry(const char *abortReason) { if (entry->isAccepting()) return false; debugs(11,5, HERE << "entry is not Accepting!"); abortTransaction(abortReason); return true; } // more request or adapted response body is available void ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp) { #if USE_ADAPTATION if (adaptedBodySource == bp) { handleMoreAdaptedBodyAvailable(); return; } #endif - handleMoreRequestBodyAvailable(); + if (requestBodySource == bp) + handleMoreRequestBodyAvailable(); } // the entire request or adapted response body was provided, successfully void ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp) { #if USE_ADAPTATION if (adaptedBodySource == bp) { handleAdaptedBodyProductionEnded(); return; } #endif - handleRequestBodyProductionEnded(); + if (requestBodySource == bp) + handleRequestBodyProductionEnded(); } // premature end of the request or adapted response body production void ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp) { #if USE_ADAPTATION if (adaptedBodySource == bp) { handleAdaptedBodyProducerAborted(); return; } #endif - handleRequestBodyProducerAborted(); + if (requestBodySource == bp) + handleRequestBodyProducerAborted(); } // more origin request body data is available void ServerStateData::handleMoreRequestBodyAvailable() { if (!requestSender) sendMoreRequestBody(); else debugs(9,3, HERE << "waiting for request body write to complete"); } // there will be no more handleMoreRequestBodyAvailable calls void ServerStateData::handleRequestBodyProductionEnded() { receivedWholeRequestBody = true; if (!requestSender) doneSendingRequestBody(); @@ -681,70 +687,125 @@ } HttpReply *rep = dynamic_cast(msg); assert(rep); debugs(11,5, HERE << this << " setting adapted reply to " << rep); 
setFinalReply(rep); assert(!adaptedBodySource); if (rep->body_pipe != NULL) { // subscribe to receive adapted body adaptedBodySource = rep->body_pipe; // assume that ICAP does not auto-consume on failures assert(adaptedBodySource->setConsumerIfNotLate(this)); } else { // no body if (doneWithAdaptation()) // we may still be sending virgin response handleAdaptationCompleted(); } } -// more adapted response body is available void -ServerStateData::handleMoreAdaptedBodyAvailable() +ServerStateData::resumeBodyStorage() { - const size_t contentSize = adaptedBodySource->buf().contentSize(); + if (abortOnBadEntry("store entry aborted while kick producer callback")) + return; - debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " << - "response body at offset " << adaptedBodySource->consumedSize()); + if(!adaptedBodySource) + return; + + handleMoreAdaptedBodyAvailable(); + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) + endAdaptedBodyConsumption(); +} + +// more adapted response body is available +void +ServerStateData::handleMoreAdaptedBodyAvailable() +{ if (abortOnBadEntry("entry refuses adapted body")) return; assert(entry); + + size_t contentSize = adaptedBodySource->buf().contentSize(); + bool consumedPartially = false; + + if (!contentSize) + return; // XXX: bytesWanted asserts on zero-size ranges + + // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data. + // We have to add 1 to avoid suspending forever. + const size_t bytesWanted = entry->bytesWanted(Range(0, contentSize)); + const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0; + + if (spaceAvailable < contentSize ) { + // No or partial body data consuming + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage", + Dialer(this, &ServerStateData::resumeBodyStorage)); + entry->deferProducer(call); + } + + // XXX: bytesWanted API does not allow us to write just one byte! 
+ if (!spaceAvailable && contentSize > 1) { + debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " << + "response body at offset " << adaptedBodySource->consumedSize()); + return; + } + + if (spaceAvailable < contentSize ) { + debugs(11, 5, HERE << "postponing storage of " << + (contentSize - spaceAvailable) << " body bytes"); + contentSize = spaceAvailable; + consumedPartially=true; + } + + debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " << + "response body at offset " << adaptedBodySource->consumedSize()); + BodyPipeCheckout bpc(*adaptedBodySource); - const StoreIOBuffer ioBuf(&bpc.buf, currentOffset); - currentOffset += bpc.buf.size; + const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize); + currentOffset += ioBuf.length; entry->write(ioBuf); bpc.buf.consume(contentSize); bpc.checkIn(); } // the entire adapted response body was produced, successfully void ServerStateData::handleAdaptedBodyProductionEnded() { - stopConsumingFrom(adaptedBodySource); - if (abortOnBadEntry("entry went bad while waiting for adapted body eof")) return; + + // end consumption if we consumed everything + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) + endAdaptedBodyConsumption(); + // else resumeBodyStorage() will eventually consume the rest +} +void +ServerStateData::endAdaptedBodyConsumption() +{ + stopConsumingFrom(adaptedBodySource); handleAdaptationCompleted(); } // premature end of the adapted response body void ServerStateData::handleAdaptedBodyProducerAborted() { stopConsumingFrom(adaptedBodySource); handleAdaptationAborted(); } // common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded void ServerStateData::handleAdaptationCompleted() { debugs(11,5, HERE << "handleAdaptationCompleted"); cleanAdaptation(); // We stop reading origin response because we have no place to put it and // cannot use it. 
If some origin servers do not like that or if we want to // reuse more pconns, we can add code to discard unneeded origin responses. === modified file 'src/Server.h' --- src/Server.h 2011-10-21 16:20:42 +0000 +++ src/Server.h 2011-11-11 13:42:13 +0000 @@ -130,40 +130,45 @@ virtual bool doneWithServer() const = 0; /**< did we end communication? */ /// Entry-dependent callbacks use this check to quit if the entry went bad bool abortOnBadEntry(const char *abortReason); #if USE_ADAPTATION void startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause); void adaptVirginReplyBody(const char *buf, ssize_t len); void cleanAdaptation(); virtual bool doneWithAdaptation() const; /**< did we end ICAP communication? */ // BodyConsumer for ICAP: consume adapted response body. void handleMoreAdaptedBodyAvailable(); void handleAdaptedBodyProductionEnded(); void handleAdaptedBodyProducerAborted(); void handleAdaptedHeader(HttpMsg *msg); void handleAdaptationCompleted(); void handleAdaptationBlocked(const Adaptation::Answer &answer); void handleAdaptationAborted(bool bypassable = false); + + /// called by StoreEntry when it has more buffer space available + void resumeBodyStorage(); + /// called when the entire adapted response body is consumed + void endAdaptedBodyConsumption(); #endif protected: const HttpReply *virginReply() const; HttpReply *virginReply(); HttpReply *setVirginReply(HttpReply *r); HttpReply *finalReply(); HttpReply *setFinalReply(HttpReply *r); // Kids use these to stuff data into the response instead of messing with the entry directly void adaptOrFinalizeReply(); void addVirginReplyBody(const char *buf, ssize_t len); void storeReplyBody(const char *buf, ssize_t len); size_t replyBodySpace(const MemBuf &readBuf, const size_t minSpace) const; void adjustBodyBytesRead(const int64_t delta); // These should be private int64_t currentOffset; /**< Our current offset in the StoreEntry */ === modified file 'src/Store.h' --- src/Store.h 2011-10-14 
16:21:48 +0000 +++ src/Store.h 2011-11-11 14:08:21 +0000 @@ -184,43 +184,55 @@ void setReleaseFlag(); #if USE_SQUID_ESI ESIElement::Pointer cachedESITree; #endif /** append bytes to the buffer */ virtual void append(char const *, int len); /** disable sending content to the clients */ virtual void buffer(); /** flush any buffered content */ virtual void flush(); /** reduce the memory lock count on the entry */ virtual int unlock(); /** increate the memory lock count on the entry */ virtual int64_t objectLen() const; virtual int64_t contentLen() const; virtual void lock(); virtual void release(); +#if USE_ADAPTATION + /// call back producer when more buffer space is available + void deferProducer(const AsyncCall::Pointer &producer); + /// calls back producer registered with deferProducer + void kickProducer(); +#endif + private: static MemAllocator *pool; +#if USE_ADAPTATION + /// producer callback registered with deferProducer + AsyncCall::Pointer deferredProducer; +#endif + bool validLength() const; bool hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) const; }; std::ostream &operator <<(std::ostream &os, const StoreEntry &e); /// \ingroup StoreAPI class NullStoreEntry:public StoreEntry { public: static NullStoreEntry *getInstance(); bool isNull() { return true; } const char *getMD5Text() const; _SQUID_INLINE_ HttpReply const *getReply() const; void write (StoreIOBuffer) {} === modified file 'src/StoreIOBuffer.h' --- src/StoreIOBuffer.h 2010-10-29 00:12:28 +0000 +++ src/StoreIOBuffer.h 2011-11-07 11:05:28 +0000 @@ -42,40 +42,47 @@ class StoreIOBuffer { public: StoreIOBuffer():length(0), offset (0), data (NULL) {flags.error = 0;} StoreIOBuffer(size_t aLength, int64_t anOffset, char *someData) : length (aLength), offset (anOffset), data (someData) { flags.error = 0; } /* Create a StoreIOBuffer from a MemBuf and offset */ /* NOTE that MemBuf still "owns" the pointers, StoreIOBuffer is just borrowing them */ StoreIOBuffer(MemBuf *aMemBuf, int64_t 
anOffset) : length(aMemBuf->contentSize()), offset (anOffset), data(aMemBuf->content()) { flags.error = 0; } + StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t anLength) : + length(anLength), + offset (anOffset), + data(aMemBuf->content()) { + flags.error = 0; + } + Range range() const { return Range(offset, offset + length); } void dump() const { if (fwrite(data, length, 1, stderr)) {} if (fwrite("\n", 1, 1, stderr)) {} } struct { unsigned error:1; } flags; size_t length; int64_t offset; char *data; }; inline std::ostream & operator <<(std::ostream &os, const StoreIOBuffer &b) === modified file 'src/client_side_request.cc' --- src/client_side_request.cc 2011-10-21 16:20:42 +0000 +++ src/client_side_request.cc 2011-11-11 17:03:21 +0000 @@ -1664,71 +1664,96 @@ } // we are done with getting headers (but may be receiving body) clearAdaptation(virginHeadSource); if (!request_satisfaction_mode) doCallouts(); } void ClientHttpRequest::handleAdaptationBlock(const Adaptation::Answer &answer) { request->detailError(ERR_ACCESS_DENIED, ERR_DETAIL_REQMOD_BLOCK); AclMatchedName = answer.ruleId.termedBuf(); assert(calloutContext); calloutContext->clientAccessCheckDone(ACCESS_DENIED); AclMatchedName = NULL; } void +ClientHttpRequest::resumeBodyStorage() +{ + if(!adaptedBodySource) + return; + + noteMoreBodyDataAvailable(adaptedBodySource); +} + +void ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer) { assert(request_satisfaction_mode); assert(adaptedBodySource != NULL); - if (const size_t contentSize = adaptedBodySource->buf().contentSize()) { + if (size_t contentSize = adaptedBodySource->buf().contentSize()) { + // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data. + // We have to add 1 to avoid suspending forever. + const size_t bytesWanted = storeEntry()->bytesWanted(Range(0,contentSize)); + const size_t spaceAvailable = bytesWanted > 0 ? 
(bytesWanted + 1) : 0; + + if (spaceAvailable < contentSize ) { + // No or partial body data consuming + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(93, 5, "ClientHttpRequest::resumeBodyStorage", + Dialer(this, &ClientHttpRequest::resumeBodyStorage)); + storeEntry()->deferProducer(call); + } + + // XXX: bytesWanted API does not allow us to write just one byte! + if (!spaceAvailable && contentSize > 1) + return; + + if (spaceAvailable < contentSize ) + contentSize = spaceAvailable; + BodyPipeCheckout bpc(*adaptedBodySource); - const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset); + const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset, contentSize); storeEntry()->write(ioBuf); - // assume can write everything - request_satisfaction_offset += contentSize; + // assume StoreEntry::write() writes the entire ioBuf + request_satisfaction_offset += ioBuf.length; bpc.buf.consume(contentSize); bpc.checkIn(); } if (adaptedBodySource->exhausted()) endRequestSatisfaction(); // else wait for more body data } void ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer) { assert(!virginHeadSource); - if (adaptedBodySource != NULL) { // did not end request satisfaction yet - // We do not expect more because noteMoreBodyDataAvailable always - // consumes everything. We do not even have a mechanism to consume - // leftovers after noteMoreBodyDataAvailable notifications seize. - assert(adaptedBodySource->exhausted()); + // should we end request satisfaction now? + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) endRequestSatisfaction(); - } } void ClientHttpRequest::endRequestSatisfaction() { debugs(85,4, HERE << this << " ends request satisfaction"); assert(request_satisfaction_mode); stopConsumingFrom(adaptedBodySource); // TODO: anything else needed to end store entry formation correctly? 
storeEntry()->complete(); } void ClientHttpRequest::noteBodyProducerAborted(BodyPipe::Pointer) { assert(!virginHeadSource); stopConsumingFrom(adaptedBodySource); debugs(85,3, HERE << "REQMOD body production failed"); === modified file 'src/client_side_request.h' --- src/client_side_request.h 2011-10-21 16:20:42 +0000 +++ src/client_side_request.h 2011-11-10 15:03:23 +0000 @@ -171,40 +171,42 @@ public: void startAdaptation(const Adaptation::ServiceGroupPointer &g); // private but exposed for ClientRequestContext void handleAdaptationFailure(int errDetail, bool bypassable = false); private: // Adaptation::Initiator API virtual void noteAdaptationAnswer(const Adaptation::Answer &answer); void handleAdaptedHeader(HttpMsg *msg); void handleAdaptationBlock(const Adaptation::Answer &answer); virtual void noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group); // BodyConsumer API, called by BodyPipe virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer); virtual void noteBodyProductionEnded(BodyPipe::Pointer); virtual void noteBodyProducerAborted(BodyPipe::Pointer); void endRequestSatisfaction(); + /// called by StoreEntry when it has more buffer space available + void resumeBodyStorage(); private: CbcPointer virginHeadSource; BodyPipe::Pointer adaptedBodySource; bool request_satisfaction_mode; int64_t request_satisfaction_offset; #endif }; /* client http based routines */ SQUIDCEXTERN char *clientConstructTraceEcho(ClientHttpRequest *); class ACLFilledChecklist; SQUIDCEXTERN ACLFilledChecklist *clientAclChecklistCreate(const acl_access * acl,ClientHttpRequest * http); SQUIDCEXTERN int clientHttpRequestStatus(int fd, ClientHttpRequest const *http); SQUIDCEXTERN void clientAccessCheck(ClientHttpRequest *); /* ones that should be elsewhere */ SQUIDCEXTERN void redirectStart(ClientHttpRequest *, RH *, void *); === modified file 'src/store.cc' --- src/store.cc 2011-10-27 20:39:23 +0000 +++ src/store.cc 2011-11-11 15:13:54 +0000 @@ -393,40 +393,61 @@ { debugs(20, 
3, HERE << "new StoreEntry " << this); mem_obj = new MemObject(aUrl, aLogUrl); expires = lastmod = lastref = timestamp = -1; swap_status = SWAPOUT_NONE; swap_filen = -1; swap_dirn = -1; } StoreEntry::~StoreEntry() { if (swap_filen >= 0) { SwapDir &sd = dynamic_cast(*store()); sd.disconnect(*this); } delete hidden_mem_obj; } +#if USE_ADAPTATION +void +StoreEntry::deferProducer(const AsyncCall::Pointer &producer) +{ + if (!deferredProducer) + deferredProducer = producer; + else + debugs(20, 5, HERE << "Deferred producer call is allready set to: " << + *deferredProducer << ", requested call: " << *producer); +} + +void +StoreEntry::kickProducer() +{ + if(deferredProducer != NULL){ + ScheduleCallHere(deferredProducer); + deferredProducer = NULL; + } +} +#endif + void StoreEntry::destroyMemObject() { debugs(20, 3, HERE << "destroyMemObject " << mem_obj); setMemStatus(NOT_IN_MEMORY); MemObject *mem = mem_obj; mem_obj = NULL; delete mem; delete hidden_mem_obj; hidden_mem_obj = NULL; } void StoreEntry::hideMemObject() { debugs(20, 3, HERE << "hiding " << mem_obj); assert(mem_obj); assert(!hidden_mem_obj); hidden_mem_obj = mem_obj; mem_obj = NULL; === modified file 'src/store_client.cc' --- src/store_client.cc 2011-02-15 04:02:28 +0000 +++ src/store_client.cc 2011-11-10 16:47:48 +0000 @@ -245,40 +245,45 @@ #endif /* range requests will skip into the body */ cmp_offset = copyRequest.offset; _callback = Callback (callback_fn, cbdataReference(data)); copyInto.data = copyRequest.data; copyInto.length = copyRequest.length; copyInto.offset = copyRequest.offset; static bool copying (false); assert (!copying); copying = true; PROF_start(storeClient_kickReads); /* we might be blocking comm reads due to readahead limits * now we have a new offset, trigger those reads... 
*/ entry->mem_obj->kickReads(); PROF_stop(storeClient_kickReads); copying = false; storeClientCopy2(entry, this); + +#if USE_ADAPTATION + if (entry) + entry->kickProducer(); +#endif } /* * This function is used below to decide if we have any more data to * send to the client. If the store_status is STORE_PENDING, then we * do have more data to send. If its STORE_OK, then * we continue checking. If the object length is negative, then we * don't know the real length and must open the swap file to find out. * If the length is >= 0, then we compare it to the requested copy * offset. */ static int storeClientNoMoreToSend(StoreEntry * e, store_client * sc) { int64_t len; if (e->store_status == STORE_PENDING) return 0; if ((len = e->objectLen()) < 0) @@ -709,40 +714,44 @@ if (sc->_callback.pending()) { /* callback with ssize = -1 to indicate unexpected termination */ debugs(90, 3, "storeUnregister: store_client for " << mem->url << " has a callback"); sc->fail(); } #if STORE_CLIENT_LIST_DEBUG cbdataReferenceDone(sc->owner); #endif delete sc; assert(e->lock_count > 0); if (mem->nclients == 0) CheckQuickAbort(e); else mem->kickReads(); +#if USE_ADAPTATION + e->kickProducer(); +#endif + return 1; } /* Call handlers waiting for data to be appended to E. */ void StoreEntry::invokeHandlers() { /* Commit what we can to disk, if appropriate */ swapOut(); int i = 0; store_client *sc; dlink_node *nx = NULL; dlink_node *node; PROF_start(InvokeHandlers); debugs(90, 3, "InvokeHandlers: " << getMD5Text() ); /* walk the entire list looking for valid callbacks */ for (node = mem_obj->clients.head; node; node = nx) {