=== modified file 'src/comm/Write.cc' --- src/comm/Write.cc 2014-02-21 10:46:19 +0000 +++ src/comm/Write.cc 2014-05-13 08:57:09 +0000 @@ -1,118 +1,145 @@ #include "squid.h" #include "comm/Connection.h" #include "comm/IoCallback.h" #include "comm/Write.h" #include "fd.h" #include "fde.h" #include "globals.h" #include "MemBuf.h" #include "profiler/Profiler.h" +#include "SBuf.h" #include "SquidTime.h" #include "StatCounters.h" #if USE_DELAY_POOLS #include "ClientInfo.h" #endif #include void +Comm::Write(const Comm::ConnectionPointer &conn, SBuf *sb, AsyncCall::Pointer &callback) +{ + debugs(5, 5, conn << ": sz " << sb->length() << ": asynCall " << callback); + + /* Make sure we are open, not closing, and not writing */ + assert(fd_table[conn->fd].flags.open); + assert(!fd_table[conn->fd].closing()); + Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd); + assert(!ccb->active()); + + fd_table[conn->fd].writeStart = squid_curtime; + ccb->conn = conn; + /* Queue the write */ + ccb->setCallback(IOCB_WRITE, callback, NULL, NULL, sb->length()); + ccb->buf2 = sb; + ccb->selectOrQueueWrite(); +} + +/// \deprecated use SBuf for I/O buffer instead +void Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback) { Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc()); } +/// \deprecated use SBuf for I/O buffer instead void Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) { debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback); /* Make sure we are open, not closing, and not writing */ assert(fd_table[conn->fd].flags.open); assert(!fd_table[conn->fd].closing()); Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd); assert(!ccb->active()); fd_table[conn->fd].writeStart = squid_curtime; ccb->conn = conn; /* Queue the write */ ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size); ccb->selectOrQueueWrite(); } /** Write to FD. * This function is used by the lowest level of IO loop which only has access to FD numbers. * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections. * Once the write has been concluded we schedule the waiting call with success/fail results. */ void Comm::HandleWrite(int fd, void *data) { Comm::IoCallback *state = static_cast(data); int len = 0; int nleft; assert(state->conn != NULL && state->conn->fd == fd); PROF_start(commHandleWrite); - debugs(5, 5, HERE << state->conn << ": off " << - (long int) state->offset << ", sz " << (long int) state->size << "."); - nleft = state->size - state->offset; + debugs(5, 5, state->conn << ": off " << state->offset << ", sz " << state->size); + if (state->buf2) + nleft = state->buf2->length(); + else + nleft = state->size - state->offset; #if USE_DELAY_POOLS ClientInfo * clientInfo=fd_table[fd].clientInfo; if (clientInfo && !clientInfo->writeLimitingActive) clientInfo = NULL; // we only care about quota limits here if (clientInfo) { assert(clientInfo->selectWaiting); clientInfo->selectWaiting = false; assert(clientInfo->hasQueue()); assert(clientInfo->quotaPeekFd() == fd); clientInfo->quotaDequeue(); // we will write or requeue below if (nleft > 0) { const int quota = clientInfo->quotaForDequed(); if (!quota) { // if no write quota left, queue this fd state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); clientInfo->kickQuotaQueue(); PROF_stop(commHandleWrite); return; } const int nleft_corrected = min(nleft, quota); if (nleft != nleft_corrected) { debugs(5, 5, HERE << state->conn << " writes only " << nleft_corrected << " out of " << nleft); nleft = nleft_corrected; } } } #endif /* USE_DELAY_POOLS */ /* actually WRITE data */ - len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); - debugs(5, 5, HERE << "write() returns " << len); + if (state->buf2) + len = FD_WRITE_METHOD(fd, state->buf2->rawContent(), nleft); + else + len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); + debugs(5, 5, "write() returns " << len); #if USE_DELAY_POOLS if (clientInfo) { if (len > 0) { /* we wrote data - drain them from bucket */ clientInfo->bucketSize -= len; if (clientInfo->bucketSize < 0.0) { debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen clientInfo->bucketSize = 0; } } // even if we wrote nothing, we were served; give others a chance clientInfo->kickQuotaQueue(); } #endif /* USE_DELAY_POOLS */ fd_bytes(fd, len, FD_WRITE); ++statCounter.syscalls.sock.writes; // After each successful partial write, @@ -125,30 +152,36 @@ if (nleft != 0) debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } else if (len < 0) { /* An error */ if (fd_table[fd].flags.socket_eof) { debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } else if (ignoreErrno(errno)) { debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->selectOrQueueWrite(); } else { debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } } else { /* A successful write, continue */ state->offset += len; + if (state->buf2) { + state->buf2->consume(len); + if (state->buf2->isEmpty()) + state->finish(nleft ? COMM_OK : COMM_ERROR, errno); + } + if (state->offset < state->size) { /* Not done, reinstall the write handler and write some more */ state->selectOrQueueWrite(); } else { state->finish(nleft ? COMM_OK : COMM_ERROR, errno); } } PROF_stop(commHandleWrite); } === modified file 'src/comm/Write.h' --- src/comm/Write.h 2012-08-14 11:53:07 +0000 +++ src/comm/Write.h 2014-05-13 08:56:43 +0000 @@ -1,34 +1,44 @@ #ifndef _SQUID_COMM_IOWRITE_H #define _SQUID_COMM_IOWRITE_H #include "base/AsyncCall.h" #include "comm/forward.h" #include "typedefs.h" class MemBuf; +class SBuf; + namespace Comm { /** * Queue a write. callback is scheduled when the write * completes, on error, or on file descriptor close. * * free_func is used to free the passed buffer when the write has completed. */ void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func); /** * Queue a write. callback is scheduled when the write * completes, on error, or on file descriptor close. */ void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback); +/** + * Queue a write for an SBuf contents. The SBuf content is consume()'d as it is written. + * callback is scheduled when the SBuf is emptied, on error, or on file descriptor close. + * If the SBuf is appended to while write(2) are ongoing some additional bytes may also be + * written before the callback is scheduled. + */ +void Write(const Comm::ConnectionPointer &conn, SBuf *sb, AsyncCall::Pointer &callback); + /// Cancel the write pending on FD. No action if none pending. void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason); // callback handler to process an FD which is available for writing. extern PF HandleWrite; } // namespace Comm #endif /* _SQUID_COMM_IOWRITE_H */ === modified file 'src/neighbors.cc' --- src/neighbors.cc 2014-04-30 10:50:09 +0000 +++ src/neighbors.cc 2014-05-12 12:55:36 +0000 @@ -151,50 +151,50 @@ return p->type; } /** * \return Whether it is appropriate to fetch REQUEST from PEER. */ bool peerAllowedToUse(const CachePeer * p, HttpRequest * request) { const CachePeerDomainList *d = NULL; assert(request != NULL); if (neighborType(p, request) == PEER_SIBLING) { #if PEER_MULTICAST_SIBLINGS if (p->type == PEER_MULTICAST && p->options.mcast_siblings && (request->flags.noCache || request->flags.refresh || request->flags.loopDetected || request->flags.needValidation)) debugs(15, 2, "peerAllowedToUse(" << p->name << ", " << request->GetHost() << ") : multicast-siblings optimization match"); #endif - if (request->flags.noCache) + if (request->flags.loopDetected) return false; - if (request->flags.refresh) + if (request->flags.noCache) return false; - if (request->flags.loopDetected) + if (request->flags.refresh && p->options.allow_miss) return false; - if (request->flags.needValidation) + if (request->flags.needValidation && p->options.allow_miss) return false; } // CONNECT requests are proxy requests. Not to be forwarded to origin servers. // Unless the destination port matches, in which case we MAY perform a 'DIRECT' to this CachePeer. if (p->options.originserver && request->method == Http::METHOD_CONNECT && request->port != p->in_addr.port()) return false; if (p->peer_domain == NULL && p->access == NULL) return true; bool do_ping = false; for (d = p->peer_domain; d; d = d->next) { if (0 == matchDomainName(request->GetHost(), d->domain)) { do_ping = d->do_ping; break; } do_ping = !d->do_ping; } === modified file 'src/tunnel.cc' --- src/tunnel.cc 2014-05-07 14:40:05 +0000 +++ src/tunnel.cc 2014-05-13 08:59:13 +0000 @@ -108,69 +108,57 @@ return (server.conn != NULL && server.conn->getPeer() ? server.conn->getPeer()->host : request->GetHost()); }; /// Whether we are writing a CONNECT request to a peer. bool waitingForConnectRequest() const { return connectReqWriting; } /// Whether we are reading a CONNECT response from a peer. bool waitingForConnectResponse() const { return connectRespBuf; } /// Whether we are waiting for the CONNECT request/response exchange with the peer. bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); } /// Whether the client sent a CONNECT request to us. bool clientExpectsConnectResponse() const { return !(request != NULL && (request->flags.interceptTproxy || request->flags.intercepted)); } class Connection { public: - Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL) {} - - ~Connection(); + Connection() : size_ptr(NULL) {} int bytesWanted(int lower=0, int upper = INT_MAX) const; - void bytesIn(int const &); -#if USE_DELAY_POOLS - - void setDelayId(DelayId const &); -#endif - void error(int const xerrno); int debugLevelForError(int const xerrno) const; /// handles a non-I/O error associated with this Connection void logicError(const char *errMsg); void closeIfOpen(); void dataSent (size_t amount); - int len; - char *buf; + SBuf buf; int64_t *size_ptr; /* pointer to size in an ConnStateData for logging */ Comm::ConnectionPointer conn; ///< The currently connected connection. - private: #if USE_DELAY_POOLS - DelayId delayId; #endif - }; Connection client, server; int *status_ptr; /* pointer to status for logging */ MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer void copyRead(Connection &from, IOCB *completion); /// continue to set up connection to a peer, going async for SSL peers void connectToPeer(); private: #if USE_OPENSSL /// Gives PeerConnector access to Answer in the TunnelStateData callback dialer. class MyAnswerDialer: public CallDialer, public Ssl::PeerConnector::CbDialer { public: typedef void (TunnelStateData::*Method)(Ssl::PeerConnectorAnswer &); @@ -181,208 +169,196 @@ virtual bool canDial(AsyncCall &call) { return tunnel_.valid(); } void dial(AsyncCall &call) { ((&(*tunnel_))->*method_)(answer_); } virtual void print(std::ostream &os) const { os << '(' << tunnel_.get() << ", " << answer_ << ')'; } /* Ssl::PeerConnector::CbDialer API */ virtual Ssl::PeerConnectorAnswer &answer() { return answer_; } private: Method method_; CbcPointer tunnel_; Ssl::PeerConnectorAnswer answer_; }; void connectedToPeer(Ssl::PeerConnectorAnswer &answer); #endif CBDATA_CLASS2(TunnelStateData); bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to); - void copy(size_t len, Connection &from, Connection &to, IOCB *); + void copy(Connection &from, Connection &to, IOCB *); void handleConnectResponse(const size_t chunkSize); - void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno); - void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno); - void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno); - void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno); + void readServer(size_t len, comm_err_t errcode, int xerrno); + void readClient(size_t len, comm_err_t errcode, int xerrno); + void writeClientDone(size_t len, comm_err_t flag, int xerrno); + void writeServerDone(size_t len, comm_err_t flag, int xerrno); static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno); }; static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n"; static CNCB tunnelConnectDone; static ERCB tunnelErrorComplete; static CLCB tunnelServerClosed; static CLCB tunnelClientClosed; static CTCB tunnelTimeout; static PSC tunnelPeerSelectComplete; static void tunnelConnected(const Comm::ConnectionPointer &server, void *); static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *); static void tunnelServerClosed(const CommCloseCbParams ¶ms) { TunnelStateData *tunnelState = (TunnelStateData *)params.data; debugs(26, 3, HERE << tunnelState->server.conn); tunnelState->server.conn = NULL; if (tunnelState->noConnections()) { delete tunnelState; return; } - if (!tunnelState->server.len) { + if (tunnelState->server.buf.isEmpty()) { tunnelState->client.conn->close(); return; } } static void tunnelClientClosed(const CommCloseCbParams ¶ms) { TunnelStateData *tunnelState = (TunnelStateData *)params.data; debugs(26, 3, HERE << tunnelState->client.conn); tunnelState->client.conn = NULL; if (tunnelState->noConnections()) { delete tunnelState; return; } - if (!tunnelState->client.len) { + if (tunnelState->client.buf.isEmpty()) { tunnelState->server.conn->close(); return; } } TunnelStateData::TunnelStateData() : url(NULL), http(), request(NULL), status_ptr(NULL), connectRespBuf(NULL), connectReqWriting(false) { debugs(26, 3, "TunnelStateData constructed this=" << this); } TunnelStateData::~TunnelStateData() { debugs(26, 3, "TunnelStateData destructed this=" << this); assert(noConnections()); xfree(url); serverDestinations.clear(); delete connectRespBuf; } -TunnelStateData::Connection::~Connection() -{ - safe_free(buf); -} - int TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const { #if USE_DELAY_POOLS return delayId.bytesWanted(lowerbound, upperbound); #else return upperbound; #endif } -void -TunnelStateData::Connection::bytesIn(int const &count) -{ - debugs(26, 3, HERE << "len=" << len << " + count=" << count); -#if USE_DELAY_POOLS - delayId.bytesIn(count); -#endif - - len += count; -} - int TunnelStateData::Connection::debugLevelForError(int const xerrno) const { #ifdef ECONNRESET if (xerrno == ECONNRESET) return 2; #endif if (ignoreErrno(xerrno)) return 3; return 1; } /* Read from server side and queue it for writing to the client */ void TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert(cbdataReferenceValid(tunnelState)); debugs(26, 3, HERE << c); - tunnelState->readServer(buf, len, errcode, xerrno); + tunnelState->readServer(len, errcode, xerrno); } void -TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrno) +TunnelStateData::readServer(size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" << errcode); /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { - server.bytesIn(len); +#if USE_DELAY_POOLS + server.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.server.all.kbytes_in), len); kb_incr(&(statCounter.server.other.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, server, client)) - copy(len, server, client, WriteClientDone); + copy(server, client, WriteClientDone); } /// Called when we read [a part of] CONNECT response from the peer void TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode); assert(waitingForConnectResponse()); if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { connectRespBuf->appended(len); - server.bytesIn(len); +#if USE_DELAY_POOLS + server.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.server.all.kbytes_in), len); kb_incr(&(statCounter.server.other.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, server, client)) handleConnectResponse(len); } /* Read from client side and queue it for writing to the server */ void TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); tunnelState->readConnectResponseDone(buf, len, errcode, xerrno); } /// Parses [possibly incomplete] CONNECT response and reacts to it. /// If the tunnel is being closed or more response data is needed, returns false. @@ -417,169 +393,167 @@ server.logicError("huge CONNECT response from peer"); return; } // keep reading readConnectResponse(); return; } // CONNECT response was successfully parsed *status_ptr = rep.sline.status(); // bail if we did not get an HTTP 200 (Connection Established) response if (rep.sline.status() != Http::scOkay) { server.logicError("unsupported CONNECT response status code"); return; } if (rep.hdr_sz < connectRespBuf->contentSize()) { // preserve bytes that the server already sent after the CONNECT response - server.len = connectRespBuf->contentSize() - rep.hdr_sz; - memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len); - } else { - // reset; delay pools were using this field to throttle CONNECT response - server.len = 0; + server.buf.append(connectRespBuf->content()+rep.hdr_sz, connectRespBuf->contentSize() - rep.hdr_sz); } delete connectRespBuf; connectRespBuf = NULL; connectExchangeCheckpoint(); } void TunnelStateData::Connection::logicError(const char *errMsg) { debugs(50, 3, conn << " closing on error: " << errMsg); conn->close(); } void TunnelStateData::Connection::error(int const xerrno) { /* XXX fixme xstrerror and xerrno... */ errno = xerrno; debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write failure: " << xstrerror()); if (!ignoreErrno(xerrno)) conn->close(); } /* Read from client side and queue it for writing to the server */ void TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); - tunnelState->readClient(buf, len, errcode, xerrno); + tunnelState->readClient(len, errcode, xerrno); } void -TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrno) +TunnelStateData::readClient(size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << errcode); /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { - client.bytesIn(len); +#if USE_DELAY_POOLS + client.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.client_http.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, client, server)) - copy(len, client, server, WriteServerDone); + copy(client, server, WriteServerDone); } /// Updates state after reading from client or server. /// Returns whether the caller should use the data just read. bool TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to) { debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << "}"); /* I think this is to prevent free-while-in-a-callback behaviour * - RBC 20030229 * from.conn->close() / to.conn->close() done here trigger close callbacks which may free TunnelStateData */ const CbcPointer safetyLock(this); /* Bump the source connection read timeout on any activity */ if (Comm::IsConnOpen(from.conn)) { AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, this)); commSetConnTimeout(from.conn, Config.Timeout.read, timeoutCall); } /* Bump the dest connection read timeout on any activity */ /* see Bug 3659: tunnels can be weird, with very long one-way transfers */ if (Comm::IsConnOpen(to.conn)) { AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, this)); commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall); } if (errcode) from.error (xerrno); else if (len == 0 || !Comm::IsConnOpen(to.conn)) { debugs(26, 3, HERE << "Nothing to write or client gone. Terminate the tunnel."); from.conn->close(); /* Only close the remote end if we've finished queueing data to it */ - if (from.len == 0 && Comm::IsConnOpen(to.conn) ) { + if (from.buf.isEmpty() && Comm::IsConnOpen(to.conn)) { to.conn->close(); } } else if (cbdataReferenceValid(this)) { return true; } return false; } void -TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB *completion) +TunnelStateData::copy(Connection &from, Connection &to, IOCB *completion) { debugs(26, 3, HERE << "Schedule Write"); AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler", CommIoCbPtrFun(completion, this)); - Comm::Write(to.conn, from.buf, len, call, NULL); + Comm::Write(to.conn, &from.buf, call); } /* Writes data from the client buffer to the server side */ void TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); - tunnelState->writeServerDone(buf, len, flag, xerrno); + tunnelState->writeServerDone(len, flag, xerrno); } void -TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno) +TunnelStateData::writeServerDone(size_t len, comm_err_t flag, int xerrno) { debugs(26, 3, HERE << server.conn << ", " << len << " bytes written, flag=" << flag); /* Error? */ if (flag != COMM_OK) { if (flag != COMM_ERR_CLOSING) { debugs(26, 4, HERE << "calling TunnelStateData::server.error(" << xerrno <<")"); server.error(xerrno); // may call comm_close } return; } /* EOF? */ if (len == 0) { debugs(26, 4, HERE << "No read input. Closing server connection."); server.conn->close(); return; } /* Valid data */ @@ -590,57 +564,55 @@ /* If the other end has closed, so should we */ if (!Comm::IsConnOpen(client.conn)) { debugs(26, 4, HERE << "Client gone away. Shutting down server connection."); server.conn->close(); return; } const CbcPointer safetyLock(this); /* ??? should be locked by the caller... */ if (cbdataReferenceValid(this)) copyRead(client, ReadClient); } /* Writes data from the server buffer to the client side */ void TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); - tunnelState->writeClientDone(buf, len, flag, xerrno); + tunnelState->writeClientDone(len, flag, xerrno); } void TunnelStateData::Connection::dataSent(size_t amount) { - debugs(26, 3, HERE << "len=" << len << " - amount=" << amount); - assert(amount == (size_t)len); - len =0; - /* increment total object size */ + debugs(26, 3, amount << " bytes written"); + /* increment total object size */ if (size_ptr) *size_ptr += amount; } void -TunnelStateData::writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno) +TunnelStateData::writeClientDone(size_t len, comm_err_t flag, int xerrno) { debugs(26, 3, HERE << client.conn << ", " << len << " bytes written, flag=" << flag); /* Error? */ if (flag != COMM_OK) { if (flag != COMM_ERR_CLOSING) { debugs(26, 4, HERE << "Closing client connection due to comm flags."); client.error(xerrno); // may call comm_close } return; } /* EOF? */ if (len == 0) { debugs(26, 4, HERE << "Closing client connection due to 0 byte read."); client.conn->close(); return; } /* Valid data */ @@ -665,86 +637,86 @@ { TunnelStateData *tunnelState = static_cast(io.data); debugs(26, 3, HERE << io.conn); /* Temporary lock to protect our own feets (comm_close -> tunnelClientClosed -> Free) */ CbcPointer safetyLock(tunnelState); tunnelState->client.closeIfOpen(); tunnelState->server.closeIfOpen(); } void TunnelStateData::Connection::closeIfOpen() { if (Comm::IsConnOpen(conn)) conn->close(); } void TunnelStateData::copyRead(Connection &from, IOCB *completion) { - assert(from.len == 0); AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler", CommIoCbPtrFun(completion, this)); - comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call); + from.buf.reserveSpace(from.bytesWanted(1, SQUID_TCP_SO_RCVBUF)); + comm_read(from.conn, from.buf, call); } void TunnelStateData::readConnectResponse() { assert(waitingForConnectResponse()); AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone", CommIoCbPtrFun(ReadConnectResponseDone, this)); comm_read(server.conn, connectRespBuf->space(), server.bytesWanted(1, connectRespBuf->spaceSize()), call); } /** * Set the HTTP status for this request and sets the read handlers for client * and server side connections. */ static void tunnelStartShoveling(TunnelStateData *tunnelState) { assert(!tunnelState->waitingForConnectExchange()); *tunnelState->status_ptr = Http::scOkay; if (cbdataReferenceValid(tunnelState)) { // Shovel any payload already pushed into reply buffer by the server response - if (!tunnelState->server.len) + if (tunnelState->server.buf.isEmpty()) tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer); else { - debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << Raw("", tunnelState->server.buf, tunnelState->server.len) << "\n----------"); - tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone); + debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << tunnelState->server.buf << "\n----------"); + tunnelState->copy(tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone); } // Bug 3371: shovel any payload already pushed into ConnStateData by the client request if (tunnelState->http.valid() && tunnelState->http->getConn() && !tunnelState->http->getConn()->in.buf.isEmpty()) { struct ConnStateData::In *in = &tunnelState->http->getConn()->in; debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << in->buf << "\n----------"); // We just need to ensure the bytes from ConnStateData are in client.buf already to deliver - memcpy(tunnelState->client.buf, in->buf.rawContent(), in->buf.length()); - // NP: readClient() takes care of buffer length accounting. - tunnelState->readClient(tunnelState->client.buf, in->buf.length(), COMM_OK, 0); + tunnelState->client.buf = in->buf; in->buf.consume(); // ConnStateData buffer accounting after the shuffle. + // NP: readClient() takes care of buffer length accounting. + tunnelState->readClient(tunnelState->client.buf.length(), COMM_OK, 0); } else tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient); } } /** * All the pieces we need to write to client and/or server connection * have been written. * Call the tunnelStartShoveling to start the blind pump. */ static void tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; debugs(26, 3, HERE << conn << ", flag=" << flag); if (flag != COMM_OK) { *tunnelState->status_ptr = Http::scInternalServerError; tunnelErrorComplete(conn->fd, data, 0); return; @@ -838,41 +810,41 @@ Comm::ConnOpener *cs = new Comm::ConnOpener(tunnelState->serverDestinations[0], call, Config.Timeout.connect); cs->setHost(tunnelState->url); AsyncJob::Start(cs); } else { debugs(26, 4, HERE << "terminate with error."); ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw()); *tunnelState->status_ptr = Http::scServiceUnavailable; err->xerrno = xerrno; // on timeout is this still: err->xerrno = ETIMEDOUT; err->port = conn->remote.port(); err->callback = tunnelErrorComplete; err->callback_data = tunnelState; errorSend(tunnelState->client.conn, err); } return; } #if USE_DELAY_POOLS /* no point using the delayIsNoDelay stuff since tunnel is nice and simple */ if (conn->getPeer() && conn->getPeer()->options.no_delay) - tunnelState->server.setDelayId(DelayId()); + tunnelState->server.delayId = DelayId(); #endif tunnelState->request->hier.note(conn, tunnelState->getHost()); tunnelState->server.conn = conn; tunnelState->request->peer_host = conn->getPeer() ? conn->getPeer()->host : NULL; comm_add_close_handler(conn->fd, tunnelServerClosed, tunnelState); debugs(26, 4, HERE << "determine post-connect handling pathway."); if (conn->getPeer()) { tunnelState->request->peer_login = conn->getPeer()->login; tunnelState->request->flags.proxying = !(conn->getPeer()->options.originserver); } else { tunnelState->request->peer_login = NULL; tunnelState->request->flags.proxying = false; } if (tunnelState->request->flags.proxying) tunnelState->connectToPeer(); else { @@ -906,41 +878,41 @@ * default is to allow. */ ACLFilledChecklist ch(Config.accessList.miss, request, NULL); ch.src_addr = request->client_addr; ch.my_addr = request->my_addr; if (ch.fastCheck() == ACCESS_DENIED) { debugs(26, 4, HERE << "MISS access forbidden."); err = new ErrorState(ERR_FORWARDING_DENIED, Http::scForbidden, request); *status_ptr = Http::scForbidden; errorSend(http->getConn()->clientConnection, err); return; } } debugs(26, 3, request->method << ' ' << url << ' ' << request->http_ver); ++statCounter.server.all.requests; ++statCounter.server.other.requests; tunnelState = new TunnelStateData; #if USE_DELAY_POOLS - tunnelState->server.setDelayId(DelayId::DelayClient(http)); + tunnelState->server.delayId = DelayId::DelayClient(http); #endif tunnelState->url = xstrdup(url); tunnelState->request = request; tunnelState->server.size_ptr = size_ptr; tunnelState->status_ptr = status_ptr; tunnelState->client.conn = http->getConn()->clientConnection; tunnelState->http = http; tunnelState->al = al; comm_add_close_handler(tunnelState->client.conn->fd, tunnelClientClosed, tunnelState); AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime, timeoutCall); peerSelect(&(tunnelState->serverDestinations), request, al, NULL, tunnelPeerSelectComplete, @@ -1063,29 +1035,20 @@ delete err; GetMarkingsToServer(tunnelState->request.getRaw(), *tunnelState->serverDestinations[0]); debugs(26, 3, HERE << "paths=" << peer_paths->size() << ", p[0]={" << (*peer_paths)[0] << "}, serverDest[0]={" << tunnelState->serverDestinations[0] << "}"); AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", CommConnectCbPtrFun(tunnelConnectDone, tunnelState)); Comm::ConnOpener *cs = new Comm::ConnOpener(tunnelState->serverDestinations[0], call, Config.Timeout.connect); cs->setHost(tunnelState->url); AsyncJob::Start(cs); } CBDATA_CLASS_INIT(TunnelStateData); bool TunnelStateData::noConnections() const { return !Comm::IsConnOpen(server.conn) && !Comm::IsConnOpen(client.conn); } - -#if USE_DELAY_POOLS -void -TunnelStateData::Connection::setDelayId(DelayId const &newDelay) -{ - delayId = newDelay; -} - -#endif