Bug 3420: Request body consumption races and !theConsumer exception. Also fixes endless waiting for HTTP client to send req body we no longer need. Before these changes, the client side used a single "closing" state to handle two different error conditions: 1. We stopped receiving request body because of some error. 2. We stopped sending response because of some error. When a "directional" error occurred, we try to keep the transaction going in the other direction (e.g., to give ICAP the entire request or to give HTTP client the entire response). However, because there was just one "closing" state, the code failed to correctly detect or process many corner cases, resulting in stuck transactions and !theConsumer assertions/exceptions due to races between enableAutoConsumption() and expectNoConsumption() calls. This patch replaces the "closing" state with two direction-specific "we stopped sending/receiving" flags. Now, when the response sending code is done, it now checks whether the receiving code stopped and closes the connection as needed. This is done both when we encounter a sending error (ClientSocketContext::initiateClose) and when we successfully sent the entire response to the client (ClientSocketContext::keepaliveNextRequest). Similarly, when the request body reading code is done, it now checks whether the receiving code stopped and closes the connection as needed. This is done both when we encounter a receiving error (ConnStateData::noteBodyConsumerAborted) and when we successfully receive the entire request body from the client (ClientSocketContext::writeComplete). TODO: This patch focuses on various error cases. We might still have problems when there is an early HTTP response and no errors of any kind. I marked the corresponding old code with an XXX. === modified file 'src/client_side.cc' --- src/client_side.cc 2011-10-12 02:15:19 +0000 +++ src/client_side.cc 2011-11-17 21:53:07 +0000 @@ -1373,56 +1373,77 @@ { debugs(33, 2, "ClientSocketContextPushDeferredIfNeeded: FD " << conn->fd << " Sending next"); /** If the client stream is waiting on a socket write to occur, then */ if (deferredRequest->flags.deferred) { /** NO data is allowed to have been sent. */ assert(deferredRequest->http->out.size == 0); /** defer now. */ clientSocketRecipient(deferredRequest->deferredparams.node, deferredRequest->http, deferredRequest->deferredparams.rep, deferredRequest->deferredparams.queuedBuffer); } /** otherwise, the request is still active in a callbacksomewhere, * and we are done */ } +/// called when we have successfully finished writing the response void ClientSocketContext::keepaliveNextRequest() { ConnStateData * conn = http->getConn(); bool do_next_read = false; debugs(33, 3, "ClientSocketContext::keepaliveNextRequest: FD " << conn->fd); connIsFinished(); if (conn->pinning.pinned && conn->pinning.fd == -1) { debugs(33, 2, "clientKeepaliveNextRequest: FD " << conn->fd << " Connection was pinned but server side gone. Terminating client connection"); comm_close(conn->fd); return; } /** \par + * We are done with the response, and we are either still receiving request + * body (early response!) or have already stopped receiving anything. + * + * If we are still receiving, then clientParseRequest() below will fail. + * (XXX: but then we will call readNextRequest() which may succeed and + * execute a smuggled request as we are not done with the current request). + * + * If we stopped because we got everything, then try the next request. + * + * If we stopped receiving because of an error, then close now to avoid + * getting stuck and to prevent accidental request smuggling. + */ + + if (const char *reason = conn->stoppedReceiving()) { + debugs(33, 3, HERE << "closing for earlier request error: " << reason); + comm_close(conn->fd); + return; + } + + /** \par * Attempt to parse a request from the request buffer. * If we've been fed a pipelined request it may already * be in our read buffer. * \par * This needs to fall through - if we're unlucky and parse the _last_ request * from our read buffer we may never re-register for another client read. */ if (clientParseRequest(conn, do_next_read)) { debugs(33, 3, "clientSocketContext::keepaliveNextRequest: FD " << conn->fd << ": parsed next request from buffer"); } /** \par * Either we need to kick-start another read or, if we have * a half-closed connection, kill it after the last request. * This saves waiting for half-closed connections to finished being * half-closed _AND_ then, sometimes, spending "Timeout" time in * the keepalive "Waiting for next request" state. */ @@ -1617,78 +1638,70 @@ return STREAM_NONE; } /** * A write has just completed to the client, or we have just realised there is * no more data to send. */ void clientWriteComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data) { ClientSocketContext *context = (ClientSocketContext *)data; context->writeComplete (fd, bufnotused, size, errflag); } void ClientSocketContext::doClose() { comm_close(fd()); } -/** Called to initiate (and possibly complete) closing of the context. - * The underlying socket may be already closed */ +/// called when we encounter a response-related error void ClientSocketContext::initiateClose(const char *reason) { - debugs(33, 5, HERE << "initiateClose: closing for " << reason); + http->getConn()->stopSending(reason); // closes ASAP +} - if (http != NULL) { - ConnStateData * conn = http->getConn(); +void +ConnStateData::stopSending(const char *error) +{ + debugs(33, 4, HERE << "sending error (FD " << fd << "): " << error << + "; old receiving error: " << + (stoppedReceiving() ? stoppedReceiving_ : "none")); - if (conn != NULL) { - if (const int64_t expecting = conn->bodySizeLeft()) { - debugs(33, 5, HERE << "ClientSocketContext::initiateClose: " << - "closing, but first " << conn << " needs to read " << - expecting << " request body bytes with " << - conn->in.notYetUsed << " notYetUsed"); - - if (conn->closing()) { - debugs(33, 2, HERE << "avoiding double-closing " << conn); - return; - } - - /* - * XXX We assume the reply fits in the TCP transmit - * window. If not the connection may stall while sending - * the reply (before reaching here) if the client does not - * try to read the response while sending the request body. - * As of yet we have not received any complaints indicating - * this may be an issue. - */ - conn->startClosing(reason); + if (const char *oldError = stoppedSending()) { + debugs(33, 3, HERE << "already stopped sending: " << oldError); + return; // nothing has changed as far as this connection is concerned + } - return; - } + stoppedSending_ = error; + + if (!stoppedReceiving()) { + if (const int64_t expecting = bodySizeLeft()) { + debugs(33, 5, HERE << "must still read " << expecting << + " request body bytes with " << in.notYetUsed << " unused"); + return; // wait for the request receiver to finish reading } } - doClose(); + comm_close(fd); } void ClientSocketContext::writeComplete(int aFileDescriptor, char *bufnotused, size_t size, comm_err_t errflag) { StoreEntry *entry = http->storeEntry(); http->out.size += size; assert(aFileDescriptor > -1); debugs(33, 5, "clientWriteComplete: FD " << aFileDescriptor << ", sz " << size << ", err " << errflag << ", off " << http->out.size << ", len " << entry ? entry->objectLen() : 0); clientUpdateSocketStats(http->logType, size); assert (this->fd() == aFileDescriptor); /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ if (errflag == COMM_ERR_CLOSING) return; if (errflag || clientHttpRequestStatus(aFileDescriptor, http)) { @@ -2911,75 +2924,73 @@ flags.readMoreRequests = false; return; // XXX: is that sufficient to generate an error? } } else // identity encoding #endif { debugs(33,5, HERE << "handling plain request body for FD " << fd); putSize = bodyPipe->putMoreData(in.buf, in.notYetUsed); if (!bodyPipe->mayNeedMoreData()) { // BodyPipe will clear us automagically when we produced everything bodyPipe = NULL; } } if (putSize > 0) connNoteUseOfBuffer(this, putSize); if (!bodyPipe) { debugs(33,5, HERE << "produced entire request body for FD " << fd); - if (closing()) { + if (const char *reason = stoppedSending()) { /* we've finished reading like good clients, * now do the close that initiateClose initiated. - * - * XXX: do we have to close? why not check keepalive et. - * - * XXX: To support chunked requests safely, we need to handle - * the case of an endless request. This if-statement does not, - * because mayNeedMoreData is true if request size is not known. */ + debugs(33, 3, HERE << "closing for earlier sending error: " << reason); comm_close(fd); return false; } } return true; } void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer ) { if (!handleRequestBodyData()) return; // too late to read more body if (!isOpen() || closing()) return; readSomeData(); } void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer ) { - if (!closing()) - startClosing("body consumer aborted"); + // request reader may get stuck waiting for space if nobody consumes body + if (bodyPipe != NULL) + bodyPipe->enableAutoConsumption(); + + stopReceiving("virgin request body consumer aborted"); // closes ASAP } /** general lifetime handler for HTTP requests */ void ConnStateData::requestTimeout(const CommTimeoutCbParams &io) { #if THIS_CONFUSES_PERSISTENT_CONNECTION_AWARE_BROWSERS_AND_USERS debugs(33, 3, "requestTimeout: FD " << io.fd << ": lifetime is expired."); if (COMMIO_FD_WRITECB(io.fd)->active) { /* FIXME: If this code is reinstated, check the conn counters, * not the fd table state */ /* * Some data has been sent to the client, just close the FD */ comm_close(io.fd); } else if (nrequests) { /* * assume its a persistent connection; just close it @@ -3738,41 +3749,43 @@ /* * hack for ident ACL. It needs to get full addresses, and a place to store * the ident result on persistent connections... */ /* connection oriented auth also needs these two lines for it's operation. */ /* * Internal requests do not have a connection reference, because: A) their * byte count may be transformed before being applied to an outbound * connection B) they are internal - any limiting on them should be done on * the server end. */ if (conn != NULL) ch->conn(conn); /* unreferenced in FilledCheckList.cc */ return ch; } CBDATA_CLASS_INIT(ConnStateData); -ConnStateData::ConnStateData() :AsyncJob("ConnStateData"), transparent_ (false), closing_ (false), switchedToHttps_(false) +ConnStateData::ConnStateData() :AsyncJob("ConnStateData"), transparent_ (false), + switchedToHttps_(false), + stoppedSending_(NULL), stoppedReceiving_(NULL) { pinning.fd = -1; pinning.pinned = false; pinning.auth = false; } bool ConnStateData::transparent() const { return transparent_; } void ConnStateData::transparent(bool const anInt) { transparent_ = anInt; } bool ConnStateData::reading() const @@ -3781,67 +3794,58 @@ } void ConnStateData::stopReading() { if (reading()) { comm_read_cancel(fd, reader); reader = NULL; } } BodyPipe::Pointer ConnStateData::expectRequestBody(int64_t size) { bodyPipe = new BodyPipe(this); bodyPipe->setBodySize(size); return bodyPipe; } -bool -ConnStateData::closing() const -{ - return closing_; -} - -/** - * Called by ClientSocketContext to give the connection a chance to read - * the entire body before closing the socket. - */ void -ConnStateData::startClosing(const char *reason) +ConnStateData::stopReceiving(const char *error) { - debugs(33, 5, HERE << "startClosing " << this << " for " << reason); - assert(!closing()); - closing_ = true; + debugs(33, 4, HERE << "receiving error (FD " << fd << "): " << error << + "; old sending error: " << + (stoppedSending() ? stoppedSending_ : "none")); - assert(bodyPipe != NULL); - assert(bodySizeLeft() > 0); + if (const char *oldError = stoppedReceiving()) { + debugs(33, 3, HERE << "already stopped receiving: " << oldError); + return; // nothing has changed as far as this connection is concerned + } - // We do not have to abort the body pipeline because we are going to - // read the entire body anyway. - // Perhaps an ICAP server wants to log the complete request. - - // If a consumer abort have caused this closing, we may get stuck - // as nobody is consuming our data. Allow auto-consumption. - bodyPipe->enableAutoConsumption(); + stoppedReceiving_ = error; + + if (const char *sendError = stoppedSending()) { + debugs(33, 3, HERE << "closing because also stopped sending: " << sendError); + comm_close(fd); + } } void ConnStateData::expectNoForwarding() { if (bodyPipe != NULL) { debugs(33, 4, HERE << "no consumer for virgin body " << bodyPipe->status()); bodyPipe->expectNoConsumption(); } } // initialize dechunking state void ConnStateData::startDechunkingRequest(HttpParser *hp) { debugs(33, 5, HERE << "start dechunking at " << HttpParserRequestLen(hp)); assert(in.dechunkingState == chunkUnknown); assert(!in.bodyParser); in.bodyParser = new ChunkedCodingParser; in.chunkedSeen = HttpParserRequestLen(hp); // skip headers when dechunking === modified file 'src/client_side.h' --- src/client_side.h 2011-10-11 02:04:19 +0000 +++ src/client_side.h 2011-11-17 21:54:44 +0000 @@ -197,42 +197,49 @@ bool readMoreRequests; bool swanSang; // XXX: temporary flag to check proper cleanup } flags; struct { int fd; /* pinned server side connection */ char *host; /* host name of pinned connection */ int port; /* port of pinned connection */ bool pinned; /* this connection was pinned */ bool auth; /* pinned for www authentication */ struct peer *peer; /* peer the connection goes via */ AsyncCall::Pointer closeHandler; /*The close handler for pinned server side connection*/ } pinning; http_port_list *port; bool transparent() const; void transparent(bool const); bool reading() const; void stopReading(); ///< cancels comm_read if it is scheduled - bool closing() const; - void startClosing(const char *reason); + /// true if we stopped receiving the request + const char *stoppedReceiving() const { return stoppedReceiving_; } + /// true if we stopped sending the response + const char *stoppedSending() const { return stoppedSending_; } + /// note request receiving error and close as soon as we write the response + void stopReceiving(const char *error); + /// note response sending error and close as soon as we read the request + void stopSending(const char *error); + void expectNoForwarding(); ///< cleans up virgin request [body] forwarding state BodyPipe::Pointer expectRequestBody(int64_t size); virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); virtual void noteBodyConsumerAborted(BodyPipe::Pointer); void handleReadData(char *buf, size_t size); bool handleRequestBodyData(); /** * Correlate the current ConnStateData object with the pinning_fd socket descriptor. */ void pinConnection(int fd, HttpRequest *request, struct peer *peer, bool auth); /** * Decorrelate the ConnStateData object from its pinned peer */ void unpinConnection(); /** * Checks if there is pinning info if it is valid. It can close the server side connection * if pinned info is not valid. @@ -276,36 +283,41 @@ bool switchToHttps(const char *host); bool switchedToHttps() const { return switchedToHttps_; } #else bool switchedToHttps() const { return false; } #endif void startDechunkingRequest(HttpParser *hp); bool parseRequestChunks(HttpParser *hp); void finishDechunkingRequest(HttpParser *hp); void cleanDechunkingRequest(); private: int connReadWasError(comm_err_t flag, int size, int xerrno); int connFinishedWithConn(int size); void clientMaybeReadData(int do_next_read); void clientAfterReadingRequests(int do_next_read); private: CBDATA_CLASS2(ConnStateData); bool transparent_; - bool closing_; bool switchedToHttps_; + + /// the reason why we no longer write the response or nil + const char *stoppedSending_; + /// the reason why we no longer read the request or nil + const char *stoppedReceiving_; + String sslHostName; ///< Host name for SSL certificate generation AsyncCall::Pointer reader; ///< set when we are reading BodyPipe::Pointer bodyPipe; // set when we are reading request body }; /* convenience class while splitting up body handling */ /* temporary existence only - on stack use expected */ void setLogUri(ClientHttpRequest * http, char const *uri, bool cleanUrl = false); const char *findTrailingHTTPVersion(const char *uriAndHTTPVersion, const char *end = NULL); #endif /* SQUID_CLIENTSIDE_H */