Support forwarding intercepted but not bumped connections to cache_peers. When talking to a cache_peer (i.e., sending a CONNECT request before tunneling the transaction), tunnel code is using a clever hack: Squid does not parse the CONNECT response from peer but blindly forwards it to the client. This works great and simplifies code a lot, except when the client connection was intercepted and, hence, the client did not send a CONNECT request and is not expecting a CONNECT response. In those situations, we now accumulate, parse, and strip the peer CONNECT response (or close connection on errors). The existing tunnel I/O code is too simple to accommodate that task -- it cannot accumulate read data (its I/O buffers work in lockstep fashion, writing everything it reads before reading again). Instead of rewriting the entire tunnel code to use more complex buffers, I added a temporary accumulation buffer for the CONNECT response. That buffer is not allocated unless it is needed and does not grow beyond SQUID_TCP_SO_RCVBUF size, just like the simple buffers. === modified file 'src/tunnel.cc' --- src/tunnel.cc 2013-05-13 03:57:03 +0000 +++ src/tunnel.cc 2013-05-24 22:58:59 +0000 @@ -74,93 +74,120 @@ * instead of re-implementing it here and occasionally getting the ConnStateData * read/write state wrong. 
* * TODO 2: then convert this into a AsyncJob, possibly a child of 'Server' */ class TunnelStateData { public: TunnelStateData(); ~TunnelStateData(); TunnelStateData(const TunnelStateData &); // do not implement TunnelStateData &operator =(const TunnelStateData &); // do not implement class Connection; static void ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); static void ReadServer(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data); static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data); + /// Starts reading peer response to our CONNECT request. + void readConnectResponse(); + + /// Called when we may be done handling a CONNECT exchange with the peer. + void connectExchangeCheckpoint(); + bool noConnections() const; char *url; HttpRequest *request; Comm::ConnectionList serverDestinations; const char * getHost() const { return (server.conn != NULL && server.conn->getPeer() ? server.conn->getPeer()->host : request->GetHost()); }; + /// Whether we are writing a CONNECT request to a peer. + bool waitingForConnectRequest() const { return connectReqWriting; } + /// Whether we are reading a CONNECT response from a peer. + bool waitingForConnectResponse() const { return connectRespBuf; } + /// Whether we are waiting for the CONNECT request/response exchange with the peer. + bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); } + + /// Whether the client sent a CONNECT request to us. 
+ bool clientExpectsConnectResponse() const { return !(request && + (request->flags.interceptTproxy || request->flags.intercepted)); } + class Connection { public: Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL) {} ~Connection(); int bytesWanted(int lower=0, int upper = INT_MAX) const; void bytesIn(int const &); #if USE_DELAY_POOLS void setDelayId(DelayId const &); #endif void error(int const xerrno); int debugLevelForError(int const xerrno) const; + /// handles a non-I/O error associated with this Connection + void logicError(const char *errMsg); void closeIfOpen(); void dataSent (size_t amount); int len; char *buf; int64_t *size_ptr; /* pointer to size in an ConnStateData for logging */ Comm::ConnectionPointer conn; ///< The currently connected connection. private: #if USE_DELAY_POOLS DelayId delayId; #endif }; Connection client, server; int *status_ptr; /* pointer to status for logging */ + MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it + bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer + void copyRead(Connection &from, IOCB *completion); private: CBDATA_CLASS2(TunnelStateData); - void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *); + bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to); + void copy(size_t len, Connection &from, Connection &to, IOCB *); + void handleConnectResponse(const size_t chunkSize); void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno); void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno); void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno); void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno); + + static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); + void readConnectResponseDone(char *buf, 
size_t len, comm_err_t errcode, int xerrno); }; static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n"; static CNCB tunnelConnectDone; static ERCB tunnelErrorComplete; static CLCB tunnelServerClosed; static CLCB tunnelClientClosed; static CTCB tunnelTimeout; static PSC tunnelPeerSelectComplete; static void tunnelConnected(const Comm::ConnectionPointer &server, void *); static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *); static void tunnelServerClosed(const CommCloseCbParams &params) { TunnelStateData *tunnelState = (TunnelStateData *)params.data; debugs(26, 3, HERE << tunnelState->server.conn); tunnelState->server.conn = NULL; @@ -179,52 +206,55 @@ tunnelClientClosed(const CommCloseCbParams &params) { TunnelStateData *tunnelState = (TunnelStateData *)params.data; debugs(26, 3, HERE << tunnelState->client.conn); tunnelState->client.conn = NULL; if (tunnelState->noConnections()) { delete tunnelState; return; } if (!tunnelState->client.len) { tunnelState->server.conn->close(); return; } } TunnelStateData::TunnelStateData() : url(NULL), request(NULL), - status_ptr(NULL) + status_ptr(NULL), + connectRespBuf(NULL), + connectReqWriting(false) { debugs(26, 3, "TunnelStateData constructed this=" << this); } TunnelStateData::~TunnelStateData() { debugs(26, 3, "TunnelStateData destructed this=" << this); assert(noConnections()); xfree(url); serverDestinations.clean(); HTTPMSGUNLOCK(request); + delete connectRespBuf; } TunnelStateData::Connection::~Connection() { safe_free(buf); } int TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const { #if USE_DELAY_POOLS return delayId.bytesWanted(lowerbound, upperbound); #else return upperbound; #endif } void TunnelStateData::Connection::bytesIn(int const &count) @@ -266,41 +296,142 @@ void TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" <<
errcode); /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { server.bytesIn(len); kb_incr(&(statCounter.server.all.kbytes_in), len); kb_incr(&(statCounter.server.other.kbytes_in), len); } - copy (len, errcode, xerrno, server, client, WriteClientDone); + if (keepGoingAfterRead(len, errcode, xerrno, server, client)) + copy(len, server, client, WriteClientDone); +} + +/// Called when we read [a part of] CONNECT response from the peer +void +TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno) +{ + debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode); + assert(waitingForConnectResponse()); + + if (errcode == COMM_ERR_CLOSING) + return; + + if (len > 0) { + connectRespBuf->appended(len); + server.bytesIn(len); + kb_incr(&(statCounter.server.all.kbytes_in), len); + kb_incr(&(statCounter.server.other.kbytes_in), len); + } + + if (keepGoingAfterRead(len, errcode, xerrno, server, client)) + handleConnectResponse(len); +} + +/* Comm read callback: forwards a peer CONNECT response read to the tunnel object */ +void +TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) +{ + TunnelStateData *tunnelState = (TunnelStateData *)data; + assert (cbdataReferenceValid (tunnelState)); + + tunnelState->readConnectResponseDone(buf, len, errcode, xerrno); +} + +/// Parses [possibly incomplete] CONNECT response and reacts to it. +/// Closes the server connection on errors and schedules another read when more +/// response data is needed; otherwise, keeps any leftover bytes for shoveling.
+void +TunnelStateData::handleConnectResponse(const size_t chunkSize) +{ + assert(waitingForConnectResponse()); + + // Ideally, client and server should use MemBuf or better, but current code + // never accumulates more than one read when shoveling data (XXX) so it does + // not need to deal with MemBuf complexity. To keep it simple, we use a + // dedicated MemBuf for accumulating CONNECT responses. TODO: When shoveling + // is optimized, reuse server.buf for CONNECT response accumulation instead. + + /* mimic the basic parts of HttpStateData::processReplyHeader() */ + HttpReply rep; + Http::StatusCode parseErr = Http::scNone; + const bool eof = !chunkSize; + const bool parsed = rep.parse(connectRespBuf, eof, &parseErr); + if (!parsed) { + if (parseErr > 0) { // unrecoverable parsing error + server.logicError("malformed CONNECT response from peer"); + return; + } + + // need more data + assert(!eof); + assert(!parseErr); + + if (!connectRespBuf->hasSpace()) { + server.logicError("huge CONNECT response from peer"); + return; + } + + // keep reading + readConnectResponse(); + return; + } + + // CONNECT response was successfully parsed + *status_ptr = rep.sline.status(); + + // bail if we did not get an HTTP 200 (Connection Established) response + if (rep.sline.status() != Http::scOkay) { + server.logicError("unsupported CONNECT response status code"); + return; + } + + if (rep.hdr_sz < connectRespBuf->contentSize()) { + // preserve bytes that the server already sent after the CONNECT response + server.len = connectRespBuf->contentSize() - rep.hdr_sz; + memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len); + } else { + // reset; delay pools were using this field to throttle CONNECT response + server.len = 0; + } + + delete connectRespBuf; + connectRespBuf = NULL; + connectExchangeCheckpoint(); +} + +void +TunnelStateData::Connection::logicError(const char *errMsg) +{ + debugs(50, 3, conn << " closing on error: " << errMsg); + conn->close(); } void
TunnelStateData::Connection::error(int const xerrno) { /* XXX fixme xstrerror and xerrno... */ errno = xerrno; debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write failure: " << xstrerror()); if (!ignoreErrno(xerrno)) conn->close(); } /* Read from client side and queue it for writing to the server */ void TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); @@ -309,85 +440,96 @@ } void TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << errcode); /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { client.bytesIn(len); kb_incr(&(statCounter.client_http.kbytes_in), len); } - copy (len, errcode, xerrno, client, server, WriteServerDone); + if (keepGoingAfterRead(len, errcode, xerrno, client, server)) + copy(len, client, server, WriteServerDone); } -void -TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *completion) +/// Updates state after reading from client or server. +/// Returns whether the caller should use the data just read. 
+bool +TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to) { debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << "}"); /* I think this is to prevent free-while-in-a-callback behaviour * - RBC 20030229 * from.conn->close() / to.conn->close() done here trigger close callbacks which may free TunnelStateData */ const CbcPointer safetyLock(this); /* Bump the source connection read timeout on any activity */ if (Comm::IsConnOpen(from.conn)) { AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, this)); commSetConnTimeout(from.conn, Config.Timeout.read, timeoutCall); } /* Bump the dest connection read timeout on any activity */ /* see Bug 3659: tunnels can be weird, with very long one-way transfers */ if (Comm::IsConnOpen(to.conn)) { AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, this)); commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall); } if (errcode) from.error (xerrno); else if (len == 0 || !Comm::IsConnOpen(to.conn)) { debugs(26, 3, HERE << "Nothing to write or client gone. 
Terminate the tunnel."); from.conn->close(); /* Only close the remote end if we've finished queueing data to it */ if (from.len == 0 && Comm::IsConnOpen(to.conn) ) { to.conn->close(); } } else if (cbdataReferenceValid(this)) { + return true; + } + + return false; +} + +void +TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB *completion) +{ debugs(26, 3, HERE << "Schedule Write"); AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler", CommIoCbPtrFun(completion, this)); Comm::Write(to.conn, from.buf, len, call, NULL); - } } /* Writes data from the client buffer to the server side */ void TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); tunnelState->writeServerDone(buf, len, flag, xerrno); } void TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno) { debugs(26, 3, HERE << server.conn << ", " << len << " bytes written, flag=" << flag); /* Error? 
*/ if (flag != COMM_OK) { if (flag != COMM_ERR_CLOSING) { @@ -493,84 +635,131 @@ tunnelState->client.closeIfOpen(); tunnelState->server.closeIfOpen(); } void TunnelStateData::Connection::closeIfOpen() { if (Comm::IsConnOpen(conn)) conn->close(); } void TunnelStateData::copyRead(Connection &from, IOCB *completion) { assert(from.len == 0); AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler", CommIoCbPtrFun(completion, this)); comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call); } +void +TunnelStateData::readConnectResponse() +{ + assert(waitingForConnectResponse()); + + AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone", + CommIoCbPtrFun(ReadConnectResponseDone, this)); + comm_read(server.conn, connectRespBuf->space(), + server.bytesWanted(1, connectRespBuf->spaceSize()), call); +} + /** * Set the HTTP status for this request and sets the read handlers for client * and server side connections. */ static void tunnelStartShoveling(TunnelStateData *tunnelState) { + assert(!tunnelState->waitingForConnectExchange()); *tunnelState->status_ptr = Http::scOkay; if (cbdataReferenceValid(tunnelState)) { + if (!tunnelState->server.len) tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer); + else + tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone); tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient); } } /** * All the pieces we need to write to client and/or server connection * have been written. * Call the tunnelStartShoveling to start the blind pump. 
*/ static void tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; debugs(26, 3, HERE << conn << ", flag=" << flag); if (flag != COMM_OK) { *tunnelState->status_ptr = Http::scInternalServerError; tunnelErrorComplete(conn->fd, data, 0); return; } tunnelStartShoveling(tunnelState); } +/// Called when we are done writing CONNECT request to a peer. +static void +tunnelConnectReqWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) +{ + TunnelStateData *tunnelState = (TunnelStateData *)data; + debugs(26, 3, conn << ", flag=" << flag); + assert(tunnelState->waitingForConnectRequest()); + + if (flag != COMM_OK) { + *tunnelState->status_ptr = Http::scInternalServerError; + tunnelErrorComplete(conn->fd, data, 0); + return; + } + + tunnelState->connectReqWriting = false; + tunnelState->connectExchangeCheckpoint(); +} + +void +TunnelStateData::connectExchangeCheckpoint() +{ + if (waitingForConnectResponse()) { + debugs(26, 5, "still reading CONNECT response on " << server.conn); + } else if (waitingForConnectRequest()) { + debugs(26, 5, "still writing CONNECT request on " << server.conn); + } else { + assert(!waitingForConnectExchange()); + debugs(26, 3, "done with CONNECT exchange on " << server.conn); + tunnelConnected(server.conn, this); + } +} + /* * handle the write completion from a proxy request to an upstream origin */ static void tunnelConnected(const Comm::ConnectionPointer &server, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState); - if (tunnelState->request && (tunnelState->request->flags.interceptTproxy || tunnelState->request->flags.intercepted)) + if (!tunnelState->clientExpectsConnectResponse()) tunnelStartShoveling(tunnelState); // ssl-bumped connection, be quiet else { AsyncCall::Pointer 
call = commCbCall(5,5, "tunnelConnectedWriteDone", CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState)); Comm::Write(tunnelState->client.conn, conn_established, strlen(conn_established), call, NULL); } } static void tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_t) { TunnelStateData *tunnelState = (TunnelStateData *)data; debugs(26, 3, HERE << "FD " << fd); assert(tunnelState != NULL); /* temporary lock to save our own feets (comm_close -> tunnelClientClosed -> Free) */ CbcPointer safetyLock(tunnelState); if (Comm::IsConnOpen(tunnelState->client.conn)) tunnelState->client.conn->close(); @@ -703,63 +892,84 @@ tunnelState->client.conn = http->getConn()->clientConnection; comm_add_close_handler(tunnelState->client.conn->fd, tunnelClientClosed, tunnelState); AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime, timeoutCall); peerSelect(&(tunnelState->serverDestinations), request, NULL, tunnelPeerSelectComplete, tunnelState); } static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; + assert(!tunnelState->waitingForConnectExchange()); HttpHeader hdr_out(hoRequest); Packer p; HttpStateFlags flags; debugs(26, 3, HERE << srv << ", tunnelState=" << tunnelState); memset(&flags, '\0', sizeof(flags)); flags.proxying = tunnelState->request->flags.proxying; MemBuf mb; mb.init(); mb.Printf("CONNECT %s HTTP/1.1\r\n", tunnelState->url); HttpStateData::httpBuildRequestHeader(tunnelState->request, NULL, /* StoreEntry */ NULL, /* AccessLogEntry */ &hdr_out, flags); /* flags */ packerToMemInit(&p, &mb); hdr_out.packInto(&p); hdr_out.clean(); packerClean(&p); mb.append("\r\n", 2); + if (tunnelState->clientExpectsConnectResponse()) { + // hack: blindly tunnel peer response (to our CONNECT request) to the client as ours. 
AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectedWriteDone", CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState)); Comm::Write(srv, &mb, writeCall); + } else { + // we have to eat the connect response from the peer (so that the client + // does not see it) and only then start shoveling data to the client + AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectReqWriteDone", + CommIoCbPtrFun(tunnelConnectReqWriteDone, + tunnelState)); + Comm::Write(srv, &mb, writeCall); + tunnelState->connectReqWriting = true; + + tunnelState->connectRespBuf = new MemBuf; + // SQUID_TCP_SO_RCVBUF: we should not accumulate more than regular I/O buffer + // can hold since any CONNECT response leftovers have to fit into server.buf. + // 2*SQUID_TCP_SO_RCVBUF: HttpMsg::parse() zero-terminates, which uses space. + tunnelState->connectRespBuf->init(SQUID_TCP_SO_RCVBUF, 2*SQUID_TCP_SO_RCVBUF); + tunnelState->readConnectResponse(); + + assert(tunnelState->waitingForConnectExchange()); + } AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); commSetConnTimeout(srv, Config.Timeout.read, timeoutCall); } static void tunnelPeerSelectComplete(Comm::ConnectionList *peer_paths, ErrorState *err, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; if (peer_paths == NULL || peer_paths->size() < 1) { debugs(26, 3, HERE << "No paths found. Aborting CONNECT"); if (!err) { err = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, tunnelState->request); } *tunnelState->status_ptr = err->httpStatus; err->callback = tunnelErrorComplete; err->callback_data = tunnelState; errorSend(tunnelState->client.conn, err);