author: Christos Tsantilas, Alex Rousskov

Propagate pinned connection persistency and closures to the client.

Squid was trying hard to forward a request after pinned connection failures
because some of those failures were benign pconn races. That meant re-pinning
failed connections. After a few iterations to correctly handle non-idempotent
requests, the code appeared to work, but the final design, with all the added
complexity and related dangers, was deemed inferior to the approach we use now.

Squid now simply propagates connection closures (including pconn races) to the
client. It is now the client's responsibility not to send non-idempotent
requests on idle persistent connections and to recover from pconn races.

Squid also propagates HTTP connection persistency indicators from the client
to the server and back, to make the client's job feasible. Squid will send
"Connection: close" and will close the client connection if the pinned server
says so, even if Squid could still maintain a persistent connection with the
client.

These changes are not meant to affect regular (not pinned) transactions.

In access.log, one can detect requests that were not responded to (due to race
conditions on pinned connections) by searching for the ERR_ZERO_SIZE_OBJECT
%err_code with a TCP_MISS/000 status and zero response bytes.

This is a Measurement Factory project.

=== modified file 'src/RequestFlags.h'
--- src/RequestFlags.h	2012-09-19 17:16:56 +0000
+++ src/RequestFlags.h	2013-01-16 15:07:38 +0000
@@ -88,42 +88,40 @@
     /** set if the Host: header passed verification */
     bool hostVerified :1;
     /** request to spoof the client ip */
     bool spoofClientIp :1;
     /** set if the request is internal (\see ClientHttpRequest::flags.internal)*/
     bool internal :1;
     /** set for internally-generated requests */
     //XXX this is set in in clientBeginRequest, but never tested.
     bool internalClient :1;
     /** if set, request to try very hard to keep the connection alive */
     bool mustKeepalive :1;
     /** set if the rquest wants connection oriented auth */
     bool connectionAuth :1;
     /** set if connection oriented auth can not be supported */
     bool connectionAuthDisabled :1;
     /** Request wants connection oriented auth */
     // XXX This is set in clientCheckPinning but never tested
     bool connectionProxyAuth :1;
     /** set if the request was sent on a pinned connection */
     bool pinned :1;
-    /** OK to reopen a failed pinned connection */
-    bool canRePin :1;
     /** Authentication was already sent upstream (e.g. due tcp-level auth) */
     bool authSent :1;
     /** Deny direct forwarding unless overriden by always_direct
      * Used in accelerator mode */
     bool noDirect :1;
     /** Reply with chunked transfer encoding */
     bool chunkedReply :1;
     /** set if stream error has occured */
     bool streamError :1;
     /** internal ssl-bump request to get server cert */
     bool sslPeek :1;
     /** set if X-Forwarded-For checking is complete
      *
      * do not read directly; use doneFollowXff for reading */
     bool done_follow_x_forwarded_for :1;
     /** set for ssl-bumped requests */
     bool sslBumped :1;
     bool destinationIpLookedUp:1;
     /** request to reset the TCP stream */

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2012-12-28 15:23:21 +0000
+++ src/client_side.cc	2013-01-16 17:56:00 +0000
@@ -2652,41 +2652,40 @@
     /* we should skip request line! */
     /* XXX should actually know the damned buffer size here */
     if (http_ver.major >= 1 && !request->parseHeader(HttpParserHdrBuf(hp), HttpParserHdrSz(hp))) {
         clientStreamNode *node = context->getClientReplyContext();
         debugs(33, 5, "Failed to parse request headers:\n" << HttpParserHdrBuf(hp));
         conn->quitAfterError(request);
         // setLogUri should called before repContext->setReplyToError
         setLogUri(http, http->uri, true);
         clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
         assert (repContext);
         repContext->setReplyToError(ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, http->uri, conn->clientConnection->remote, NULL, NULL, NULL);
         assert(context->http->out.offset == 0);
         context->pullData();
         goto finish;
     }

     request->clientConnectionManager = conn;

     request->flags.accelerated = http->flags.accel;
     request->flags.sslBumped=conn->switchedToHttps();
-    request->flags.canRePin = request->flags.sslBumped && conn->pinning.pinned;
     request->flags.ignoreCc = conn->port->ignore_cc;
     // TODO: decouple http->flags.accel from request->flags.sslBumped
     request->flags.noDirect = (request->flags.accelerated && !request->flags.sslBumped) ?
                               !conn->port->allow_direct : 0;
 #if USE_AUTH
     if (request->flags.sslBumped) {
         if (conn->auth_user_request != NULL)
             request->auth_user_request = conn->auth_user_request;
     }
 #endif

     /** \par
      * If transparent or interception mode is working clone the transparent and interception flags
      * from the port settings to the request.
      */
     if (http->clientConnection != NULL) {
         request->flags.intercepted = ((http->clientConnection->flags & COMM_INTERCEPTION) != 0);
         request->flags.spoofClientIp = ((http->clientConnection->flags & COMM_TRANSPARENT) != 0 ) ;
     }

@@ -4256,42 +4255,46 @@
     /*
      * hack for ident ACL. It needs to get full addresses, and a place to store
      * the ident result on persistent connections...
      */
     /* connection oriented auth also needs these two lines for it's operation. */
     return ch;
 }

 CBDATA_CLASS_INIT(ConnStateData);

 ConnStateData::ConnStateData() :
         AsyncJob("ConnStateData"),
 #if USE_SSL
         sslBumpMode(Ssl::bumpEnd),
         switchedToHttps_(false),
         sslServerBump(NULL),
 #endif
         stoppedSending_(NULL),
         stoppedReceiving_(NULL)
 {
+    pinning.host = NULL;
+    pinning.port = -1;
     pinning.pinned = false;
     pinning.auth = false;
+    pinning.zeroReply = false;
+    pinning.peer = NULL;
 }

 bool
 ConnStateData::transparent() const
 {
     return clientConnection != NULL && (clientConnection->flags & (COMM_TRANSPARENT|COMM_INTERCEPTION));
 }

 bool
 ConnStateData::reading() const
 {
     return reader != NULL;
 }

 void
 ConnStateData::stopReading()
 {
     if (reading()) {
         comm_read_cancel(clientConnection->fd, reader);
         reader = NULL;

@@ -4413,43 +4416,47 @@
         debugs(33, 3, HERE << "ignoring 1xx due to earlier closure");
         return;
     }

     ClientSocketContext::Pointer context = getCurrentContext();
     if (context != NULL) {
         context->writeControlMsg(msg); // will call msg.cbSuccess
         return;
     }

     debugs(33, 3, HERE << " closing due to missing context for 1xx");
     clientConnection->close();
 }

 /// Our close handler called by Comm when the pinned connection is closed
 void
 ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io)
 {
     // FwdState might repin a failed connection sooner than this close
     // callback is called for the failed connection.
-    if (pinning.serverConnection == io.conn) {
-        pinning.closeHandler = NULL; // Comm unregisters handlers before calling
-        unpinConnection();
+    assert(pinning.serverConnection == io.conn);
+    pinning.closeHandler = NULL; // Comm unregisters handlers before calling
+    const bool sawZeroReply = pinning.zeroReply; // reset when unpinning
+    unpinConnection();
+    if (sawZeroReply) {
+        debugs(33, 3, "Closing client connection on pinned zero reply.");
+        clientConnection->close();
     }
 }

 void
 ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpRequest *request, CachePeer *aPeer, bool auth)
 {
     char desc[FD_DESC_SZ];

     if (Comm::IsConnOpen(pinning.serverConnection)) {
         if (pinning.serverConnection->fd == pinServer->fd)
             return;
     }

     unpinConnection(); // closes pinned connection, if any, and resets fields

     pinning.serverConnection = pinServer;
     debugs(33, 3, HERE << pinning.serverConnection);

     // when pinning an SSL bumped connection, the request may be NULL

@@ -4514,23 +4521,25 @@
 void
 ConnStateData::unpinConnection()
 {
     debugs(33, 3, HERE << pinning.serverConnection);

     if (pinning.peer)
         cbdataReferenceDone(pinning.peer);

     if (Comm::IsConnOpen(pinning.serverConnection)) {
         if (pinning.closeHandler != NULL) {
             comm_remove_close_handler(pinning.serverConnection->fd, pinning.closeHandler);
             pinning.closeHandler = NULL;
         }
         /// also close the server side socket, we should not use it for any future requests...
         // TODO: do not close if called from our close handler?
         pinning.serverConnection->close();
     }

     safe_free(pinning.host);

+    pinning.zeroReply = false;
+
     /* NOTE: pinning.pinned should be kept. This combined with fd == -1 at the end of a request indicates that the host
      * connection has gone away */
 }

=== modified file 'src/client_side.h'
--- src/client_side.h	2012-11-04 12:27:49 +0000
+++ src/client_side.h	2013-01-16 15:07:38 +0000
@@ -247,40 +247,41 @@
     /**
      * used by the owner of the connection, opaque otherwise
      * TODO: generalise the connection owner concept.
      */
     ClientSocketContext::Pointer currentobject;

     Ip::Address log_addr;
     int nrequests;

     struct {
         bool readMore; ///< needs comm_read (for this request or new requests)
         bool swanSang; // XXX: temporary flag to check proper cleanup
     } flags;
     struct {
         Comm::ConnectionPointer serverConnection; /* pinned server side connection */
         char *host;             /* host name of pinned connection */
         int port;               /* port of pinned connection */
         bool pinned;            /* this connection was pinned */
         bool auth;              /* pinned for www authentication */
+        bool zeroReply;         ///< server closed w/o response (ERR_ZERO_SIZE_OBJECT)
         CachePeer *peer;        /* CachePeer the connection goes via */
         AsyncCall::Pointer closeHandler; /*The close handler for pinned server side connection*/
     } pinning;

     AnyP::PortCfg *port;

     bool transparent() const;
     bool reading() const;
     void stopReading(); ///< cancels comm_read if it is scheduled

     /// true if we stopped receiving the request
     const char *stoppedReceiving() const { return stoppedReceiving_; }
     /// true if we stopped sending the response
     const char *stoppedSending() const { return stoppedSending_; }
     /// note request receiving error and close as soon as we write the response
     void stopReceiving(const char *error);
     /// note response sending error and close as soon as we read the request
     void stopSending(const char *error);

     void expectNoForwarding(); ///< cleans up virgin request [body] forwarding state

=== modified file 'src/client_side_reply.cc'
--- src/client_side_reply.cc	2012-12-26 14:03:11 +0000
+++ src/client_side_reply.cc	2013-01-16 15:13:11 +0000
@@ -1462,40 +1462,44 @@
         request->flags.proxyKeepalive = 0;
     } else if (!Config.onoff.client_pconns && !request->flags.mustKeepalive) {
         debugs(33, 2, "clientBuildReplyHeader: Connection Keep-Alive not requested by admin or client");
         request->flags.proxyKeepalive = 0;
     } else if (request->flags.proxyKeepalive && shutting_down) {
         debugs(88, 3, "clientBuildReplyHeader: Shutting down, don't keep-alive.");
         request->flags.proxyKeepalive = 0;
     } else if (request->flags.connectionAuth && !reply->keep_alive) {
         debugs(33, 2, "clientBuildReplyHeader: Connection oriented auth but server side non-persistent");
         request->flags.proxyKeepalive = 0;
     } else if (reply->bodySize(request->method) < 0 && !maySendChunkedReply) {
         debugs(88, 3, "clientBuildReplyHeader: can't keep-alive, unknown body size" );
         request->flags.proxyKeepalive = 0;
     } else if (fdUsageHigh()&& !request->flags.mustKeepalive) {
         debugs(88, 3, "clientBuildReplyHeader: Not many unused FDs, can't keep-alive");
         request->flags.proxyKeepalive = 0;
     } else if (request->flags.sslBumped && !reply->persistent()) {
         // We do not really have to close, but we pretend we are a tunnel.
         debugs(88, 3, "clientBuildReplyHeader: bumped reply forces close");
         request->flags.proxyKeepalive = 0;
+    } else if (request->pinnedConnection() && !reply->persistent()) {
+        // The peer wants to close the pinned connection
+        debugs(88, 3, "pinned reply forces close");
+        request->flags.proxyKeepalive = 0;
     }

     // Decide if we send chunked reply
     if (maySendChunkedReply && request->flags.proxyKeepalive &&
             reply->bodySize(request->method) < 0) {
         debugs(88, 3, "clientBuildReplyHeader: chunked reply");
         request->flags.chunkedReply = 1;
         hdr->putStr(HDR_TRANSFER_ENCODING, "chunked");
     }

     /* Append VIA */
     if (Config.onoff.via) {
         LOCAL_ARRAY(char, bbuf, MAX_URL + 32);
         String strVia;
         hdr->getList(HDR_VIA, &strVia);
         snprintf(bbuf, MAX_URL + 32, "%d.%d %s",
                  reply->sline.version.major,
                  reply->sline.version.minor,
                  ThisCache);

@@ -2060,40 +2064,44 @@
 void
 clientReplyContext::sendMoreData (StoreIOBuffer result)
 {
     if (deleting)
         return;

     StoreEntry *entry = http->storeEntry();

     ConnStateData * conn = http->getConn();

     // too late, our conn is closing
     // TODO: should we also quit?
     if (conn == NULL) {
         debugs(33,3, "not sending more data to a closed connection" );
         return;
     }
     if (!conn->isOpen()) {
         debugs(33,3, "not sending more data to closing connection " << conn->clientConnection);
         return;
     }
+    if (conn->pinning.zeroReply) {
+        debugs(33,3, "not sending to a closing on pinned zero reply " << conn->clientConnection);
+        return;
+    }

     char *buf = next()->readBuffer.data;

     if (buf != result.data) {
         /* we've got to copy some data */
         assert(result.length <= next()->readBuffer.length);
         memcpy(buf, result.data, result.length);
     }

     if (reqofs==0 && !logTypeIsATcpHit(http->logType) && Comm::IsConnOpen(conn->clientConnection)) {
         if (Ip::Qos::TheConfig.isHitTosActive()) {
             Ip::Qos::doTosLocalMiss(conn->clientConnection, http->request->hier.code);
         }
         if (Ip::Qos::TheConfig.isHitNfmarkActive()) {
             Ip::Qos::doNfmarkLocalMiss(conn->clientConnection, http->request->hier.code);
         }
     }

     /* We've got the final data to start pushing... */
     flags.storelogiccomplete = 1;

=== modified file 'src/forward.cc'
--- src/forward.cc	2012-12-28 15:23:21 +0000
+++ src/forward.cc	2013-01-17 12:22:52 +0000
@@ -155,50 +155,56 @@
     if (isIntercepted && useOriginalDst) {
         selectPeerForIntercepted();
         // 3.2 does not suppro re-wrapping inside CONNECT.
         // our only alternative is to fake destination "found" and continue with the forwarding.
         startConnectionOrFail();
         return;
     }
 #endif

     // do full route options selection
     peerSelect(&serverDestinations, request, entry, fwdPeerSelectionCompleteWrapper, this);
 }

 #if STRICT_ORIGINAL_DST
 /// bypasses peerSelect() when dealing with intercepted requests
 void
 FwdState::selectPeerForIntercepted()
 {
     // use pinned connection if available
     Comm::ConnectionPointer p;
-    if (ConnStateData *client = request->pinnedConnection())
+    if (ConnStateData *client = request->pinnedConnection()) {
         p = client->validatePinnedConnection(request, NULL);
+        if (Comm::IsConnOpen(p)) {
+            /* duplicate peerSelectPinned() effects */
+            p->peerType = PINNED;
+            entry->ping_status = PING_DONE;     /* Skip ICP */
-    if (Comm::IsConnOpen(p)) {
-        /* duplicate peerSelectPinned() effects */
-        p->peerType = PINNED;
-        entry->ping_status = PING_DONE;     /* Skip ICP */
-
-        debugs(17, 3, HERE << "reusing a pinned conn: " << *p);
-        serverDestinations.push_back(p);
+            debugs(17, 3, HERE << "reusing a pinned conn: " << *p);
+            serverDestinations.push_back(p);
+        } else {
+            debugs(17,2,HERE << "Pinned connection is not valid: " << p);
+            ErrorState *anErr = new ErrorState(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE, request);
+            fail(anErr);
+        }
+        // Either use the valid pinned connection or fail if it is invalid.
+        return;
+    }

     // use client original destination as second preferred choice
     p = new Comm::Connection();
     p->peerType = ORIGINAL_DST;
     p->remote = clientConn->local;
     getOutgoingAddress(request, p);

     debugs(17, 3, HERE << "using client original destination: " << *p);
     serverDestinations.push_back(p);
 }
 #endif

 void
 FwdState::completed()
 {
     if (flags.forward_completed == 1) {
         debugs(17, DBG_IMPORTANT, HERE << "FwdState::completed called on a completed request! Bad!");
         return;
     }

@@ -379,44 +385,53 @@
         debugs(17, 3, HERE << "Connection failed: " << entry->url());
         if (!err) {
             ErrorState *anErr = new ErrorState(ERR_CANNOT_FORWARD, HTTP_INTERNAL_SERVER_ERROR, request);
             fail(anErr);
         } // else use actual error from last connection attempt
         self = NULL; // refcounted
     }
 }

 void
 FwdState::fail(ErrorState * errorState)
 {
     debugs(17, 3, HERE << err_type_str[errorState->type] << " \"" << httpStatusString(errorState->httpStatus) << "\"\n\t" << entry->url() );

     delete err;
     err = errorState;

     if (!errorState->request)
         errorState->request = HTTPMSGLOCK(request);

-    if (pconnRace == racePossible && err->type == ERR_ZERO_SIZE_OBJECT) {
+    if (err->type != ERR_ZERO_SIZE_OBJECT)
+        return;
+
+    if (pconnRace == racePossible) {
         debugs(17, 5, HERE << "pconn race happened");
         pconnRace = raceHappened;
     }
+
+    if (ConnStateData *pinned_connection = request->pinnedConnection()) {
+        pinned_connection->pinning.zeroReply = true;
+        flags.dont_retry = true; // we want to propagate failure to the client
+        debugs(17, 4, "zero reply on pinned connection");
+    }
 }

 /**
  * Frees fwdState without closing FD or generating an abort
  */
 void
 FwdState::unregister(Comm::ConnectionPointer &conn)
 {
     debugs(17, 3, HERE << entry->url() );
     assert(serverConnection() == conn);
     assert(Comm::IsConnOpen(conn));
     comm_remove_close_handler(conn->fd, fwdServerClosedWrapper, this);
     serverConn = NULL;
 }

 // Legacy method to be removed in favor of the above as soon as possible
 void
 FwdState::unregister(int fd)
 {
     debugs(17, 3, HERE << entry->url() );

@@ -997,52 +1012,42 @@
         if (conn != NULL) {
             if (conn->getPeer())
                 peerConnectFailed(conn->getPeer());

             conn->close();
         }
         retryOrBail();
         return;
     }

     serverConn = conn;
     flags.connected_okay = true;

     debugs(17, 3, HERE << serverConnection() << ": '" << entry->url() << "'" );
     comm_add_close_handler(serverConnection()->fd, fwdServerClosedWrapper, this);

     if (serverConnection()->getPeer())
         peerConnectSucceded(serverConnection()->getPeer());

-    // some requests benefit from pinning but do not require it and can "repin"
-    const bool rePin = request->flags.canRePin &&
-                       request->clientConnectionManager.valid();
-    if (rePin) {
-        debugs(17, 3, HERE << "repinning " << serverConn);
-        request->clientConnectionManager->pinConnection(serverConn,
-                request, serverConn->getPeer(), request->flags.auth);
-        request->flags.pinned = 1;
-    }
-
 #if USE_SSL
-    if (!request->flags.pinned || rePin) {
+    if (!request->flags.pinned) {
         if ((serverConnection()->getPeer() && serverConnection()->getPeer()->use_ssl) ||
                 (!serverConnection()->getPeer() && request->protocol == AnyP::PROTO_HTTPS) ||
                 request->flags.sslPeek) {
             initiateSSL();
             return;
         }
     }
 #endif

     dispatch();
 }

 void
 FwdState::connectTimeout(int fd)
 {
     debugs(17, 2, "fwdConnectTimeout: FD " << fd << ": '" << entry->url() << "'" );
     assert(serverDestinations[0] != NULL);
     assert(fd == serverDestinations[0]->fd);

     if (entry->isEmpty()) {

@@ -1088,71 +1093,68 @@
     int ftimeout = Config.Timeout.forward - (squid_curtime - start_t);
     if (ftimeout < 0)
         ftimeout = 5;

     if (ftimeout < ctimeout)
         ctimeout = ftimeout;

     if (serverDestinations[0]->getPeer() && request->flags.sslBumped) {
         debugs(50, 4, "fwdConnectStart: Ssl bumped connections through parrent proxy are not allowed");
         ErrorState *anErr = new ErrorState(ERR_CANNOT_FORWARD, HTTP_SERVICE_UNAVAILABLE, request);
         fail(anErr);
         self = NULL; // refcounted
         return;
     }

     request->flags.pinned = 0; // XXX: what if the ConnStateData set this to flag existing credentials?
     // XXX: answer: the peer selection *should* catch it and give us only the pinned peer. so we reverse the =0 step below.
     // XXX: also, logs will now lie if pinning is broken and leads to an error message.
     if (serverDestinations[0]->peerType == PINNED) {
         ConnStateData *pinned_connection = request->pinnedConnection();
+        debugs(17,7, "pinned peer connection: " << pinned_connection);
         // pinned_connection may become nil after a pconn race
         if (pinned_connection)
             serverConn = pinned_connection->validatePinnedConnection(request, serverDestinations[0]->getPeer());
         else
             serverConn = NULL;
         if (Comm::IsConnOpen(serverConn)) {
             flags.connected_okay = true;
 #if 0
             if (!serverConn->getPeer())
                 serverConn->peerType = HIER_DIRECT;
 #endif
             ++n_tries;
             request->flags.pinned = 1;
             if (pinned_connection->pinnedAuth())
                 request->flags.auth = 1;
             comm_add_close_handler(serverConn->fd, fwdServerClosedWrapper, this);
             // the server may close the pinned connection before this request
             pconnRace = racePossible;
             dispatch();
             return;
         }
-        /* Failure. Fall back on next path unless we can re-pin */
+        // Pinned connection failure.
         debugs(17,2,HERE << "Pinned connection failed: " << pinned_connection);
-        if (pconnRace != raceHappened || !request->flags.canRePin) {
-            serverDestinations.shift();
-            pconnRace = raceImpossible;
-            startConnectionOrFail();
-            return;
-        }
-        debugs(17,3, HERE << "There was a pconn race. Will try to repin.");
-        // and fall through to regular handling
+        ErrorState *anErr = new ErrorState(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE, request);
+        fail(anErr);
+        self = NULL; // refcounted
+        return;
     }

     // Use pconn to avoid opening a new connection.
     const char *host;
     if (serverDestinations[0]->getPeer()) {
         host = serverDestinations[0]->getPeer()->host;
     } else {
         host = request->GetHost();
     }

     Comm::ConnectionPointer temp;
     // Avoid pconns after races so that the same client does not suffer twice.
     // This does not increase the total number of connections because we just
     // closed the connection that failed the race. And re-pinning assumes this.
     if (pconnRace != raceHappened)
         temp = fwdPconnPool->pop(serverDestinations[0], host, checkRetriable());

     const bool openedPconn = Comm::IsConnOpen(temp);
     pconnRace = openedPconn ? racePossible : raceImpossible;

=== modified file 'src/http.cc'
--- src/http.cc	2012-12-27 17:58:29 +0000
+++ src/http.cc	2013-01-16 15:07:38 +0000
@@ -2119,40 +2119,42 @@
     }

     if (_peer != NULL) {
         if (_peer->options.originserver) {
             flags.proxying = false;
             flags.originpeer = true;
         } else {
             flags.proxying = true;
             flags.originpeer = false;
         }
     } else {
         flags.proxying = false;
         flags.originpeer = false;
     }

     /*
      * Is keep-alive okay for all request methods?
      */
     if (request->flags.mustKeepalive)
         flags.keepalive = true;
+    else if (request->flags.pinned)
+        flags.keepalive = request->persistent();
     else if (!Config.onoff.server_pconns)
         flags.keepalive = false;
     else if (_peer == NULL)
         flags.keepalive = true;
     else if (_peer->stats.n_keepalives_sent < 10)
         flags.keepalive = true;
     else if ((double) _peer->stats.n_keepalives_recv /
              (double) _peer->stats.n_keepalives_sent > 0.50)
         flags.keepalive = true;

     if (_peer) {
         /*The old code here was
           if (neighborType(_peer, request) == PEER_SIBLING && ...
           which is equivalent to:
           if (neighborType(_peer, NULL) == PEER_SIBLING && ...
           or better:
           if (((_peer->type == PEER_MULTICAST && p->options.mcast_siblings) ||
                _peer->type == PEER_SIBLINGS ) && _peer->options.allow_miss)
               flags.only_if_cached = 1;
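
The http.cc hunk above is the propagation half of this change: a pinned server
connection now mirrors the client's persistence indicator instead of falling
through to the server-side pconn heuristics. Below is a minimal standalone
sketch of the resulting decision cascade, for illustration only; the function
and parameter names are hypothetical stand-ins for the Squid flags and peer
statistics named in the hunk:

    // Sketch of the keep-alive decision after this change (illustrative only).
    // Stand-ins: mustKeepalive       <- request->flags.mustKeepalive
    //            pinned              <- request->flags.pinned
    //            clientPersistent    <- request->persistent()
    //            serverPconnsEnabled <- Config.onoff.server_pconns
    //            viaPeer             <- _peer != NULL
    //            keepalivesSent/Recv <- _peer->stats.n_keepalives_{sent,recv}
    static bool
    serverKeepAlive(bool mustKeepalive, bool pinned, bool clientPersistent,
                    bool serverPconnsEnabled, bool viaPeer,
                    int keepalivesSent, int keepalivesRecv)
    {
        if (mustKeepalive)
            return true;
        if (pinned) // new: propagate the client-side persistence indicator
            return clientPersistent;
        if (!serverPconnsEnabled)
            return false;
        if (!viaPeer)
            return true;
        if (keepalivesSent < 10)
            return true;
        return double(keepalivesRecv) / double(keepalivesSent) > 0.50;
    }

Note the ordering: the pinned check precedes even the server_pconns test, so a
pinned connection stays persistent exactly as long as the client side does and
is allowed to close when the client signals closure. That, together with the
client_side_reply.cc hunk that forces "Connection: close" on the client when a
pinned server reply is non-persistent, is what propagates persistency in both
directions.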