diff -NaurbB squid-3.1.0.13-20090815.orig/src/Server.cc squid-3.1.0.13-20090815.new/src/Server.cc --- squid-3.1.0.13-20090815.orig/src/Server.cc 2009-08-18 08:38:44.000000000 +0200 +++ squid-3.1.0.13-20090815.new/src/Server.cc 2009-08-18 09:24:07.000000000 +0200 @@ -44,6 +44,7 @@ #if USE_ADAPTATION #include "adaptation/AccessCheck.h" #include "adaptation/Iterator.h" +#include "base/AsyncCall.h" #endif // implemented in client_side_reply.cc until sides have a common parent @@ -56,6 +57,8 @@ , adaptationAccessCheckPending(false) , startedAdaptation(false) #endif + , theVirginReply(NULL) + , theFinalReply(NULL) { fwd = theFwdState; entry = fwd->entry; @@ -274,7 +277,9 @@ handleMoreAdaptedBodyAvailable(); return; } + else #endif + if(requestBodySource == bp) handleMoreRequestBodyAvailable(); } @@ -287,7 +292,9 @@ handleAdaptedBodyProductionEnded(); return; } + else #endif + if(requestBodySource == bp) handleRequestBodyProductionEnded(); } @@ -300,7 +307,9 @@ handleAdaptedBodyProducerAborted(); return; } + else #endif + if(requestBodySource == bp) handleRequestBodyProducerAborted(); } @@ -662,13 +671,56 @@ handleAdaptationAborted(!final); } +void +ServerStateData::handleKickProducerReads() +{ + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { + abortTransaction("store entry aborted while kick producer callback"); + return; + } + else if(!entry->isAccepting()){ + return; + } + if(!adaptedBodySource){ + return; + } + handleMoreAdaptedBodyAvailable(); +} + + // more adapted response body is available void ServerStateData::handleMoreAdaptedBodyAvailable() { - const size_t contentSize = adaptedBodySource->buf().contentSize(); + size_t contentSize = adaptedBodySource->buf().contentSize(); + bool consumedPartially = false; - debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " << + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { + abortTransaction("store entry aborted while reading adapted body"); + return; + } + + if(!contentSize) return; + + const size_t contentSizeWanted=entry->bytesWanted(Range(0,contentSize)); + if(!contentSizeWanted && contentSize>1){ + debugs(11,5, HERE << "NOT storing " << contentSize << " bytes of adapted " << + "response body at offset " << adaptedBodySource->consumedSize() << ". Not enough buffer avail: " << contentSizeWanted); + + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(93,3, "ServerStateData::handleMoreAdaptedBodyAvailableCB", + Dialer(this, &ServerStateData::handleKickProducerReads)); + entry->setDeferredProducerCall(call); + return; + } + else if(contentSizeWanted+1consumedSize() << " due to MemObject buffer size"); + contentSize=contentSizeWanted+1; + consumedPartially=true;; + } + + debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " << "response body at offset " << adaptedBodySource->consumedSize()); if (abortOnBadEntry("entry refuses adapted body")) @@ -676,17 +728,26 @@ assert(entry); BodyPipeCheckout bpc(*adaptedBodySource); - const StoreIOBuffer ioBuf(&bpc.buf, currentOffset); - currentOffset += bpc.buf.size; + const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize); + //currentOffset += bpc.buf.size; + currentOffset += contentSize; entry->write(ioBuf); bpc.buf.consume(contentSize); bpc.checkIn(); + + if(!consumedPartially && (adaptedBodySource!=NULL) && adaptedBodySource->productionEnded()) + handleAdaptedBodyProductionEnded(); } // the entire adapted response body was produced, successfully void ServerStateData::handleAdaptedBodyProductionEnded() { + if(adaptedBodySource->buf().contentSize()){ + // not yet consumed everything. postpone ending consumption until we've consumed everything + return; + } + stopConsumingFrom(adaptedBodySource); if (abortOnBadEntry("entry went bad while waiting for adapted body eof")) diff -NaurbB squid-3.1.0.13-20090815.orig/src/Server.h squid-3.1.0.13-20090815.new/src/Server.h --- squid-3.1.0.13-20090815.orig/src/Server.h 2009-08-18 08:38:44.000000000 +0200 +++ squid-3.1.0.13-20090815.new/src/Server.h 2009-08-18 09:21:09.000000000 +0200 @@ -153,6 +153,8 @@ void handleAdaptationCompleted(); void handleAdaptationAborted(bool bypassable = false); + + void handleKickProducerReads(); #endif protected: diff -NaurbB squid-3.1.0.13-20090815.orig/src/store.cc squid-3.1.0.13-20090815.new/src/store.cc --- squid-3.1.0.13-20090815.orig/src/store.cc 2009-08-18 08:38:44.000000000 +0200 +++ squid-3.1.0.13-20090815.new/src/store.cc 2009-08-18 09:21:47.000000000 +0200 @@ -349,6 +349,10 @@ debugs(20, 3, HERE << "new StoreEntry " << this); mem_obj = NULL; +#if USE_ADAPTATION + deferredProducerCall=NULL; +#endif + expires = lastmod = lastref = timestamp = -1; swap_filen = -1; @@ -366,6 +370,21 @@ swap_dirn = -1; } +#if USE_ADAPTATION +void StoreEntry::setDeferredProducerCall(AsyncCall::Pointer call) +{ + deferredProducerCall=call; +} + +void StoreEntry::kickProducerReads() +{ + if(deferredProducerCall!=NULL){ + ScheduleCallHere(deferredProducerCall); + deferredProducerCall=NULL; + } +} +#endif + void StoreEntry::destroyMemObject() { diff -NaurbB squid-3.1.0.13-20090815.orig/src/store_client.cc squid-3.1.0.13-20090815.new/src/store_client.cc --- squid-3.1.0.13-20090815.orig/src/store_client.cc 2009-08-18 08:38:44.000000000 +0200 +++ squid-3.1.0.13-20090815.new/src/store_client.cc 2009-08-18 09:21:09.000000000 +0200 @@ -262,6 +262,10 @@ copying = false; storeClientCopy2(entry, this); + +#if USE_ADAPTATION + if(entry) entry->kickProducerReads(); +#endif } /* @@ -694,6 +698,10 @@ else mem->kickReads(); +#if USE_ADAPTATION + if(e) e->kickProducerReads(); +#endif + return 1; } diff -NaurbB squid-3.1.0.13-20090815.orig/src/Store.h squid-3.1.0.13-20090815.new/src/Store.h --- squid-3.1.0.13-20090815.orig/src/Store.h 2009-08-18 08:38:44.000000000 +0200 +++ squid-3.1.0.13-20090815.new/src/Store.h 2009-08-18 09:21:09.000000000 +0200 @@ -185,6 +185,14 @@ virtual void lock(); virtual void release(); +#if USE_ADAPTATION + void setDeferredProducerCall(AsyncCall::Pointer); + void kickProducerReads(); + bool isDeferredProducerCallSet(){return deferredProducerCall!=NULL; } +protected: + AsyncCall::Pointer deferredProducerCall; +#endif + private: static MemAllocator *pool; diff -NaurbB squid-3.1.0.13-20090815.orig/src/StoreIOBuffer.h squid-3.1.0.13-20090815.new/src/StoreIOBuffer.h --- squid-3.1.0.13-20090815.orig/src/StoreIOBuffer.h 2009-08-18 08:38:44.000000000 +0200 +++ squid-3.1.0.13-20090815.new/src/StoreIOBuffer.h 2009-08-18 09:21:09.000000000 +0200 @@ -59,6 +59,13 @@ flags.error = 0; } + StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t anLength) : + length(anLength), + offset (anOffset), + data(aMemBuf->content()) { + flags.error = 0; + } + Range range() const { return Range(offset, offset + length); }