Send chunked responses if body size is unknown. Apply HTTP chunked transfer encoding to the response body if all of the following conditions are met: * client claims HTTP version 1.1 or later support * response does not have a Content-Length header already * response does not use multipart/byteranges encoding * connection is persistent If we decide to send chunked reply, chunked_reply flag is set. Chunked encoding is done in ClientSocketContext::packChunk(). The last-chunk is sent only when clientReplyContext complete flag is set. === modified file 'compat/types.h' --- compat/types.h 2010-07-09 13:23:03 +0000 +++ compat/types.h 2010-08-19 18:25:20 +0000 @@ -99,37 +99,47 @@ #ifndef PRId64 #ifdef _SQUID_MSWIN_ /* Windows native port using MSVCRT */ #define PRId64 "I64d" #elif SIZEOF_INT64_T > SIZEOF_LONG #define PRId64 "lld" #else #define PRId64 "ld" #endif #endif #ifndef PRIu64 #ifdef _SQUID_MSWIN_ /* Windows native port using MSVCRT */ #define PRIu64 "I64u" #elif SIZEOF_INT64_T > SIZEOF_LONG #define PRIu64 "llu" #else #define PRIu64 "lu" #endif #endif +#ifndef PRIX64 +#ifdef _SQUID_MSWIN_ /* Windows native port using MSVCRT */ +#define PRIX64 "I64X" +#elif SIZEOF_INT64_T > SIZEOF_LONG +#define PRIX64 "llX" +#else +#define PRIX64 "lX" +#endif +#endif + #ifndef HAVE_MODE_T typedef unsigned short mode_t; #endif #ifndef HAVE_FD_MASK typedef unsigned long fd_mask; #endif #ifndef HAVE_SOCKLEN_T typedef int socklen_t; #endif #ifndef HAVE_MTYP_T typedef long mtyp_t; #endif #endif /* SQUID_TYPES_H */ === modified file 'src/client_side.cc' --- src/client_side.cc 2010-08-18 23:43:22 +0000 +++ src/client_side.cc 2010-08-20 03:50:08 +0000 @@ -875,62 +875,81 @@ ClientSocketContext::noteSentBodyBytes(s assert (http->range_iter.debt() >= -1); } bool ClientHttpRequest::multipartRangeRequest() const { return request->multipartRangeRequest(); } bool ClientSocketContext::multipartRangeRequest() const { return http->multipartRangeRequest(); } void ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData) { assert(rep == NULL); - if (!multipartRangeRequest()) { + if (!multipartRangeRequest() && !http->request->flags.chunked_reply) { size_t length = lengthToSend(bodyData.range()); noteSentBodyBytes (length); AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete", CommIoCbPtrFun(clientWriteBodyComplete, this)); comm_write(fd(), bodyData.data, length, call ); return; } MemBuf mb; mb.init(); - packRange(bodyData, &mb); + if (multipartRangeRequest()) + packRange(bodyData, &mb); + else + packChunk(bodyData, mb); if (mb.contentSize()) { /* write */ AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", CommIoCbPtrFun(clientWriteComplete, this)); comm_write_mbuf(fd(), &mb, call); } else writeComplete(fd(), NULL, 0, COMM_OK); } +/** + * Packs bodyData into mb using chunked encoding. Packs the last-chunk + * if bodyData is empty. + */ +void +ClientSocketContext::packChunk(const StoreIOBuffer &bodyData, MemBuf &mb) +{ + const uint64_t length = + static_cast(lengthToSend(bodyData.range())); + noteSentBodyBytes(length); + + mb.Printf("%"PRIX64"\r\n", length); + mb.append(bodyData.data, length); + mb.Printf("\r\n"); +} + /** put terminating boundary for multiparts */ static void clientPackTermBound(String boundary, MemBuf * mb) { mb->Printf("\r\n--" SQUIDSTRINGPH "--\r\n", SQUIDSTRINGPRINT(boundary)); debugs(33, 6, "clientPackTermBound: buf offset: " << mb->size); } /** appends a "part" HTTP header (as in a multi-part/range reply) to the buffer */ static void clientPackRangeHdr(const HttpReply * rep, const HttpHdrRangeSpec * spec, String boundary, MemBuf * mb) { HttpHeader hdr(hoReply); Packer p; assert(rep); assert(spec); /* put boundary */ debugs(33, 5, "clientPackRangeHdr: appending boundary: " << boundary); /* rfc2046 requires to _prepend_ boundary with ! */ @@ -1265,47 +1284,49 @@ ClientSocketContext::prepareReply(HttpRe reply = rep; if (http->request->range) buildRangeHeader(rep); } void ClientSocketContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData) { prepareReply(rep); assert (rep); MemBuf *mb = rep->pack(); /* Save length of headers for persistent conn checks */ http->out.headers_sz = mb->contentSize(); #if HEADERS_LOG headersLog(0, 0, http->request->method, rep); #endif if (bodyData.data && bodyData.length) { - if (!multipartRangeRequest()) { + if (multipartRangeRequest()) + packRange(bodyData, mb); + else if (http->request->flags.chunked_reply) { + packChunk(bodyData, *mb); + } else { size_t length = lengthToSend(bodyData.range()); noteSentBodyBytes (length); mb->append(bodyData.data, length); - } else { - packRange(bodyData, mb); } } /* write */ debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete"); AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", CommIoCbPtrFun(clientWriteComplete, this)); comm_write_mbuf(fd(), mb, call); delete mb; } /** * Write a chunk of data to a client socket. If the reply is present, * send the reply headers down the wire too, and clean them up when * finished. * Pre-condition: * The request is one backed by a connection, not an internal request. * data context is not NULL * There are no more entries in the stream chain. @@ -1320,41 +1341,45 @@ clientSocketRecipient(clientStreamNode * PROF_start(clientSocketRecipient); /* TODO: handle this rather than asserting * - it should only ever happen if we cause an abort and * the callback chain loops back to here, so we can simply return. * However, that itself shouldn't happen, so it stays as an assert for now. */ assert(cbdataReferenceValid(node)); assert(node->node.next == NULL); ClientSocketContext::Pointer context = dynamic_cast(node->data.getRaw()); assert(context != NULL); assert(connIsUsable(http->getConn())); fd = http->getConn()->fd; /* TODO: check offset is what we asked for */ if (context != http->getConn()->getCurrentContext()) { context->deferRecipientForLater(node, rep, receivedData); PROF_stop(clientSocketRecipient); return; } - if (responseFinishedOrFailed(rep, receivedData)) { + // After sending Transfer-Encoding: chunked (at least), always send + // the last-chunk if there was no error, ignoring responseFinishedOrFailed. + const bool mustSendLastChunk = http->request->flags.chunked_reply && + !http->request->flags.stream_error && !context->startOfOutput(); + if (responseFinishedOrFailed(rep, receivedData) && !mustSendLastChunk) { context->writeComplete(fd, NULL, 0, COMM_OK); PROF_stop(clientSocketRecipient); return; } if (!context->startOfOutput()) context->sendBody(rep, receivedData); else { assert(rep); http->al.reply = HTTPMSGLOCK(rep); context->sendStartOfMessage(rep, receivedData); } PROF_stop(clientSocketRecipient); } /** * Called when a downstream node is no longer interested in * our data. As we are a terminal node, this means on aborts * only === modified file 'src/client_side.h' --- src/client_side.h 2010-07-30 20:11:20 +0000 +++ src/client_side.h 2010-08-19 18:26:35 +0000 @@ -98,40 +98,41 @@ public: bool canPackMoreRanges() const; clientStream_status_t socketState(); void sendBody(HttpReply * rep, StoreIOBuffer bodyData); void sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData); size_t lengthToSend(Range const &available); void noteSentBodyBytes(size_t); void buildRangeHeader(HttpReply * rep); int fd() const; clientStreamNode * getTail() const; clientStreamNode * getClientReplyContext() const; void connIsFinished(); void removeFromConnectionList(ConnStateData * conn); void deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData); bool multipartRangeRequest() const; void registerWithConn(); void noteIoError(const int xerrno); ///< update state to reflect I/O error private: CBDATA_CLASS(ClientSocketContext); void prepareReply(HttpReply * rep); + void packChunk(const StoreIOBuffer &bodyData, MemBuf &mb); void packRange(StoreIOBuffer const &, MemBuf * mb); void deRegisterWithConn(); void doClose(); void initiateClose(const char *reason); bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */ bool connRegistered_; }; class ConnectionDetail; /** A connection to a socket */ class ConnStateData : public BodyProducer/*, public RefCountable*/ { public: ConnStateData(); ~ConnStateData(); === modified file 'src/client_side_reply.cc' --- src/client_side_reply.cc 2010-08-14 02:58:39 +0000 +++ src/client_side_reply.cc 2010-08-20 04:12:18 +0000 @@ -958,40 +958,45 @@ clientReplyContext::traceReply(clientStr http->storeEntry()->complete(); } #define SENDING_BODY 0 #define SENDING_HDRSONLY 1 int clientReplyContext::checkTransferDone() { StoreEntry *entry = http->storeEntry(); if (entry == NULL) return 0; /* * For now, 'done_copying' is used for special cases like * Range and HEAD requests. */ if (http->flags.done_copying) return 1; + if (http->request->flags.chunked_reply && !flags.complete) { + // last-chunk was not sent + return 0; + } + /* * Handle STORE_OK objects. * objectLen(entry) will be set proprely. * RC: Does objectLen(entry) include the Headers? * RC: Yes. */ if (entry->store_status == STORE_OK) { return storeOKTransferDone(); } else { return storeNotOKTransferDone(); } } int clientReplyContext::storeOKTransferDone() const { if (http->out.offset >= http->storeEntry()->objectLen() - headers_sz) { debugs(88,3,HERE << "storeOKTransferDone " << " out.offset=" << http->out.offset << " objectLen()=" << http->storeEntry()->objectLen() << @@ -1102,51 +1107,53 @@ clientReplyContext::replyStatus() /* Here because lower nodes don't need it */ if (http->storeEntry() == NULL) { debugs(88, 5, "clientReplyStatus: no storeEntry"); return STREAM_FAILED; /* yuck, but what can we do? */ } if (EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) { /* TODO: Could upstream read errors (result.flags.error) be * lost, and result in undersize requests being considered * complete. Should we tcp reset such connections ? */ debugs(88, 5, "clientReplyStatus: aborted storeEntry"); return STREAM_FAILED; } if ((done = checkTransferDone()) != 0 || flags.complete) { debugs(88, 5, "clientReplyStatus: transfer is DONE"); /* Ok we're finished, but how? */ - if (http->storeEntry()->getReply()->bodySize(http->request->method) < 0) { + const int64_t expectedBodySize = + http->storeEntry()->getReply()->bodySize(http->request->method); + if (!http->request->flags.proxy_keepalive && expectedBodySize < 0) { debugs(88, 5, "clientReplyStatus: closing, content_length < 0"); return STREAM_FAILED; } if (!done) { debugs(88, 5, "clientReplyStatus: closing, !done, but read 0 bytes"); return STREAM_FAILED; } - if (!http->gotEnough()) { + if (expectedBodySize >= 0 && !http->gotEnough()) { debugs(88, 5, "clientReplyStatus: client didn't get all it expected"); return STREAM_UNPLANNED_COMPLETE; } if (http->request->flags.proxy_keepalive) { debugs(88, 5, "clientReplyStatus: stream complete and can keepalive"); return STREAM_COMPLETE; } debugs(88, 5, "clientReplyStatus: stream was not expected to complete!"); return STREAM_UNPLANNED_COMPLETE; } // XXX: Should this be checked earlier? We could return above w/o checking. if (reply->receivedBodyTooLarge(*http->request, http->out.offset - 4096)) { /* 4096 is a margin for the HTTP headers included in out.offset */ debugs(88, 5, "clientReplyStatus: client reply body is too large"); return STREAM_FAILED; } @@ -1347,61 +1354,72 @@ clientReplyContext::buildReplyHeader() * depends on authenticate behaviour: all schemes to date send no extra * data on 407/401 responses, and do not check the accel state on 401/407 * responses */ authenticateFixHeader(reply, request->auth_user_request, request, 0, 1); } else if (request->auth_user_request != NULL) authenticateFixHeader(reply, request->auth_user_request, request, http->flags.accel, 0); /* Append X-Cache */ httpHeaderPutStrf(hdr, HDR_X_CACHE, "%s from %s", is_hit ? "HIT" : "MISS", getMyHostname()); #if USE_CACHE_DIGESTS /* Append X-Cache-Lookup: -- temporary hack, to be removed @?@ @?@ */ httpHeaderPutStrf(hdr, HDR_X_CACHE_LOOKUP, "%s from %s:%d", lookup_type ? lookup_type : "NONE", getMyHostname(), getMyPort()); #endif + const bool maySendChunkedReply = !request->multipartRangeRequest() && + (request->http_ver.major >= 1) && (request->http_ver.minor >= 1); + /* Check whether we should send keep-alive */ if (!Config.onoff.error_pconns && reply->sline.status >= 400 && !request->flags.must_keepalive) { debugs(33, 3, "clientBuildReplyHeader: Error, don't keep-alive"); request->flags.proxy_keepalive = 0; } else if (!Config.onoff.client_pconns && !request->flags.must_keepalive) { debugs(33, 2, "clientBuildReplyHeader: Connection Keep-Alive not requested by admin or client"); request->flags.proxy_keepalive = 0; } else if (request->flags.proxy_keepalive && shutting_down) { debugs(88, 3, "clientBuildReplyHeader: Shutting down, don't keep-alive."); request->flags.proxy_keepalive = 0; } else if (request->flags.connection_auth && !reply->keep_alive) { debugs(33, 2, "clientBuildReplyHeader: Connection oriented auth but server side non-persistent"); request->flags.proxy_keepalive = 0; - } else if (reply->bodySize(request->method) < 0) { + } else if (reply->bodySize(request->method) < 0 && !maySendChunkedReply) { debugs(88, 3, "clientBuildReplyHeader: can't keep-alive, unknown body size" ); request->flags.proxy_keepalive = 0; } else if (fdUsageHigh()&& !request->flags.must_keepalive) { debugs(88, 3, "clientBuildReplyHeader: Not many unused FDs, can't keep-alive"); request->flags.proxy_keepalive = 0; } + // Decide if we send chunked reply + if (maySendChunkedReply && + request->flags.proxy_keepalive && + reply->bodySize(request->method) < 0) { + debugs(88, 3, "clientBuildReplyHeader: chunked reply"); + request->flags.chunked_reply = 1; + hdr->putStr(HDR_TRANSFER_ENCODING, "chunked"); + } /* Append VIA */ if (Config.onoff.via) { LOCAL_ARRAY(char, bbuf, MAX_URL + 32); String strVia; hdr->getList(HDR_VIA, &strVia); snprintf(bbuf, MAX_URL + 32, "%d.%d %s", reply->sline.version.major, reply->sline.version.minor, ThisCache); strListAdd(&strVia, bbuf, ','); hdr->delById(HDR_VIA); hdr->putStr(HDR_VIA, strVia.termedBuf()); } /* Signal keep-alive or close explicitly */ hdr->putStr(HDR_CONNECTION, request->flags.proxy_keepalive ? "keep-alive" : "close"); #if ADD_X_REQUEST_URI /* * Knowing the URI of the request is useful when debugging persistent @@ -1710,40 +1728,41 @@ clientReplyContext::makeThisHead() bool clientReplyContext::errorInStream(StoreIOBuffer const &result, size_t const &sizeToProcess)const { return /* aborted request */ (http->storeEntry() && EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) || /* Upstream read error */ (result.flags.error) || /* Upstream EOF */ (sizeToProcess == 0); } void clientReplyContext::sendStreamError(StoreIOBuffer const &result) { /** call clientWriteComplete so the client socket gets closed * * We call into the stream, because we don't know that there is a * client socket! */ debugs(88, 5, "clientReplyContext::sendStreamError: A stream error has occured, marking as complete and sending no data."); StoreIOBuffer localTempBuffer; flags.complete = 1; + http->request->flags.stream_error = 1; localTempBuffer.flags.error = result.flags.error; clientStreamCallback((clientStreamNode*)http->client_stream.head->data, http, NULL, localTempBuffer); } void clientReplyContext::pushStreamData(StoreIOBuffer const &result, char *source) { StoreIOBuffer localTempBuffer; if (result.length == 0) { debugs(88, 5, "clientReplyContext::pushStreamData: marking request as complete due to 0 length store result"); flags.complete = 1; } assert(result.offset - headers_sz == next()->readBuffer.offset); localTempBuffer.offset = result.offset - headers_sz; localTempBuffer.length = result.length; if (localTempBuffer.length) === modified file 'src/structs.h' --- src/structs.h 2010-08-07 14:22:54 +0000 +++ src/structs.h 2010-08-20 03:43:41 +0000 @@ -986,78 +986,80 @@ struct _netdbEntry { int n_peers_alloc; int n_peers; }; struct _iostats { struct { int reads; int reads_deferred; int read_hist[16]; int writes; int write_hist[16]; } Http, Ftp, Gopher; }; struct request_flags { - request_flags(): range(0),nocache(0),ims(0),auth(0),cachable(0),hierarchical(0),loopdetect(0),proxy_keepalive(0),proxying(0),refresh(0),redirected(0),need_validation(0),accelerated(0),ignore_cc(0),intercepted(0),spoof_client_ip(0),internal(0),internalclient(0),must_keepalive(0),destinationIPLookedUp_(0) { + request_flags(): range(0),nocache(0),ims(0),auth(0),cachable(0),hierarchical(0),loopdetect(0),proxy_keepalive(0),proxying(0),refresh(0),redirected(0),need_validation(0),accelerated(0),ignore_cc(0),intercepted(0),spoof_client_ip(0),internal(0),internalclient(0),must_keepalive(0),chunked_reply(0),stream_error(0),destinationIPLookedUp_(0) { #if USE_HTTP_VIOLATIONS nocache_hack = 0; #endif #if FOLLOW_X_FORWARDED_FOR done_follow_x_forwarded_for = 0; #endif /* FOLLOW_X_FORWARDED_FOR */ } unsigned int range:1; unsigned int nocache:1; unsigned int ims:1; unsigned int auth:1; unsigned int cachable:1; unsigned int hierarchical:1; unsigned int loopdetect:1; unsigned int proxy_keepalive:1; unsigned int proxying: 1; /* this should be killed, also in httpstateflags */ unsigned int refresh:1; unsigned int redirected:1; unsigned int need_validation:1; #if USE_HTTP_VIOLATIONS unsigned int nocache_hack:1; /* for changing/ignoring no-cache requests */ #endif unsigned int accelerated:1; unsigned int ignore_cc:1; unsigned int intercepted:1; /**< transparently intercepted request */ unsigned int spoof_client_ip:1; /**< spoof client ip if possible */ unsigned int internal:1; unsigned int internalclient:1; unsigned int must_keepalive:1; unsigned int connection_auth:1; /** Request wants connection oriented auth */ unsigned int connection_auth_disabled:1; /** Connection oriented auth can not be supported */ unsigned int connection_proxy_auth:1; /** Request wants connection oriented auth */ unsigned int pinned:1; /* Request sent on a pinned connection */ unsigned int auth_sent:1; /* Authentication forwarded */ unsigned int no_direct:1; /* Deny direct forwarding unless overriden by always_direct. Used in accelerator mode */ + unsigned int chunked_reply:1; /**< Reply with chunked transfer encoding */ + unsigned int stream_error:1; /**< Whether stream error has occured */ // When adding new flags, please update cloneAdaptationImmune() as needed. bool resetTCP() const; void setResetTCP(); void clearResetTCP(); void destinationIPLookupCompleted(); bool destinationIPLookedUp() const; // returns a partial copy of the flags that includes only those flags // that are safe for a related (e.g., ICAP-adapted) request to inherit request_flags cloneAdaptationImmune() const; #if FOLLOW_X_FORWARDED_FOR unsigned int done_follow_x_forwarded_for; #endif /* FOLLOW_X_FORWARDED_FOR */ private: unsigned int reset_tcp:1; unsigned int destinationIPLookedUp_:1;