=== modified file 'src/MemBuf.cc' --- src/MemBuf.cc 2012-11-28 01:13:21 +0000 +++ src/MemBuf.cc 2014-02-08 04:48:33 +0000 @@ -259,7 +259,10 @@ grow(size + sz + 1); assert(size + sz <= capacity); /* paranoid */ - memcpy(space(), newContent, sz); + // memmove() allows memory blocks to overlap, + // but glibc does not check for the dst == src case. + if (space() != newContent) + memmove(space(), newContent, sz); appended(sz); } PROF_stop(MemBuf_append); === modified file 'src/acl/DestinationIp.cc' --- src/acl/DestinationIp.cc 2013-05-13 23:32:23 +0000 +++ src/acl/DestinationIp.cc 2013-10-13 08:28:30 +0000 @@ -57,8 +57,8 @@ // To resolve this we will force DIRECT and only to the original client destination. // In which case, we also need this ACL to accurately match the destination if (Config.onoff.client_dst_passthru && (checklist->request->flags.intercepted || checklist->request->flags.interceptTproxy)) { - assert(checklist->conn() && checklist->conn()->clientConnection != NULL); - return ACLIP::match(checklist->conn()->clientConnection->local); + assert(checklist->conn() && checklist->conn()->tcp != NULL); + return ACLIP::match(checklist->conn()->tcp->local); } if (flags.isSet(ACL_F_NO_LOOKUP)) { === modified file 'src/acl/FilledChecklist.cc' --- src/acl/FilledChecklist.cc 2013-10-25 00:13:46 +0000 +++ src/acl/FilledChecklist.cc 2013-10-29 02:33:43 +0000 @@ -79,13 +79,13 @@ int ACLFilledChecklist::fd() const { - return (conn_ != NULL && conn_->clientConnection != NULL) ? conn_->clientConnection->fd : fd_; + return (conn_ != NULL && conn_->tcp != NULL) ? conn_->tcp->fd : fd_; } void ACLFilledChecklist::fd(int aDescriptor) { - assert(!conn() || conn()->clientConnection == NULL || conn()->clientConnection->fd == aDescriptor); + assert(!conn() || conn()->tcp == NULL || conn()->tcp->fd == aDescriptor); fd_ = aDescriptor; } === added file 'src/anyp/Agent.cc' --- src/anyp/Agent.cc 1970-01-01 00:00:00 +0000 +++ src/anyp/Agent.cc 2014-02-28 21:07:47 +0000 @@ -0,0 +1,284 @@ +/* + * DEBUG: section 05 Transfer Protocol I/O Agents + * + * - level 2 minor socket errors + * - level 3 duplicate reasons for halting I/O (bugs? only need to halt once) + * - level 4 reasons for errors and halting I/O + * - level 5 common I/O and buffer activity + */ +#include "squid.h" +#include "anyp/Agent.h" +#include "comm.h" +#include "CommCalls.h" +#include "comm/Write.h" +#include "Debug.h" +#include "fd.h" +#include "fde.h" +#include "StatCounters.h" +#include "tools.h" + +AnyP::Agent::Agent() : + AsyncJob("AnyP::Agent"), + tcp(), + inBuf(), + stoppedReceiving_(NULL), + stoppedSending_(NULL), + closed_(), + reader_() +{} + +void +AnyP::Agent::connectionInit(const Comm::ConnectionPointer &c) +{ + tcp = c; + + typedef CommCbMemFunT Dialer; + closed_ = JobCallback(33, 5, Dialer, this, AnyP::Agent::handleConnectionClosed); + comm_add_close_handler(tcp->fd, closed_); +} + +bool +AnyP::Agent::doneAll() const +{ + return stoppedSending() && stoppedReceiving() && AsyncJob::doneAll(); +} + +void +AnyP::Agent::swanSong() +{ + if (closed_ != NULL) + closed_->cancel("AnyP::Agent::swanSong"); + + if (Comm::IsConnOpen(tcp)) + tcp->close(); +} + +void +AnyP::Agent::releaseConnection(const char *reason) +{ + // Used by kids to release the connection before + // storing it in a Pconn pool for reuse. + comm_remove_close_handler(tcp->fd, closed_); + closed_->cancel(reason); + if (reading()) { + comm_read_cancel(tcp->fd, reader_); + reader_ = NULL; + } + // XXX: remove half-closed handler ?? +} + +void +AnyP::Agent::stopReadingXXX() +{ + if (reading()) { + comm_read_cancel(tcp->fd, reader_); + reader_ = NULL; + } +} + +void +AnyP::Agent::stopReceiving(const char *error) +{ + debugs(5, 4, "receiving error (" << tcp << "): " << error << + "; old sending error: " << (stoppedSending() ? stoppedSending_ : "none")); + + if (const char *oldError = stoppedReceiving()) { + debugs(5, 3, "already stopped receiving: " << oldError); + return; // nothing has changed as far as this connection is concerned + } + + stoppedReceiving_ = error; + + if (const char *sendError = stoppedSending()) { + debugs(5, 3, "closing because also stopped sending: " << sendError); + closed_->cancel("graceful close"); + tcp->close(); + } +} + +void +AnyP::Agent::stopSending(const char *error) +{ + debugs(5, 4, "sending error (" << tcp << "): " << error << + "; old receiving error: " << + (stoppedReceiving() ? stoppedReceiving_ : "none")); + + if (const char *oldError = stoppedSending()) { + debugs(5, 3, "already stopped sending: " << oldError); + return; // nothing has changed as far as this connection is concerned + } + stoppedSending_ = error; + + if (!stoppedReceiving()) { + if (const int64_t expecting = mayNeedToReadMore()) { + debugs(5, 5, "must still read " << expecting << + " bytes with " << inBuf.contentSize() << " unused"); + return; // wait for the receiver to finish reading + } + } + closed_->cancel("AnyP::Agent::stopSending"); + tcp->close(); +} + +bool +AnyP::Agent::maybeMakeSpaceAvailable() +{ + /* Grow the bufer whenever there is <2 bytes of space available. + * + * why <2? Because delayAwareRead() won't actually read if + * you ask it to read 1 byte. The delayed read(2) request + * just gets re-queued until the client side drains, then + * the I/O thread hangs. + * Better to return false and cause the caller not to register + * any read handler now until we get a notification from someone + * that its okay to read again if the buffer cannot grow. + */ + + if (inBuf.spaceSize() < 2) { + if (!inBuf.hasPotentialSpace()) { + debugs(5, 5, "buffer full: " << inBuf.contentSize() << " of " << (inBuf.max_capacity-1) << " bytes"); + return false; + } + (void)inBuf.space(inBuf.contentSize()*2); + debugs(5, 5, "growing buffer: content-size=" << inBuf.contentSize() << " capacity=" << inBuf.capacity); + } + + // in case the grow operation above failed for any reason. + return (inBuf.spaceSize() > 1); +} + +void +AnyP::Agent::readSomeData() +{ + // one read(2) at a time + if (reading()) + return; + + // do not start read(2) if receiving has been halted + if (stoppedReceiving()) + return; + + // useless to try when there is no buffer space available + if (!maybeMakeSpaceAvailable()) + return; + + debugs(5, 5, tcp << ": reading... buffer space " << inBuf.spaceSize() << " bytes."); + + typedef CommCbMemFunT Dialer; + reader_ = JobCallback(33, 5, Dialer, this, AnyP::Agent::readHandler); + if (!maybeDelayRead(reader_)) + comm_read(tcp, inBuf.space(), inBuf.spaceSize(), reader_); +} + +void +AnyP::Agent::readHandler(const CommIoCbParams &io) +{ + debugs(5, 5, io.conn << " size " << io.size); + Must(reading()); + reader_ = NULL; + + /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ + if (io.flag == COMM_ERR_CLOSING) { + debugs(5, 5, io.conn << " closing Bailout."); + return; + } + + Must(Comm::IsConnOpen(tcp)); + Must(io.conn->fd == tcp->fd); + + /* NOTE: + * Don't reset the read timeout value here. + * The timeout value will be set to a specific config + * value which applies to each message as a whole, not + * individual read(2) calls. + * + * Plus, it breaks our lame half-close monitor detection + */ + + if (io.flag != COMM_OK) { + debugs(5, 2, tcp << ": got flag " << io.flag); + noteTransportReadError(io.xerrno); + io.conn->close(); + return; + } + + if (io.size < 0) { + if (!ignoreErrno(io.xerrno)) { + debugs(5, 2, tcp << " read failure: " << xstrerr(io.xerrno)); + noteTransportReadError(io.xerrno); + io.conn->close(); + return; + } else if (!inBuf.hasContent()) { + debugs(5, 2, tcp << ": no data to process (" << xstrerr(io.xerrno) << ")"); + } + + // grow the buffer if necessary, then read if there is space. + if (!maybeMakeSpaceAvailable()) { + stopReceiving("full read buffer - but processing does not free any space"); + // fall through to setup the half-closed monitoring + } else { + // schedule another read(2) + readSomeData(); + return; // wait for the results of this attempt. + } + + } else if (io.size > 0) { + updateByteCountersOnRead(io.size); + inBuf.append(io.buf, io.size); + + bool mayReadMore = true; + // pass handling on to child instance code + if (inBuf.hasContent()) + mayReadMore = processReadBuffer(inBuf); + // gro the buffer is necessary, after processing as much as possible out already + if (mayReadMore && !maybeMakeSpaceAvailable()) { + stopReceiving("full read buffer - but processing does not free any space"); + mayReadMore = false; + } + // schedule another read() - unless aborted by processing actions + if (mayReadMore) + readSomeData(); + + return; // everything is fine. stop. + + } else if (io.size == 0) { + debugs(5, 5, io.conn << " closed?"); + stopReceiving("zero sized read(2) result"); + // fall through to setup the half-closed monitoring + } + + // Ask the child class if it can stop immediately. + // It may still need to send via the connection, or + // to process any remainders in the buffer. + if (const char *reason = maybeFinishedWithTransport(inBuf)) { + stopSending(reason); // will close connection + return; + } + + // if already stopped sending, the above will close the connection + // avoid setting up monitoring on an already closed FD. + if (stoppedSending()) + return; + + /* It might be half-closed, we can't tell */ + fd_table[io.conn->fd].flags.socket_eof = true; + commMarkHalfClosed(io.conn->fd); + fd_note(io.conn->fd, "half-closed"); +} + +void +AnyP::Agent::sendSomeData(MemBuf &mb, AsyncCall::Pointer &callback) +{ + assert(!stoppedSending()); + + // TODO: allow writing multiple segments by queueing + Comm::Write(tcp, &mb, callback); +} + +/* This is a handler normally called by comm_close() */ +void +AnyP::Agent::handleConnectionClosed(const CommCloseCbParams &io) +{ + stopReceiving("TCP connection closed"); + stopSending("TCP connection closed"); +} === added file 'src/anyp/Agent.h' --- src/anyp/Agent.h 1970-01-01 00:00:00 +0000 +++ src/anyp/Agent.h 2014-02-28 21:06:43 +0000 @@ -0,0 +1,158 @@ +#ifndef SQUID_SRC_ANYP_AGENT_H +#define SQUID_SRC_ANYP_AGENT_H + +#include "base/AsyncCall.h" +#include "base/AsyncJob.h" +#include "comm/Connection.h" +#include "comm_err_t.h" +#include "MemBuf.h" + +class CommIoCbParams; +class CommCloseCbParams; + +namespace AnyP { + +/** + * Common base for classes reading (and writing) from (and to) a + * transport connection. Contains basic connection management code + * and complex algorithms for coordinating I/O activity with message + * receiving and sending states. + */ +class Agent : virtual public AsyncJob +{ +public: + /* API accessing read(2) */ + + /// Attempt to read some data. May do nothing. + /// Will call processReadBuffer() asynchronously when there is data to process. + /// May be called repeatedly without harmful side effects. + void readSomeData(); + + /// note receiving error and close as soon as we have done with writing as well + void stopReceiving(const char *error); + + /// true if we stopped receiving data + const char *stoppedReceiving() const { return stoppedReceiving_; } + + /* API accessing write(2) */ + + /// \see Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback) + void sendSomeData(MemBuf &mb, AsyncCall::Pointer &callback); + + /// note response sending error and close as soon as we read the request + void stopSending(const char *error); + + /// true if we stopped sending the response + const char *stoppedSending() const { return stoppedSending_; } + + /* API hacks that need to be removed by fixing the code that calls them */ + + /** Hack to cancel a read if one is scheduled, without blocking future socket use. + * \note Avoid using this method when possible. If the read(2) is done but + * AsyncCall is still queued the read(2) bytes will be lost permanently. + */ + void stopReadingXXX(); // XXX: export the Transport details to another Agent instead. + + // these following are Transport details used by Agents + // public instead of protected due to wide-ranging layer violations in client_side*.cc (at least) + + /// Pointer to the transport connection socket + Comm::ConnectionPointer tcp; + + /// received [transfer] protocol message bytes + /// kids parse this buffer to extract those messages + MemBuf inBuf; + +protected: + Agent(); + virtual ~Agent() {} + + // AsyncJob API + virtual bool doneAll() const; + virtual void swanSong(); + + /// initialize the connection event handlers + /// close(2) callback etc. + void connectionInit(const Comm::ConnectionPointer &c); + + /// releases connection event handlers without closing it + void releaseConnection(const char *reason); + + /// whether a read(2) operation is currently underway + bool reading() const {return reader_!=NULL;} + + /** Called when sending has stopped to check if more read(2)s may be required. + * + * \retval >0 Number of bytes expected still to arrive. + * \retval -1 More data still expected to arrive, unknown number of bytes at this time. + * \retval 0 No more bytes expected right now. + */ + virtual int64_t mayNeedToReadMore() const = 0; + + /// called when buffer may be used to receive new network data + bool maybeMakeSpaceAvailable(); + + /** + * Called before scheduling a read(2) operation in case the + * child class uses delay_pools to slow read(2) I/O down. + * \return true if this read has been deferred. + */ + // TODO: make the delaying part of Agents task + virtual bool maybeDelayRead(const AsyncCall::Pointer &call) {return false;} + + /** called when there is new buffered data to process. + * + * If the processing requires further read(2) to be halted temporarily it + * may return false. The processor is then responsible for ensuring that + * readSomeData() is called when read(2) calls are to be resumed. + * + * \retval true if additional read(2) should be scheduled by the caller. + * \retval false if read(2) is to be suspended. + */ + virtual bool processReadBuffer(MemBuf &) = 0; + + /** Called when there is an error performing read(2) + * so the child class can perform any cleanup or error handling. + * The connection will be closed immediately after this method + * completes. + */ + virtual void noteTransportReadError(int) {} + + /// Called when there has been a successful read(2). + /// The child class is responsible for data counting. + virtual void updateByteCountersOnRead(size_t) = 0; + + /// callback to handle read(2) input + void readHandler(const CommIoCbParams &io); + + /** + * called when 0-size read(2) occurs to ask the child class + * whether it is able to stop sending yet. + * + * There may also be unhandled data in the buffer passed to + * cleanup or make use of. No futher read(2) will be attempted. + * + * \return a reason for stopping I/O, + * or NULL to continue I/O with client half-closed. + */ + virtual const char * maybeFinishedWithTransport(MemBuf &) = 0; + +private: + void handleConnectionClosed(const CommCloseCbParams &io); + + /// the reason why we no longer read(2) or nil + const char *stoppedReceiving_; + + /// the reason why we no longer write(2) or nil + const char *stoppedSending_; + + /// callback to stop traffic processing when FD closes + AsyncCall::Pointer closed_; + + ///< set when we are reading + AsyncCall::Pointer reader_; +}; + +} // namespace Comm + +#endif /* SQUID_SRC_COMM_Agent_H */ === modified file 'src/anyp/Makefile.am' --- src/anyp/Makefile.am 2014-02-07 13:45:20 +0000 +++ src/anyp/Makefile.am 2014-02-28 13:10:17 +0000 @@ -4,6 +4,8 @@ noinst_LTLIBRARIES = libanyp.la libanyp_la_SOURCES = \ + Agent.cc \ + Agent.h \ forward.h \ PortCfg.cc \ PortCfg.h \ === modified file 'src/auth/UserRequest.cc' --- src/auth/UserRequest.cc 2014-02-20 01:50:39 +0000 +++ src/auth/UserRequest.cc 2014-02-28 12:58:36 +0000 @@ -354,7 +354,7 @@ if (*auth_user_request == NULL) { if (conn != NULL) { - debugs(29, 9, HERE << "This is a new checklist test on:" << conn->clientConnection); + debugs(29, 9, "This is a new checklist test on:" << conn->tcp); } if (proxy_auth && request->auth_user_request == NULL && conn != NULL && conn->getAuth() != NULL) { === modified file 'src/client_side.cc' --- src/client_side.cc 2014-02-21 10:46:19 +0000 +++ src/client_side.cc 2014-02-28 21:08:23 +0000 @@ -73,9 +73,9 @@ * data, or sending it. * \par - * ClientKeepAliveNextRequest will then detect the presence of data in - * the next ClientHttpRequest, and will send it, restablishing the - * data flow. + * ClientSocketContext::keepAliveNextRequest will then detect the presence + * of data in the next ClientHttpRequest, and will send it, restablishing + * the data flow. */ #include "squid.h" @@ -219,7 +219,6 @@ static void clientUpdateSocketStats(LogTags logType, size_t size); char *skipLeadingSpace(char *aString); -static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount); clientStreamNode * ClientSocketContext::getTail() const @@ -236,26 +235,6 @@ return (clientStreamNode *)http->client_stream.tail->prev->data; } -/** - * This routine should be called to grow the inbuf and then - * call comm_read(). - */ -void -ConnStateData::readSomeData() -{ - if (reading()) - return; - - debugs(33, 4, HERE << clientConnection << ": reading request..."); - - if (!maybeMakeSpaceAvailable()) - return; - - typedef CommCbMemFunT Dialer; - reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest); - comm_read(clientConnection, in.addressToReadInto(), getAvailableBufferLength(), reader); -} - void ClientSocketContext::removeFromConnectionList(ConnStateData * conn) { @@ -367,7 +346,7 @@ AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg", CommIoCbPtrFun(&WroteControlMsg, this)); - Comm::Write(clientConnection, mb, call); + http->getConn()->sendSomeData(*mb, call); delete mb; } @@ -406,7 +385,7 @@ clientIdentDone(const char *ident, void *data) { ConnStateData *conn = (ConnStateData *)data; - xstrncpy(conn->clientConnection->rfc931, ident ? ident : dash_str, USER_IDENT_SZ); + xstrncpy(conn->tcp->rfc931, ident ? ident : dash_str, USER_IDENT_SZ); } #endif @@ -632,8 +611,8 @@ if (request) prepareLogWithRequestDetails(request, al); - if (getConn() != NULL && getConn()->clientConnection != NULL && getConn()->clientConnection->rfc931[0]) - al->cache.rfc931 = getConn()->clientConnection->rfc931; + if (getConn() != NULL && getConn()->tcp != NULL && getConn()->tcp->rfc931[0]) + al->cache.rfc931 = getConn()->tcp->rfc931; #if USE_SSL && 0 @@ -683,8 +662,8 @@ if (request) updateCounters(); - if (getConn() != NULL && getConn()->clientConnection != NULL) - clientdbUpdate(getConn()->clientConnection->remote, logType, AnyP::PROTO_HTTP, out.size); + if (getConn() != NULL && getConn()->tcp != NULL) + clientdbUpdate(getConn()->tcp->remote, logType, AnyP::PROTO_HTTP, out.size); } } @@ -740,17 +719,17 @@ /// propagates abort event to all contexts void -ConnStateData::notifyAllContexts(int xerrno) +ConnStateData::noteTransportReadError(int xerrno) { typedef ClientSocketContext::Pointer CSCP; for (CSCP c = getCurrentContext(); c.getRaw(); c = c->next) c->noteIoError(xerrno); } -/* This is a handler normally called by comm_close() */ -void ConnStateData::connStateClosed(const CommCloseCbParams &io) +void +ConnStateData::updateByteCountersOnRead(size_t sz) { - deleteThis("ConnStateData::connStateClosed"); + kb_incr(&(statCounter.client_http.kbytes_in), sz); } #if USE_AUTH @@ -759,7 +738,7 @@ { if (auth_ == NULL) { if (aur != NULL) { - debugs(33, 2, "Adding connection-auth to " << clientConnection << " from " << by); + debugs(33, 2, "Adding connection-auth to " << tcp << " from " << by); auth_ = aur; } return; @@ -768,7 +747,7 @@ // clobered with self-pointer // NP: something nasty is going on in Squid, but harmless. if (aur == auth_) { - debugs(33, 2, "WARNING: Ignoring duplicate connection-auth for " << clientConnection << " from " << by); + debugs(33, 2, "WARNING: Ignoring duplicate connection-auth for " << tcp << " from " << by); return; } @@ -809,7 +788,7 @@ // clobbered with nul-pointer if (aur == NULL) { - debugs(33, 2, "WARNING: Graceful closure on " << clientConnection << " due to connection-auth erase from " << by); + debugs(33, 2, "WARNING: Graceful closure on " << tcp << " due to connection-auth erase from " << by); auth_->releaseAuthServer(); auth_ = NULL; // XXX: need to test whether the connection re-auth challenge is sent. If not, how to trigger it from here. @@ -821,12 +800,12 @@ // clobbered with alternative credentials if (aur != auth_) { - debugs(33, 2, "ERROR: Closing " << clientConnection << " due to change of connection-auth from " << by); + debugs(33, 2, "ERROR: Closing " << tcp << " due to change of connection-auth from " << by); auth_->releaseAuthServer(); auth_ = NULL; // this is a fatal type of problem. // Close the connection immediately with TCP RST to abort all traffic flow - comm_reset_close(clientConnection); + comm_reset_close(tcp); return; } @@ -838,19 +817,17 @@ void ConnStateData::swanSong() { - debugs(33, 2, HERE << clientConnection); + debugs(33, 2, tcp); flags.readMore = false; - clientdbEstablished(clientConnection->remote, -1); /* decrement */ + clientdbEstablished(tcp->remote, -1); /* decrement */ assert(areAllContextsForThisConnection()); freeAllContexts(); unpinConnection(); - - if (Comm::IsConnOpen(clientConnection)) - clientConnection->close(); + AnyP::Agent::swanSong(); #if USE_AUTH - // NP: do this bit after closing the connections to avoid side effects from unwanted TCP RST + // NP: do this bit after AnyP::Agent::swanSong (connection cleanup) to avoid side effects from unwanted TCP RST setAuth(NULL, "ConnStateData::SwanSong cleanup"); #endif @@ -862,20 +839,20 @@ ConnStateData::isOpen() const { return cbdataReferenceValid(this) && // XXX: checking "this" in a method - Comm::IsConnOpen(clientConnection) && - !fd_table[clientConnection->fd].closing(); + Comm::IsConnOpen(tcp) && + !fd_table[tcp->fd].closing(); } ConnStateData::~ConnStateData() { assert(this != NULL); - debugs(33, 3, HERE << clientConnection); + debugs(33, 3, tcp); if (isOpen()) - debugs(33, DBG_IMPORTANT, "BUG: ConnStateData did not close " << clientConnection); + debugs(33, DBG_IMPORTANT, "BUG: ConnStateData did not close " << tcp); if (!flags.swanSang) - debugs(33, DBG_IMPORTANT, "BUG: ConnStateData was not destroyed properly; " << clientConnection); + debugs(33, DBG_IMPORTANT, "BUG: ConnStateData was not destroyed properly; " << tcp); cbdataReferenceDone(port); @@ -940,7 +917,7 @@ bool connIsUsable(ConnStateData * conn) { - if (conn == NULL || !cbdataReferenceValid(conn) || !Comm::IsConnOpen(conn->clientConnection)) + if (conn == NULL || !cbdataReferenceValid(conn) || !Comm::IsConnOpen(conn->tcp)) return false; return true; @@ -1060,7 +1037,7 @@ /* write */ AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", CommIoCbPtrFun(clientWriteComplete, this)); - Comm::Write(clientConnection, &mb, call); + http->getConn()->sendSomeData(mb, call); } else writeComplete(clientConnection, NULL, 0, COMM_OK); } @@ -1462,7 +1439,7 @@ debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete"); AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", CommIoCbPtrFun(clientWriteComplete, this)); - Comm::Write(clientConnection, mb, call); + http->getConn()->sendSomeData(*mb, call); delete mb; } @@ -1559,16 +1536,16 @@ void ConnStateData::readNextRequest() { - debugs(33, 5, HERE << clientConnection << " reading next req"); + debugs(33, 5, tcp << " reading next req"); - fd_note(clientConnection->fd, "Idle client: Waiting for next request"); + fd_note(tcp->fd, "Idle client: Waiting for next request"); /** - * Set the timeout BEFORE calling clientReadRequest(). + * Set the timeout BEFORE calling readSomeData(). */ typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(33, 5, TimeoutDialer, this, ConnStateData::requestTimeout); - commSetConnTimeout(clientConnection, Config.Timeout.clientIdlePconn, timeoutCall); + commSetConnTimeout(tcp, Config.Timeout.clientIdlePconn, timeoutCall); readSomeData(); /** Please don't do anything with the FD past here! */ @@ -1577,7 +1554,7 @@ static void ClientSocketContextPushDeferredIfNeeded(ClientSocketContext::Pointer deferredRequest, ConnStateData * conn) { - debugs(33, 2, HERE << conn->clientConnection << " Sending next"); + debugs(33, 2, conn->tcp << " Sending next"); /** If the client stream is waiting on a socket write to occur, then */ @@ -1602,15 +1579,18 @@ { ConnStateData * conn = http->getConn(); - debugs(33, 3, HERE << "ConnnStateData(" << conn->clientConnection << "), Context(" << clientConnection << ")"); + debugs(33, 3, "ConnStateData(" << conn->tcp << "), Context(" << clientConnection << ")"); connIsFinished(); if (conn->pinning.pinned && !Comm::IsConnOpen(conn->pinning.serverConnection)) { - debugs(33, 2, HERE << conn->clientConnection << " Connection was pinned but server side gone. Terminating client connection"); - conn->clientConnection->close(); + debugs(33, 2, conn->tcp << " Connection was pinned but server disconnected. Terminating client connection traffic"); + conn->stopReceiving("Connection was pinned but server disconnected"); + conn->stopSending("Connection was pinned but server disconnected"); return; } +#if 0 // keep sending responses until existing pipeline finished. + /** \par * We are done with the response, and we are either still receiving request * body (early response!) or have already stopped receiving anything. @@ -1625,11 +1605,14 @@ * getting stuck and to prevent accidental request smuggling. */ + // XXX: what if we stopped receiving after pipelined 10 requests and have 6 reply still to send ?? + // XXX: if the 10th request has Connection:close indicating no more to read(2) if (const char *reason = conn->stoppedReceiving()) { - debugs(33, 3, HERE << "closing for earlier request error: " << reason); - conn->clientConnection->close(); + debugs(33, 3, "closing for earlier request error: " << reason); + conn->tcp->close(); return; } +#endif /** \par * Attempt to parse a request from the request buffer. @@ -1640,9 +1623,8 @@ * This needs to fall through - if we're unlucky and parse the _last_ request * from our read buffer we may never re-register for another client read. */ - - if (conn->clientParseRequests()) { - debugs(33, 3, HERE << conn->clientConnection << ": parsed next request from buffer"); + if (!conn->stoppedReceiving() && conn->clientParseRequests()) { + debugs(33, 3, conn->tcp << ": parsed next request from buffer"); } /** \par @@ -1652,9 +1634,11 @@ * half-closed _AND_ then, sometimes, spending "Timeout" time in * the keepalive "Waiting for next request" state. */ - if (commIsHalfClosed(conn->clientConnection->fd) && (conn->getConcurrentRequestCount() == 0)) { - debugs(33, 3, "ClientSocketContext::keepaliveNextRequest: half-closed client with no pending requests, closing"); - conn->clientConnection->close(); + if (commIsHalfClosed(conn->tcp->fd) && (conn->getConcurrentRequestCount() == 0)) { + debugs(33, 3, "half-closed client with no pending requests, closing"); + conn->stopReceiving("half-closed client with no pending requests"); + conn->stopSending("half-closed client with no pending requests"); + conn->inBuf.reset(); // drop any buffer contents. nothing we can do with them now. return; } @@ -1669,14 +1653,16 @@ */ if ((deferredRequest = conn->getCurrentContext()).getRaw()) { - debugs(33, 3, HERE << conn->clientConnection << ": calling PushDeferredIfNeeded"); + debugs(33, 3, conn->tcp << ": calling PushDeferredIfNeeded"); ClientSocketContextPushDeferredIfNeeded(deferredRequest, conn); } else if (conn->flags.readMore) { - debugs(33, 3, HERE << conn->clientConnection << ": calling conn->readNextRequest()"); + debugs(33, 3, conn->tcp << ": calling conn->readNextRequest()"); conn->readNextRequest(); } else { // XXX: Can this happen? CONNECT tunnels have deferredRequest set. - debugs(33, DBG_IMPORTANT, HERE << "abandoning " << conn->clientConnection); + // The answer is yes. Probably because getRaw()==NULL when the Pointer is invalidated, + // for example when the tunnel context is completed and in the process of closing. + debugs(33, DBG_IMPORTANT, HERE << "abandoning " << conn->tcp); } } @@ -1869,30 +1855,6 @@ } void -ConnStateData::stopSending(const char *error) -{ - debugs(33, 4, HERE << "sending error (" << clientConnection << "): " << error << - "; old receiving error: " << - (stoppedReceiving() ? stoppedReceiving_ : "none")); - - if (const char *oldError = stoppedSending()) { - debugs(33, 3, HERE << "already stopped sending: " << oldError); - return; // nothing has changed as far as this connection is concerned - } - stoppedSending_ = error; - - if (!stoppedReceiving()) { - if (const int64_t expecting = mayNeedToReadMoreBody()) { - debugs(33, 5, HERE << "must still read " << expecting << - " request body bytes with " << in.notYetUsed << " unused"); - return; // wait for the request receiver to finish reading - } - } - - clientConnection->close(); -} - -void ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag) { const StoreEntry *entry = http->storeEntry(); @@ -1951,10 +1913,10 @@ ClientSocketContext *context; StoreIOBuffer tempBuffer; http = new ClientHttpRequest(csd); - http->req_sz = csd->in.notYetUsed; + http->req_sz = csd->inBuf.contentSize(); http->uri = xstrdup(uri); setLogUri (http, uri); - context = new ClientSocketContext(csd->clientConnection, http); + context = new ClientSocketContext(csd->tcp, http); tempBuffer.data = context->reqbuf; tempBuffer.length = HTTP_REQBUF_SZ; clientStreamInit(&http->client_stream, clientGetMoreData, clientReplyDetach, @@ -2091,7 +2053,7 @@ } if (vport < 0) - vport = http->getConn()->clientConnection->local.port(); + vport = http->getConn()->tcp->local.port(); const bool switchedToHttps = conn->switchedToHttps(); const bool tryHostHeader = vhost || switchedToHttps; @@ -2135,7 +2097,7 @@ /* Put the local socket IP address as the hostname, with whatever vport we found */ int url_sz = strlen(url) + 32 + Config.appendDomainLen; http->uri = (char *)xcalloc(url_sz, 1); - http->getConn()->clientConnection->local.toHostStr(ipbuf,MAX_IPSTRLEN); + http->getConn()->tcp->local.toHostStr(ipbuf,MAX_IPSTRLEN); snprintf(http->uri, url_sz, "%s://%s:%d%s", AnyP::UriScheme(conn->port->transport.protocol).c_str(), ipbuf, vport, url); @@ -2164,10 +2126,10 @@ /* Put the local socket IP address as the hostname. */ int url_sz = strlen(url) + 32 + Config.appendDomainLen; http->uri = (char *)xcalloc(url_sz, 1); - http->getConn()->clientConnection->local.toHostStr(ipbuf,MAX_IPSTRLEN); + http->getConn()->tcp->local.toHostStr(ipbuf,MAX_IPSTRLEN); snprintf(http->uri, url_sz, "%s://%s:%d%s", AnyP::UriScheme(http->getConn()->port->transport.protocol).c_str(), - ipbuf, http->getConn()->clientConnection->local.port(), url); + ipbuf, http->getConn()->tcp->local.port(), url); debugs(33, 5, "TRANSPARENT REWRITE: '" << http->uri << "'"); } } @@ -2295,7 +2257,7 @@ http = new ClientHttpRequest(csd); http->req_sz = HttpParserRequestLen(hp); - result = new ClientSocketContext(csd->clientConnection, http); + result = new ClientSocketContext(csd->tcp, http); tempBuffer.data = result->reqbuf; tempBuffer.length = HTTP_REQBUF_SZ; @@ -2366,7 +2328,7 @@ debugs(33, 5, "parseHttpRequest: Complete request received"); // XXX: crop this dump at the end of headers. No need for extras - debugs(11, 2, "HTTP Client " << csd->clientConnection); + debugs(11, 2, "HTTP Client " << csd->tcp); debugs(11, 2, "HTTP Client REQUEST:\n---------\n" << (hp->buf) + hp->req.m_start << "\n----------"); result->flags.parsed_ok = 1; @@ -2374,34 +2336,6 @@ return result; } -int -ConnStateData::getAvailableBufferLength() const -{ - assert (in.allocatedSize > in.notYetUsed); // allocated more than used - const size_t result = in.allocatedSize - in.notYetUsed - 1; - // huge request_header_max_size may lead to more than INT_MAX unused space - assert (static_cast(result) <= INT_MAX); - return result; -} - -bool -ConnStateData::maybeMakeSpaceAvailable() -{ - if (getAvailableBufferLength() < 2) { - size_t newSize; - if (in.allocatedSize >= Config.maxRequestBufferSize) { - debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize); - return false; - } - if ((newSize=in.allocatedSize * 2) > Config.maxRequestBufferSize) { - newSize=Config.maxRequestBufferSize; - } - in.buf = (char *)memReallocBuf(in.buf, newSize, &in.allocatedSize); - debugs(33, 2, "growing request buffer: notYetUsed=" << in.notYetUsed << " size=" << in.allocatedSize); - } - return true; -} - void ConnStateData::addContextToQueue(ClientSocketContext * context) { @@ -2425,68 +2359,39 @@ return result; } -int -ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno) -{ - if (flag != COMM_OK) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": got flag " << flag); - return 1; - } - - if (size < 0) { - if (!ignoreErrno(xerrno)) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno)); - return 1; - } else if (in.notYetUsed == 0) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")"); - } - } - - return 0; -} - -int -ConnStateData::connFinishedWithConn(int size) -{ - if (size == 0) { - if (getConcurrentRequestCount() == 0 && in.notYetUsed == 0) { - /* no current or pending requests */ - debugs(33, 4, HERE << clientConnection << " closed"); - return 1; - } else if (!Config.onoff.half_closed_clients) { - /* admin doesn't want to support half-closed client sockets */ - debugs(33, 3, HERE << clientConnection << " aborted (half_closed_clients disabled)"); - notifyAllContexts(0); // no specific error implies abort - return 1; - } - } - - return 0; -} - -void -connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount) -{ - assert(byteCount > 0 && byteCount <= conn->in.notYetUsed); - conn->in.notYetUsed -= byteCount; - debugs(33, 5, HERE << "conn->in.notYetUsed = " << conn->in.notYetUsed); - /* - * If there is still data that will be used, - * move it to the beginning. - */ - - if (conn->in.notYetUsed > 0) - memmove(conn->in.buf, conn->in.buf + byteCount, conn->in.notYetUsed); +const char * +ConnStateData::maybeFinishedWithTransport(MemBuf &aBuf) +{ + if (getConcurrentRequestCount() == 0 && !aBuf.hasContent()) { + /* no current or pending requests */ + debugs(33, 4, tcp << " closed"); + return "done"; + } else if (!Config.onoff.half_closed_clients) { + /* admin doesn't want to support half-closed client sockets */ + debugs(33, 3, tcp << " aborted (half_closed_clients disabled)"); + noteTransportReadError(0); // no specific error implies abort all pending requests + aBuf.clean(); // drop any unhandled buffer contents as well + return "half_closed_clients disabled"; + } else if (getConcurrentRequestCount() == 0) + aBuf.clean(); // drop any unhandled buffer contents as well + + // XXX: we may have bytes in the buffer still. But due to parser leaving the + // active request(s) in buffer until any potential body is processed we cannot + // clear the buffer while getConcurrentRequestCount() > 0. + // So for now it is done in keepaliveNextRequest() instead, + // after checking half-closed and request-count status + + return NULL; } /// respond with ERR_TOO_BIG if request header exceeds request_header_max_size void ConnStateData::checkHeaderLimits() { - if (in.notYetUsed < Config.maxRequestHeaderSize) + if (inBuf.contentSize() < static_cast(Config.maxRequestHeaderSize)) return; // can accumulte more header data - debugs(33, 3, "Request header is too large (" << in.notYetUsed << " > " << + debugs(33, 3, "Request header is too large (" << inBuf.contentSize() << " > " << Config.maxRequestHeaderSize << " bytes)"); ClientSocketContext *context = parseHttpRequestAbort(this, "error:request-too-large"); @@ -2495,26 +2400,12 @@ assert (repContext); repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, Http::METHOD_NONE, NULL, - clientConnection->remote, NULL, NULL, NULL); + tcp->remote, NULL, NULL, NULL); context->registerWithConn(); context->pullData(); } void -ConnStateData::clientAfterReadingRequests() -{ - // Were we expecting to read more request body from half-closed connection? - if (mayNeedToReadMoreBody() && commIsHalfClosed(clientConnection->fd)) { - debugs(33, 3, HERE << "truncated body: closing half-closed " << clientConnection); - clientConnection->close(); - return; - } - - if (flags.readMore) - readSomeData(); -} - -void ConnStateData::quitAfterError(HttpRequest *request) { // From HTTP p.o.v., we do not have to close after every error detected @@ -2523,7 +2414,7 @@ if (request) request->flags.proxyKeepalive = false; flags.readMore = false; - debugs(33,4, HERE << "Will close after error: " << clientConnection); + debugs(33,4, "Will close after error: " << tcp); } #if USE_SSL @@ -2590,7 +2481,7 @@ // Create an error object and fill it ErrorState *err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request); - err->src_addr = clientConnection->remote; + err->src_addr = tcp->remote; Ssl::ErrorDetail *errDetail = new Ssl::ErrorDetail( SQUID_X509_V_ERR_DOMAIN_MISMATCH, srvCert, NULL); @@ -2617,7 +2508,7 @@ { ClientHttpRequest *http = context->http; HttpRequest::Pointer request; - bool notedUseOfBuffer = false; + bool reqConsumedFromBuffer = false; bool chunked = false; bool mustReplyToOptions = false; bool unsupportedTe = false; @@ -2637,15 +2528,15 @@ assert (repContext); switch (hp->request_parse_status) { case Http::scHeaderTooLarge: - repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf, NULL); + repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, method, http->uri, conn->tcp->remote, NULL, conn->inBuf.content(), NULL); break; case Http::scMethodNotAllowed: repContext->setReplyToError(ERR_UNSUP_REQ, Http::scMethodNotAllowed, method, http->uri, - conn->clientConnection->remote, NULL, conn->in.buf, NULL); + conn->tcp->remote, NULL, conn->inBuf.content(), NULL); break; default: repContext->setReplyToError(ERR_INVALID_REQ, hp->request_parse_status, method, http->uri, - conn->clientConnection->remote, NULL, conn->in.buf, NULL); + conn->tcp->remote, NULL, conn->inBuf.content(), NULL); } assert(context->http->out.offset == 0); context->pullData(); @@ -2660,7 +2551,7 @@ setLogUri(http, http->uri, true); clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); - repContext->setReplyToError(ERR_INVALID_URL, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, NULL, NULL); + repContext->setReplyToError(ERR_INVALID_URL, Http::scBadRequest, method, http->uri, conn->tcp->remote, NULL, NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; @@ -2679,7 +2570,7 @@ clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); repContext->setReplyToError(ERR_UNSUP_HTTPVERSION, Http::scHttpVersionNotSupported, method, http->uri, - conn->clientConnection->remote, NULL, HttpParserHdrBuf(hp), NULL); + conn->tcp->remote, NULL, HttpParserHdrBuf(hp), NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; @@ -2696,7 +2587,7 @@ setLogUri(http, http->uri, true); clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); - repContext->setReplyToError(ERR_INVALID_REQ, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, NULL, NULL); + repContext->setReplyToError(ERR_INVALID_REQ, Http::scBadRequest, method, http->uri, conn->tcp->remote, NULL, NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; @@ -2753,13 +2644,13 @@ request->flags.internal = http->flags.internal; setLogUri (http, urlCanonicalClean(request.getRaw())); - request->client_addr = conn->clientConnection->remote; // XXX: remove reuest->client_addr member. + request->client_addr = conn->tcp->remote; // XXX: remove reuest->client_addr member. #if FOLLOW_X_FORWARDED_FOR // indirect client gets stored here because it is an HTTP header result (from X-Forwarded-For:) // not a details about teh TCP connection itself - request->indirect_client_addr = conn->clientConnection->remote; + request->indirect_client_addr = conn->tcp->remote; #endif /* FOLLOW_X_FORWARDED_FOR */ - request->my_addr = conn->clientConnection->local; + request->my_addr = conn->tcp->local; request->myportname = conn->port->name; request->http_ver = http_ver; @@ -2783,7 +2674,7 @@ clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); repContext->setReplyToError(ERR_UNSUP_REQ, Http::scNotImplemented, request->method, NULL, - conn->clientConnection->remote, request.getRaw(), NULL, NULL); + conn->tcp->remote, request.getRaw(), NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; @@ -2796,7 +2687,7 @@ conn->quitAfterError(request.getRaw()); repContext->setReplyToError(ERR_INVALID_REQ, Http::scLengthRequired, request->method, NULL, - conn->clientConnection->remote, request.getRaw(), NULL, NULL); + conn->tcp->remote, request.getRaw(), NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; @@ -2811,7 +2702,7 @@ assert (repContext); conn->quitAfterError(request.getRaw()); repContext->setReplyToError(ERR_INVALID_REQ, Http::scExpectationFailed, request->method, http->uri, - conn->clientConnection->remote, request.getRaw(), NULL, NULL); + conn->tcp->remote, request.getRaw(), NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; @@ -2840,8 +2731,8 @@ chunked ? -1 : request->content_length); // consume header early so that body pipe gets just the body - connNoteUseOfBuffer(conn, http->req_sz); - notedUseOfBuffer = true; + conn->inBuf.consume(http->req_sz); + reqConsumedFromBuffer = true; /* Is it too large? */ if (!chunked && // if chunked, we will check as we accumulate @@ -2852,7 +2743,7 @@ conn->quitAfterError(request.getRaw()); repContext->setReplyToError(ERR_TOO_BIG, Http::scRequestEntityTooLarge, Http::METHOD_NONE, NULL, - conn->clientConnection->remote, http->request, NULL, NULL); + conn->tcp->remote, http->request, NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; @@ -2860,7 +2751,7 @@ // We may stop producing, comm_close, and/or call setReplyToError() // below, so quit on errors to avoid http->doCallouts() - if (!conn->handleRequestBodyData()) + if (!conn->processRequestBodyData(conn->inBuf)) goto finish; if (!request->body_pipe->productionEnded()) { @@ -2875,29 +2766,13 @@ http->doCallouts(); finish: - if (!notedUseOfBuffer) - connNoteUseOfBuffer(conn, http->req_sz); + if (!reqConsumedFromBuffer) + conn->inBuf.consume(http->req_sz); - /* - * DPW 2007-05-18 - * Moved the TCP_RESET feature from clientReplyContext::sendMoreData - * to here because calling comm_reset_close() causes http to - * be freed and the above connNoteUseOfBuffer() would hit an - * assertion, not to mention that we were accessing freed memory. - */ - if (request != NULL && request->flags.resetTcp && Comm::IsConnOpen(conn->clientConnection)) { - debugs(33, 3, HERE << "Sending TCP RST on " << conn->clientConnection); + if (request != NULL && request->flags.resetTcp && Comm::IsConnOpen(conn->tcp)) { + debugs(33, 3, "Sending TCP RST on " << conn->tcp); conn->flags.readMore = false; - comm_reset_close(conn->clientConnection); - } -} - -static void -connStripBufferWhitespace (ConnStateData * conn) -{ - while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) { - memmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1); - -- conn->in.notYetUsed; + comm_reset_close(conn->tcp); // may free 'http' } } @@ -2917,8 +2792,8 @@ // when queue filled already we cant add more. if (existingRequestCount >= concurrentRequestLimit) { - debugs(33, 3, clientConnection << " max concurrent requests reached (" << concurrentRequestLimit << ")"); - debugs(33, 5, clientConnection << " deferring new request until one is done"); + debugs(33, 3, tcp << " max concurrent requests reached (" << concurrentRequestLimit << ")"); + debugs(33, 5, tcp << " deferring new request until one is done"); return true; } @@ -2936,28 +2811,24 @@ HttpRequestMethod method; bool parsed_req = false; - debugs(33, 5, HERE << clientConnection << ": attempting to parse"); + debugs(33, 5, tcp << ": attempting to parse ..."); // Loop while we have read bytes that are not needed for producing the body // On errors, bodyPipe may become nil, but readMore will be cleared - while (in.notYetUsed > 0 && !bodyPipe && flags.readMore) { - connStripBufferWhitespace(this); + while (inBuf.hasContent() && !bodyPipe && flags.readMore) { + inBuf.consumeWhitespacePrefix(); /* Don't try to parse if the buffer is empty */ - if (in.notYetUsed == 0) + if (!inBuf.hasContent()) break; /* Limit the number of concurrent requests */ if (concurrentRequestQueueFilled()) break; - /* Should not be needed anymore */ - /* Terminate the string */ - in.buf[in.notYetUsed] = '\0'; - /* Begin the parsing */ PROF_start(parseHttpRequest); - HttpParserInit(&parser_, in.buf, in.notYetUsed); + HttpParserInit(&parser_, inBuf.content(), inBuf.contentSize()); // XXX: pass MemBuf & /* Process request */ Http::ProtocolVersion http_ver; @@ -2974,10 +2845,10 @@ /* status -1 or 1 */ if (context) { - debugs(33, 5, HERE << clientConnection << ": parsed a request"); + debugs(33, 5, tcp << ": parsed a request"); AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "clientLifetimeTimeout", CommTimeoutCbPtrFun(clientLifetimeTimeout, context->http)); - commSetConnTimeout(clientConnection, Config.Timeout.lifetime, timeoutCall); + commSetConnTimeout(tcp, Config.Timeout.lifetime, timeoutCall); clientProcessRequest(this, &parser_, context, method, http_ver); @@ -2994,141 +2865,68 @@ return parsed_req; } -void -ConnStateData::clientReadRequest(const CommIoCbParams &io) +/** + * called when new request message data has been buffered in inBuf + * may close the connection if we were closing and piped everything out + * + * \return false when read() needs to be abandoned + */ +bool +ConnStateData::processRequestMessageData(MemBuf &aBuf) { - debugs(33,5,HERE << io.conn << " size " << io.size); - Must(reading()); - reader = NULL; - - /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ - - if (io.flag == COMM_ERR_CLOSING) { - debugs(33,5, HERE << io.conn << " closing Bailout."); - return; - } - - assert(Comm::IsConnOpen(clientConnection)); - assert(io.conn->fd == clientConnection->fd); - - /* - * Don't reset the timeout value here. The timeout value will be - * set to Config.Timeout.request by httpAccept() and - * clientWriteComplete(), and should apply to the request as a - * whole, not individual read() calls. Plus, it breaks our - * lame half-close detection - */ - if (connReadWasError(io.flag, io.size, io.xerrno)) { - notifyAllContexts(io.xerrno); - io.conn->close(); - return; - } - - if (io.flag == COMM_OK) { - if (io.size > 0) { - kb_incr(&(statCounter.client_http.kbytes_in), io.size); - - // may comm_close or setReplyToError - if (!handleReadData(io.buf, io.size)) - return; - - } else if (io.size == 0) { - debugs(33, 5, HERE << io.conn << " closed?"); - - if (connFinishedWithConn(io.size)) { - clientConnection->close(); - return; - } - - /* It might be half-closed, we can't tell */ - fd_table[io.conn->fd].flags.socket_eof = true; - - commMarkHalfClosed(io.conn->fd); - - fd_note(io.conn->fd, "half-closed"); - - /* There is one more close check at the end, to detect aborted - * (partial) requests. At this point we can't tell if the request - * is partial. - */ - /* Continue to process previously read data */ - } - } - /* Process next request */ - if (getConcurrentRequestCount() == 0) - fd_note(io.fd, "Reading next request"); + fd_note(tcp->fd, "Reading next request"); if (!clientParseRequests()) { - if (!isOpen()) - return; - /* - * If the client here is half closed and we failed - * to parse a request, close the connection. - * The above check with connFinishedWithConn() only - * succeeds _if_ the buffer is empty which it won't - * be if we have an incomplete request. - * XXX: This duplicates ClientSocketContext::keepaliveNextRequest - */ - if (getConcurrentRequestCount() == 0 && commIsHalfClosed(io.fd)) { - debugs(33, 5, HERE << io.conn << ": half-closed connection, no completed request parsed, connection closing."); - clientConnection->close(); - return; + if (stoppedReceiving() && getConcurrentRequestCount() == 0) { + // all outstanding requests done. no more requests able to come in. + debugs(33, 5, tcp << ": half-closed connection, no completed request parsed, connection closing."); + stopSending("no more requests to happen. Abandon incomplete request."); + return false; } } + // XXX: should not be needed anymore if (!isOpen()) - return; + return false; - clientAfterReadingRequests(); + // when readMore is false we abort reading, even if the socket is good + return flags.readMore; } -/** - * called when new request data has been read from the socket - * - * \retval false called comm_close or setReplyToError (the caller should bail) - * \retval true we did not call comm_close or setReplyToError - */ bool -ConnStateData::handleReadData(char *buf, size_t size) +ConnStateData::processReadBuffer(MemBuf &aBuf) { - char *current_buf = in.addressToReadInto(); - - if (buf != current_buf) - memmove(current_buf, buf, size); - - in.notYetUsed += size; - - in.buf[in.notYetUsed] = '\0'; /* Terminate the string */ - // if we are reading a body, stuff data into the body pipe if (bodyPipe != NULL) - return handleRequestBodyData(); - return true; + return processRequestBodyData(aBuf); + + // if we are expecting a message frame, try to parse + return processRequestMessageData(aBuf); } /** - * called when new request body data has been buffered in in.buf + * called when new request body data has been buffered in inBuf * may close the connection if we were closing and piped everything out * * \retval false called comm_close or setReplyToError (the caller should bail) * \retval true we did not call comm_close or setReplyToError */ bool -ConnStateData::handleRequestBodyData() +ConnStateData::processRequestBodyData(MemBuf &aBuf) { assert(bodyPipe != NULL); size_t putSize = 0; - if (in.bodyParser) { // chunked encoding + if (bodyParser_) { // chunked encoding if (const err_type error = handleChunkedRequestBody(putSize)) { abortChunkedRequestBody(error); return false; } } else { // identity encoding - debugs(33,5, HERE << "handling plain request body for " << clientConnection); - putSize = bodyPipe->putMoreData(in.buf, in.notYetUsed); + debugs(33,5, "handling plain request body for " << tcp); + putSize = bodyPipe->putMoreData(aBuf.content(), aBuf.contentSize()); if (!bodyPipe->mayNeedMoreData()) { // BodyPipe will clear us automagically when we produced everything bodyPipe = NULL; @@ -3136,17 +2934,17 @@ } if (putSize > 0) - connNoteUseOfBuffer(this, putSize); + aBuf.consume(putSize); if (!bodyPipe) { - debugs(33,5, HERE << "produced entire request body for " << clientConnection); + debugs(33,5, "produced entire request body for " << tcp); if (const char *reason = stoppedSending()) { /* we've finished reading like good clients, * now do the close that initiateClose initiated. */ debugs(33, 3, HERE << "closing for earlier sending error: " << reason); - clientConnection->close(); + tcp->close(); return false; } } @@ -3158,23 +2956,18 @@ err_type ConnStateData::handleChunkedRequestBody(size_t &putSize) { - debugs(33,7, HERE << "chunked from " << clientConnection << ": " << in.notYetUsed); + debugs(33,7, "chunked from " << tcp << ": " << inBuf.contentSize()); try { // the parser will throw on errors - if (!in.notYetUsed) // nothing to do (MemBuf::init requires this check) + if (!inBuf.hasContent()) return ERR_NONE; - MemBuf raw; // ChunkedCodingParser only works with MemBufs - // add one because MemBuf will assert if it cannot 0-terminate - raw.init(in.notYetUsed, in.notYetUsed+1); - raw.append(in.buf, in.notYetUsed); - - const mb_size_t wasContentSize = raw.contentSize(); + const mb_size_t wasContentSize = inBuf.contentSize(); BodyPipeCheckout bpc(*bodyPipe); - const bool parsed = in.bodyParser->parse(&raw, &bpc.buf); + const bool parsed = bodyParser_->parse(&inBuf, &bpc.buf); bpc.checkIn(); - putSize = wasContentSize - raw.contentSize(); + putSize = wasContentSize - inBuf.contentSize(); // dechunk then check: the size limit applies to _dechunked_ content if (clientIsRequestBodyTooLargeForPolicy(bodyPipe->producedSize())) @@ -3187,10 +2980,10 @@ } // if chunk parser needs data, then the body pipe must need it too - Must(!in.bodyParser->needsMoreData() || bodyPipe->mayNeedMoreData()); + Must(!bodyParser_->needsMoreData() || bodyPipe->mayNeedMoreData()); // if parser needs more space and we can consume nothing, we will stall - Must(!in.bodyParser->needsMoreSpace() || bodyPipe->buf().hasContent()); + Must(!bodyParser_->needsMoreSpace() || bodyPipe->buf().hasContent()); } catch (...) { // TODO: be more specific debugs(33, 3, HERE << "malformed chunks" << bodyPipe->status()); return ERR_INVALID_REQ; @@ -3222,15 +3015,15 @@ repContext->http->uri, CachePeer, repContext->http->request, - in.buf, NULL); + inBuf.content(), NULL); context->pullData(); } else { // close or otherwise we may get stuck as nobody will notice the error? - comm_reset_close(clientConnection); + comm_reset_close(tcp); } #else debugs(33, 3, HERE << "aborting chunked request without error " << error); - comm_reset_close(clientConnection); + comm_reset_close(tcp); #endif flags.readMore = false; } @@ -3238,14 +3031,8 @@ void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer ) { - if (!handleRequestBodyData()) - return; - - // too late to read more body - if (!isOpen() || stoppedReceiving()) - return; - - readSomeData(); + if (processReadBuffer(inBuf)) + readSomeData(); } void @@ -3287,13 +3074,13 @@ ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) : AsyncJob("ConnStateData"), + AnyP::Agent(), #if USE_SSL sslBumpMode(Ssl::bumpEnd), switchedToHttps_(false), sslServerBump(NULL), #endif - stoppedSending_(NULL), - stoppedReceiving_(NULL) + bodyParser_(NULL) { pinning.host = NULL; pinning.port = -1; @@ -3303,19 +3090,20 @@ pinning.peer = NULL; // store the details required for creating more MasterXaction objects as new requests come in - clientConnection = xact->tcpClient; port = cbdataReference(xact->squidPort.get()); log_addr = xact->tcpClient->remote; log_addr.applyMask(Config.Addrs.client_netmask); - in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &in.allocatedSize); + connectionInit(xact->tcpClient); + inBuf.init(CLIENT_REQ_BUF_SZ, Config.maxRequestBufferSize); + debugs(33, 8, "inBuf space=" << inBuf.spaceSize()); if (port->disable_pmtu_discovery != DISABLE_PMTU_OFF && (transparent() || port->disable_pmtu_discovery == DISABLE_PMTU_ALWAYS)) { #if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DONT) int i = IP_PMTUDISC_DONT; - if (setsockopt(clientConnection->fd, SOL_IP, IP_MTU_DISCOVER, &i, sizeof(i)) < 0) - debugs(33, 2, "WARNING: Path MTU discovery disabling failed on " << clientConnection << " : " << xstrerror()); + if (setsockopt(tcp->fd, SOL_IP, IP_MTU_DISCOVER, &i, sizeof(i)) < 0) + debugs(33, 2, "WARNING: Path MTU discovery disabling failed on " << tcp << " : " << xstrerror()); #else static bool reported = false; @@ -3326,12 +3114,8 @@ #endif } - typedef CommCbMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, ConnStateData::connStateClosed); - comm_add_close_handler(clientConnection->fd, call); - if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(clientConnection->remote, FQDN_LOOKUP_IF_MISS); + fqdncache_gethostbyaddr(tcp->remote, FQDN_LOOKUP_IF_MISS); #if USE_IDENT if (Ident::TheConfig.identLookup) { @@ -3343,7 +3127,7 @@ } #endif - clientdbEstablished(clientConnection->remote, 1); + clientdbEstablished(tcp->remote, 1); flags.readMore = true; } @@ -3584,7 +3368,7 @@ { SSL *ssl = NULL; assert(connState); - const Comm::ConnectionPointer &details = connState->clientConnection; + const Comm::ConnectionPointer &details = connState->tcp; if (sslContext && !(ssl = httpsCreate(details, sslContext))) return; @@ -3603,13 +3387,13 @@ fakeRequest->SetHost(details->local.toStr(buf, sizeof(buf))); fakeRequest->port = details->local.port(); fakeRequest->clientConnectionManager = connState; - fakeRequest->client_addr = connState->clientConnection->remote; + fakeRequest->client_addr = connState->tcp->remote; #if FOLLOW_X_FORWARDED_FOR - fakeRequest->indirect_client_addr = connState->clientConnection->remote; + fakeRequest->indirect_client_addr = connState->tcp->remote; #endif - fakeRequest->my_addr = connState->clientConnection->local; - fakeRequest->flags.interceptTproxy = ((connState->clientConnection->flags & COMM_TRANSPARENT) != 0 ) ; - fakeRequest->flags.intercepted = ((connState->clientConnection->flags & COMM_INTERCEPTION) != 0); + fakeRequest->my_addr = connState->tcp->local; + fakeRequest->flags.interceptTproxy = ((connState->tcp->flags & COMM_TRANSPARENT) != 0 ) ; + fakeRequest->flags.intercepted = ((connState->tcp->flags & COMM_INTERCEPTION) != 0); fakeRequest->myportname = connState->port->name; if (fakeRequest->flags.interceptTproxy) { if (Config.accessList.spoof_client_ip) { @@ -3624,6 +3408,24 @@ } } +bool +ConnStateData::injectPrefixBytesXXX(const MemBuf &pfx) +{ + if (inBuf.hasContent()) { + const mb_size_t sz = max(pfx.contentSize()+inBuf.contentSize(), inBuf.capacity); + static MemBuf tmp; + tmp.init(sz, sz); + tmp.append(pfx.content(), pfx.contentSize()); + tmp.append(inBuf.content(), inBuf.contentSize()); + inBuf.reset(); + inBuf.append(tmp.content(), tmp.contentSize()); + } + else + inBuf.append(pfx.content(), pfx.contentSize()); + + return true; +} + /** * A callback function to use with the ACLFilledChecklist callback. * In the case of ACCESS_ALLOWED answer initializes a bumped SSL connection, @@ -3641,25 +3443,24 @@ // Require both a match and a positive bump mode to work around exceptional // cases where ACL code may return ACCESS_ALLOWED with zero answer.kind. if (answer == ACCESS_ALLOWED && answer.kind != Ssl::bumpNone) { - debugs(33, 2, HERE << "sslBump needed for " << connState->clientConnection); + debugs(33, 2, "sslBump needed for " << connState->tcp); connState->sslBumpMode = static_cast(answer.kind); httpsEstablish(connState, NULL, (Ssl::BumpMode)answer.kind); } else { - debugs(33, 2, HERE << "sslBump not needed for " << connState->clientConnection); + debugs(33, 2, "sslBump not needed for " << connState->tcp); connState->sslBumpMode = Ssl::bumpNone; // fake a CONNECT request to force connState to tunnel + // XXX: should be doing explicit state setup instead of parsing + // XXX: this abuse is also forcing ConnStateData::processReadBuffer() to be public. static char ip[MAX_IPSTRLEN]; - static char reqStr[MAX_IPSTRLEN + 80]; - connState->clientConnection->local.toUrl(ip, sizeof(ip)); - snprintf(reqStr, sizeof(reqStr), "CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", ip, ip); - bool ret = connState->handleReadData(reqStr, strlen(reqStr)); - if (ret) - ret = connState->clientParseRequests(); - - if (!ret) { - debugs(33, 2, HERE << "Failed to start fake CONNECT request for ssl bumped connection: " << connState->clientConnection); - connState->clientConnection->close(); + connState->tcp->local.toUrl(ip, sizeof(ip)); + MemBuf fake; + fake.Printf("CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", ip, ip); + if (!connState->injectPrefixBytesXXX(fake) || !connState->processReadBuffer(connState->inBuf)) { + debugs(33, 2, "Failed to start fake CONNECT request for ssl bumped connection: " << connState->tcp); + connState->stopReceiving("fake CONNECT request for ssl bump failed"); + connState->stopSending("fake CONNECT request for ssl bump failed"); } } } @@ -3779,7 +3580,7 @@ certProperties.mimicCert.resetAndLock(mimicCert); ACLFilledChecklist checklist(NULL, sslServerBump->request.getRaw(), - clientConnection != NULL ? clientConnection->rfc931 : dash_str); + tcp != NULL ? tcp->rfc931 : dash_str); checklist.sslErrors = cbdataReference(sslServerBump->sslErrors); for (sslproxy_cert_adapt *ca = Config.ssl_client.cert_adapt; ca != NULL; ca = ca->next) { @@ -3924,7 +3725,7 @@ if (sslContext) { if (!ssl_ctx_cache || !ssl_ctx_cache->add(sslBumpCertKey.termedBuf(), new Ssl::SSL_CTX_Pointer(sslContext))) { // If it is not in storage delete after using. Else storage deleted it. - fd_table[clientConnection->fd].dynamicSslContext = sslContext; + fd_table[tcp->fd].dynamicSslContext = sslContext; } } else { debugs(33, 2, HERE << "Failed to generate SSL cert for " << sslConnectHostOrIp); @@ -3934,8 +3735,8 @@ // If generated ssl context = NULL, try to use static ssl context. if (!sslContext) { if (!port->staticSslContext) { - debugs(83, DBG_IMPORTANT, "Closing SSL " << clientConnection->remote << " as lacking SSL context"); - clientConnection->close(); + debugs(83, DBG_IMPORTANT, "Closing SSL " << tcp->remote << " as lacking SSL context"); + tcp->close(); return; } else { debugs(33, 5, HERE << "Using static ssl context."); @@ -3943,14 +3744,14 @@ } } - if (!httpsCreate(clientConnection, sslContext)) + if (!httpsCreate(tcp, sslContext)) return; // commSetConnTimeout() was called for this request before we switched. // Disable the client read handler until CachePeer selection is complete - Comm::SetSelect(clientConnection->fd, COMM_SELECT_READ, NULL, NULL, 0); - Comm::SetSelect(clientConnection->fd, COMM_SELECT_READ, clientNegotiateSSL, this, 0); + Comm::SetSelect(tcp->fd, COMM_SELECT_READ, NULL, NULL, 0); + Comm::SetSelect(tcp->fd, COMM_SELECT_READ, clientNegotiateSSL, this, 0); switchedToHttps_ = true; } @@ -3964,7 +3765,7 @@ // We are going to read new request flags.readMore = true; - debugs(33, 5, HERE << "converting " << clientConnection << " to SSL"); + debugs(33, 5, "converting " << tcp << " to SSL"); // If sslServerBump is set, then we have decided to deny CONNECT // and now want to switch to SSL to send the error to the client @@ -3974,7 +3775,7 @@ sslServerBump = new Ssl::ServerBump(request); // will call httpsPeeked() with certificate and connection, eventually - FwdState::fwdStart(clientConnection, sslServerBump->entry, sslServerBump->request.getRaw()); + FwdState::fwdStart(tcp, sslServerBump->entry, sslServerBump->request.getRaw()); return; } @@ -4291,7 +4092,7 @@ { ConnStateData * conn = http->getConn(); ACLFilledChecklist *ch = new ACLFilledChecklist(acl, http->request, - cbdataReferenceValid(conn) && conn != NULL && conn->clientConnection != NULL ? conn->clientConnection->rfc931 : dash_str); + cbdataReferenceValid(conn) && conn != NULL && conn->tcp != NULL ? conn->tcp->rfc931 : dash_str); ch->al = http->al; /* * hack for ident ACL. It needs to get full addresses, and a place to store @@ -4306,22 +4107,7 @@ bool ConnStateData::transparent() const { - return clientConnection != NULL && (clientConnection->flags & (COMM_TRANSPARENT|COMM_INTERCEPTION)); -} - -bool -ConnStateData::reading() const -{ - return reader != NULL; -} - -void -ConnStateData::stopReading() -{ - if (reading()) { - comm_read_cancel(clientConnection->fd, reader); - reader = NULL; - } + return tcp != NULL && (tcp->flags & (COMM_TRANSPARENT|COMM_INTERCEPTION)); } BodyPipe::Pointer @@ -4336,7 +4122,7 @@ } int64_t -ConnStateData::mayNeedToReadMoreBody() const +ConnStateData::mayNeedToReadMore() const { if (!bodyPipe) return 0; // request without a body or read/produced all body bytes @@ -4345,7 +4131,7 @@ return -1; // probably need to read more, but we cannot be sure const int64_t needToProduce = bodyPipe->unproducedSize(); - const int64_t haveAvailable = static_cast(in.notYetUsed); + const int64_t haveAvailable = static_cast(inBuf.contentSize()); if (needToProduce <= haveAvailable) return 0; // we have read what we need (but are waiting for pipe space) @@ -4354,26 +4140,6 @@ } void -ConnStateData::stopReceiving(const char *error) -{ - debugs(33, 4, HERE << "receiving error (" << clientConnection << "): " << error << - "; old sending error: " << - (stoppedSending() ? stoppedSending_ : "none")); - - if (const char *oldError = stoppedReceiving()) { - debugs(33, 3, HERE << "already stopped receiving: " << oldError); - return; // nothing has changed as far as this connection is concerned - } - - stoppedReceiving_ = error; - - if (const char *sendError = stoppedSending()) { - debugs(33, 3, HERE << "closing because also stopped sending: " << sendError); - clientConnection->close(); - } -} - -void ConnStateData::expectNoForwarding() { if (bodyPipe != NULL) { @@ -4388,8 +4154,8 @@ { Must(bodyPipe != NULL); debugs(33, 5, HERE << "start dechunking" << bodyPipe->status()); - assert(!in.bodyParser); - in.bodyParser = new ChunkedCodingParser; + assert(!bodyParser_); + bodyParser_ = new ChunkedCodingParser; } /// put parsed content into input buffer and clean up @@ -4411,25 +4177,8 @@ } } - delete in.bodyParser; - in.bodyParser = NULL; -} - -char * -ConnStateData::In::addressToReadInto() const -{ - return buf + notYetUsed; -} - -ConnStateData::In::In() : bodyParser(NULL), - buf (NULL), notYetUsed (0), allocatedSize (0) -{} - -ConnStateData::In::~In() -{ - if (allocatedSize) - memFreeBuf(allocatedSize, buf); - delete bodyParser; // TODO: pool + delete bodyParser_; + bodyParser_ = NULL; } void @@ -4447,7 +4196,7 @@ } debugs(33, 3, HERE << " closing due to missing context for 1xx"); - clientConnection->close(); + tcp->close(); } /// Our close handler called by Comm when the pinned connection is closed @@ -4460,9 +4209,9 @@ pinning.closeHandler = NULL; // Comm unregisters handlers before calling const bool sawZeroReply = pinning.zeroReply; // reset when unpinning unpinConnection(); - if (sawZeroReply && clientConnection != NULL) { + if (sawZeroReply && tcp != NULL) { debugs(33, 3, "Closing client connection on pinned zero reply."); - clientConnection->close(); + tcp->close(); } } @@ -4500,8 +4249,8 @@ char stmp[MAX_IPSTRLEN]; snprintf(desc, FD_DESC_SZ, "%s pinned connection for %s (%d)", (auth || !aPeer) ? pinnedHost : aPeer->name, - clientConnection->remote.toUrl(stmp,MAX_IPSTRLEN), - clientConnection->fd); + tcp->remote.toUrl(stmp,MAX_IPSTRLEN), + tcp->fd); fd_note(pinning.serverConnection->fd, desc); typedef CommCbMemFunT Dialer; @@ -4561,8 +4310,8 @@ // If we are still sending data to the client, do not close now. When we are done sending, // ClientSocketContext::keepaliveNextRequest() checks pinning.serverConnection and will close. // However, if we are idle, then we must close to inform the idle client and minimize races. - if (clientIsIdle && clientConnection != NULL) - clientConnection->close(); + if (clientIsIdle && tcp != NULL) + tcp->close(); } const Comm::ConnectionPointer === modified file 'src/client_side.h' --- src/client_side.h 2014-01-05 19:49:23 +0000 +++ src/client_side.h 2014-02-28 22:27:31 +0000 @@ -33,6 +33,7 @@ #ifndef SQUID_CLIENTSIDE_H #define SQUID_CLIENTSIDE_H +#include "anyp/Agent.h" #include "comm.h" #include "HttpControlMsg.h" #include "HttpParser.h" @@ -181,22 +182,18 @@ * * If the above can be confirmed accurate we can call this object PipelineManager or similar */ -class ConnStateData : public BodyProducer, public HttpControlMsgSink +class ConnStateData : public AnyP::Agent, public BodyProducer, public HttpControlMsgSink { public: explicit ConnStateData(const MasterXaction::Pointer &xact); ~ConnStateData(); - void readSomeData(); - int getAvailableBufferLength() const; bool areAllContextsForThisConnection() const; void freeAllContexts(); - void notifyAllContexts(const int xerrno); ///< tell everybody about the err /// Traffic parsing bool clientParseRequests(); void readNextRequest(); - bool maybeMakeSpaceAvailable(); ClientSocketContext::Pointer getCurrentContext() const; void addContextToQueue(ClientSocketContext * context); int getConcurrentRequestCount() const; @@ -206,27 +203,14 @@ // HttpControlMsgSink API virtual void sendControlMsg(HttpControlMsg msg); - // Client TCP connection details from comm layer. - Comm::ConnectionPointer clientConnection; - - struct In { - In(); - ~In(); - char *addressToReadInto() const; - - ChunkedCodingParser *bodyParser; ///< parses chunked request body - char *buf; - size_t notYetUsed; - size_t allocatedSize; - } in; - - /** number of body bytes we need to comm_read for the "current" request + /** Number of body bytes we need to comm_read for the "current" request. + * Request messages can be aborted. Only incomplete body matter here. * * \retval 0 We do not need to read any [more] body bytes * \retval negative May need more but do not know how many; could be zero! * \retval positive Need to read exactly that many more body bytes */ - int64_t mayNeedToReadMoreBody() const; + virtual int64_t mayNeedToReadMore() const; #if USE_AUTH /** @@ -275,17 +259,6 @@ AnyP::PortCfg *port; bool transparent() const; - bool reading() const; - void stopReading(); ///< cancels comm_read if it is scheduled - - /// true if we stopped receiving the request - const char *stoppedReceiving() const { return stoppedReceiving_; } - /// true if we stopped sending the response - const char *stoppedSending() const { return stoppedSending_; } - /// note request receiving error and close as soon as we write the response - void stopReceiving(const char *error); - /// note response sending error and close as soon as we read the request - void stopSending(const char *error); void expectNoForwarding(); ///< cleans up virgin request [body] forwarding state @@ -293,8 +266,7 @@ virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); virtual void noteBodyConsumerAborted(BodyPipe::Pointer); - bool handleReadData(char *buf, size_t size); - bool handleRequestBodyData(); + bool processRequestBodyData(MemBuf &aBuf); /** * Correlate the current ConnStateData object with the pinning_fd socket descriptor. @@ -322,12 +294,10 @@ void clientPinnedConnectionClosed(const CommCloseCbParams &io); // comm callbacks - void clientReadRequest(const CommIoCbParams &io); - void connStateClosed(const CommCloseCbParams &io); void requestTimeout(const CommTimeoutCbParams ¶ms); // AsyncJob API - virtual bool doneAll() const { return BodyProducer::doneAll() && false;} + virtual bool doneAll() const {return BodyProducer::doneAll() && AnyP::Agent::doneAll() && !inBuf.hasContent();} virtual void swanSong(); /// Changes state so that we close the connection and quit after serving @@ -387,10 +357,18 @@ void startPinnedConnectionMonitoring(); void clientPinnedConnectionRead(const CommIoCbParams &io); +public: + // AnyP::Agent API + // XXX: cannot be private due to httpsSslBumpAccessCheckDone() fake request hack. + virtual bool processReadBuffer(MemBuf &aBuf); + // Inject some bytes to prefix the existing buffer contents (if any). + bool injectPrefixBytesXXX(const MemBuf &pfx); private: - int connReadWasError(comm_err_t flag, int size, int xerrno); - int connFinishedWithConn(int size); - void clientAfterReadingRequests(); + virtual void updateByteCountersOnRead(size_t sz); + virtual void noteTransportReadError(int xerrno); + virtual const char * maybeFinishedWithTransport(MemBuf &aBuf); + + bool processRequestMessageData(MemBuf &aBuf); bool concurrentRequestQueueFilled() const; #if USE_AUTH @@ -414,14 +392,10 @@ Ssl::CertSignAlgorithm signAlgorithm; ///< The signing algorithm to use #endif - /// the reason why we no longer write the response or nil - const char *stoppedSending_; - /// the reason why we no longer read the request or nil - const char *stoppedReceiving_; - - AsyncCall::Pointer reader; ///< set when we are reading BodyPipe::Pointer bodyPipe; // set when we are reading request body + ChunkedCodingParser *bodyParser_; ///< parses chunked request body + CBDATA_CLASS2(ConnStateData); }; === modified file 'src/client_side_reply.cc' --- src/client_side_reply.cc 2014-01-01 19:20:49 +0000 +++ src/client_side_reply.cc 2014-01-19 05:41:22 +0000 @@ -290,7 +290,7 @@ * A refcounted pointer so that FwdState stays around as long as * this clientReplyContext does */ - Comm::ConnectionPointer conn = http->getConn() != NULL ? http->getConn()->clientConnection : NULL; + Comm::ConnectionPointer conn = http->getConn() != NULL ? http->getConn()->tcp : NULL; FwdState::Start(conn, http->storeEntry(), http->request, http->al); /* Register with storage manager to receive updates when data comes in. */ @@ -655,7 +655,7 @@ /// Deny loops if (r->flags.loopDetected) { http->al->http.code = Http::scForbidden; - err = clientBuildError(ERR_ACCESS_DENIED, Http::scForbidden, NULL, http->getConn()->clientConnection->remote, http->request); + err = clientBuildError(ERR_ACCESS_DENIED, Http::scForbidden, NULL, http->getConn()->tcp->remote, http->request); createStoreEntry(r->method, RequestFlags()); errorAppendEntry(http->storeEntry(), err); triggerInitialStoreRead(); @@ -682,7 +682,7 @@ assert(r->clientConnectionManager == http->getConn()); /** Start forwarding to get the new object from network */ - Comm::ConnectionPointer conn = http->getConn() != NULL ? http->getConn()->clientConnection : NULL; + Comm::ConnectionPointer conn = http->getConn() != NULL ? http->getConn()->tcp : NULL; FwdState::Start(conn, http->storeEntry(), r, http->al); } } @@ -700,7 +700,7 @@ RequestMethodStr(http->request->method) << " " << http->uri << "'"); http->al->http.code = Http::scGateway_Timeout; ErrorState *err = clientBuildError(ERR_ONLY_IF_CACHED_MISS, Http::scGateway_Timeout, NULL, - http->getConn()->clientConnection->remote, http->request); + http->getConn()->tcp->remote, http->request); removeClientStoreReference(&sc, http); startError(err); } @@ -891,7 +891,7 @@ if (EBIT_TEST(entry->flags, ENTRY_SPECIAL)) { http->logType = LOG_TCP_DENIED; ErrorState *err = clientBuildError(ERR_ACCESS_DENIED, Http::scForbidden, NULL, - http->getConn()->clientConnection->remote, http->request); + http->getConn()->tcp->remote, http->request); startError(err); return; // XXX: leaking unused entry if some store does not keep it } @@ -928,7 +928,7 @@ if (!Config2.onoff.enable_purge) { http->logType = LOG_TCP_DENIED; - ErrorState *err = clientBuildError(ERR_ACCESS_DENIED, Http::scForbidden, NULL, http->getConn()->clientConnection->remote, http->request); + ErrorState *err = clientBuildError(ERR_ACCESS_DENIED, Http::scForbidden, NULL, http->getConn()->tcp->remote, http->request); startError(err); return; } @@ -1773,11 +1773,11 @@ assert(http->out.offset == 0); if (Ip::Qos::TheConfig.isHitTosActive()) { - Ip::Qos::doTosLocalHit(http->getConn()->clientConnection); + Ip::Qos::doTosLocalHit(http->getConn()->tcp); } if (Ip::Qos::TheConfig.isHitNfmarkActive()) { - Ip::Qos::doNfmarkLocalHit(http->getConn()->clientConnection); + Ip::Qos::doNfmarkLocalHit(http->getConn()->tcp); } localTempBuffer.offset = reqofs; @@ -1878,7 +1878,7 @@ tmp_noaddr.setNoAddr(); // TODO: make a global const http->logType = LOG_TCP_DENIED_REPLY; ErrorState *err = clientBuildError(ERR_TOO_BIG, Http::scForbidden, NULL, - http->getConn() != NULL ? http->getConn()->clientConnection->remote : tmp_noaddr, + http->getConn() != NULL ? http->getConn()->tcp->remote : tmp_noaddr, http->request); removeClientStoreReference(&(sc), http); HTTPMSGUNLOCK(reply); @@ -1893,7 +1893,7 @@ http->logType = LOG_TCP_HIT; ErrorState *const err = clientBuildError(ERR_PRECONDITION_FAILED, Http::scPreconditionFailed, - NULL, http->getConn()->clientConnection->remote, http->request); + NULL, http->getConn()->tcp->remote, http->request); removeClientStoreReference(&sc, http); HTTPMSGUNLOCK(reply); startError(err); @@ -2000,7 +2000,7 @@ Ip::Address tmp_noaddr; tmp_noaddr.setNoAddr(); err = clientBuildError(page_id, Http::scForbidden, NULL, - http->getConn() != NULL ? http->getConn()->clientConnection->remote : tmp_noaddr, + http->getConn() != NULL ? http->getConn()->tcp->remote : tmp_noaddr, http->request); removeClientStoreReference(&sc, http); @@ -2096,11 +2096,11 @@ return; } if (!conn->isOpen()) { - debugs(33,3, "not sending more data to closing connection " << conn->clientConnection); + debugs(33,3, "not sending more data to closing connection " << conn->tcp); return; } if (conn->pinning.zeroReply) { - debugs(33,3, "not sending more data after a pinned zero reply " << conn->clientConnection); + debugs(33,3, "not sending more data after a pinned zero reply " << conn->tcp); return; } @@ -2112,12 +2112,12 @@ memcpy(buf, result.data, result.length); } - if (reqofs==0 && !logTypeIsATcpHit(http->logType) && Comm::IsConnOpen(conn->clientConnection)) { + if (reqofs==0 && !logTypeIsATcpHit(http->logType) && Comm::IsConnOpen(conn->tcp)) { if (Ip::Qos::TheConfig.isHitTosActive()) { - Ip::Qos::doTosLocalMiss(conn->clientConnection, http->request->hier.code); + Ip::Qos::doTosLocalMiss(conn->tcp, http->request->hier.code); } if (Ip::Qos::TheConfig.isHitNfmarkActive()) { - Ip::Qos::doNfmarkLocalMiss(conn->clientConnection, http->request->hier.code); + Ip::Qos::doNfmarkLocalMiss(conn->tcp, http->request->hier.code); } } @@ -2140,7 +2140,7 @@ reqofs << " bytes (" << result.length << " new bytes)"); debugs(88, 5, "clientReplyContext::sendMoreData:" - << conn->clientConnection << + << conn->tcp << " '" << entry->url() << "'" << " out.offset=" << http->out.offset); === modified file 'src/client_side_request.cc' --- src/client_side_request.cc 2014-02-13 06:09:26 +0000 +++ src/client_side_request.cc 2014-02-28 20:12:50 +0000 @@ -162,14 +162,15 @@ { setConn(aConn); al = new AccessLogEntry; + al->tcpClient = clientConnection = aConn->tcp; + al->cache.start_time = current_time; - al->tcpClient = clientConnection = aConn->clientConnection; al->cache.port = cbdataReference(aConn->port); al->cache.caddr = aConn->log_addr; #if USE_SSL - if (aConn->clientConnection != NULL && aConn->clientConnection->isOpen()) { - if (SSL *ssl = fd_table[aConn->clientConnection->fd].ssl) + if (aConn->tcp != NULL && aConn->tcp->isOpen()) { + if (SSL *ssl = fd_table[aConn->tcp->fd].ssl) al->cache.sslClientCert.reset(SSL_get_peer_certificate(ssl)); } #endif @@ -534,7 +535,7 @@ void ClientRequestContext::hostHeaderIpVerify(const ipcache_addrs* ia, const DnsLookupDetails &dns) { - Comm::ConnectionPointer clientConn = http->getConn()->clientConnection; + Comm::ConnectionPointer clientConn = http->getConn()->tcp; // note the DNS details for the transaction stats. http->request->recordLookup(dns); @@ -561,7 +562,7 @@ // IP address validation for Host: failed. Admin wants to ignore them. // NP: we do not yet handle CONNECT tunnels well, so ignore for them if (!Config.onoff.hostStrictVerify && http->request->method != Http::METHOD_CONNECT) { - debugs(85, 3, "SECURITY ALERT: Host header forgery detected on " << http->getConn()->clientConnection << + debugs(85, 3, "SECURITY ALERT: Host header forgery detected on " << http->getConn()->tcp << " (" << A << " does not match " << B << ") on URL: " << urlCanonical(http->request)); // NP: it is tempting to use 'flags.noCache' but that is all about READing cache data. @@ -575,7 +576,7 @@ } debugs(85, DBG_IMPORTANT, "SECURITY ALERT: Host header forgery detected on " << - http->getConn()->clientConnection << " (" << A << " does not match " << B << ")"); + http->getConn()->tcp << " (" << A << " does not match " << B << ")"); debugs(85, DBG_IMPORTANT, "SECURITY ALERT: By user agent: " << http->request->header.getStr(HDR_USER_AGENT)); debugs(85, DBG_IMPORTANT, "SECURITY ALERT: on URL: " << urlCanonical(http->request)); @@ -585,7 +586,7 @@ assert (repContext); repContext->setReplyToError(ERR_CONFLICT_HOST, Http::scConflict, http->request->method, NULL, - http->getConn()->clientConnection->remote, + http->getConn()->tcp->remote, http->request, NULL, #if USE_AUTH @@ -653,8 +654,8 @@ debugs(85, 3, HERE << "validate host=" << host << ", port=" << port << ", portStr=" << (portStr?portStr:"NULL")); if (http->request->flags.intercepted || http->request->flags.interceptTproxy) { // verify the Host: port (if any) matches the apparent destination - if (portStr && port != http->getConn()->clientConnection->local.port()) { - debugs(85, 3, HERE << "FAIL on validate port " << http->getConn()->clientConnection->local.port() << + if (portStr && port != http->getConn()->tcp->local.port()) { + debugs(85, 3, "FAIL on validate port " << http->getConn()->tcp->local.port() << " matches Host: port " << port << " (" << portStr << ")"); hostHeaderVerifyFailed("intercepted port", portStr); } else { @@ -823,7 +824,7 @@ tmpnoaddr.setNoAddr(); error = clientBuildError(page_id, status, NULL, - http->getConn() != NULL ? http->getConn()->clientConnection->remote : tmpnoaddr, + http->getConn() != NULL ? http->getConn()->tcp->remote : tmpnoaddr, http->request ); @@ -853,11 +854,11 @@ #if ICAP_CLIENT Adaptation::Icap::History::Pointer ih = request->icapHistory(); if (ih != NULL) { - if (getConn() != NULL && getConn()->clientConnection != NULL) { - ih->rfc931 = getConn()->clientConnection->rfc931; + if (getConn() != NULL && getConn()->tcp != NULL) { + ih->rfc931 = getConn()->tcp->rfc931; #if USE_SSL - if (getConn()->clientConnection->isOpen()) { - ih->ssluser = sslGetUserEmail(fd_table[getConn()->clientConnection->fd].ssl); + if (getConn()->tcp->isOpen()) { + ih->ssluser = sslGetUserEmail(fd_table[getConn()->tcp->fd].ssl); } #endif } @@ -1341,8 +1342,8 @@ /* FIXME PIPELINE: This is innacurate during pipelining */ - if (http->getConn() != NULL && Comm::IsConnOpen(http->getConn()->clientConnection)) - fd_note(http->getConn()->clientConnection->fd, http->uri); + if (http->getConn() != NULL && Comm::IsConnOpen(http->getConn()->tcp)) + fd_note(http->getConn()->tcp->fd, http->uri); assert(http->uri); @@ -1527,7 +1528,10 @@ } #endif logType = LOG_TCP_MISS; - getConn()->stopReading(); // tunnels read for themselves + /* NP: stopReadingXXX() is a hack needed to allow TunnelStateData + * to take control of a socket despite any scheduled read(2) from ConnStateData. + */ + getConn()->stopReadingXXX(); // tunnels read(2) for themselves tunnelStart(this, &out.size, &al->http.code, al); return; } @@ -1579,7 +1583,7 @@ if (errflag) { debugs(85, 3, HERE << "CONNECT response failure in SslBump: " << errflag); - getConn()->clientConnection->close(); + getConn()->tcp->close(); return; } @@ -1601,15 +1605,20 @@ ClientHttpRequest::sslBumpStart() { debugs(85, 5, HERE << "Confirming " << Ssl::bumpMode(sslBumpNeed_) << - "-bumped CONNECT tunnel on FD " << getConn()->clientConnection); + "-bumped CONNECT tunnel on FD " << getConn()->tcp); getConn()->sslBumpMode = sslBumpNeed_; // send an HTTP 200 response to kick client SSL negotiation // TODO: Unify with tunnel.cc and add a Server(?) header - static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n"; + static const char *fakeRequest = "HTTP/1.1 200 Connection established\r\n\r\n"; + static MemBuf conn_established; + if (!conn_established.hasContent()) { + conn_established.init(); + conn_established.append(fakeRequest, strlen(fakeRequest)); + } AsyncCall::Pointer call = commCbCall(85, 5, "ClientSocketContext::sslBumpEstablish", CommIoCbPtrFun(&SslBumpEstablish, this)); - Comm::Write(getConn()->clientConnection, conn_established, strlen(conn_established), call, NULL); + getConn()->sendSomeData(conn_established, call); } #endif @@ -1768,25 +1777,25 @@ if (!calloutContext->tosToClientDone) { calloutContext->tosToClientDone = true; - if (getConn() != NULL && Comm::IsConnOpen(getConn()->clientConnection)) { + if (getConn() != NULL && Comm::IsConnOpen(getConn()->tcp)) { ACLFilledChecklist ch(NULL, request, NULL); ch.src_addr = request->client_addr; ch.my_addr = request->my_addr; tos_t tos = aclMapTOS(Ip::Qos::TheConfig.tosToClient, &ch); if (tos) - Ip::Qos::setSockTos(getConn()->clientConnection, tos); + Ip::Qos::setSockTos(getConn()->tcp, tos); } } if (!calloutContext->nfmarkToClientDone) { calloutContext->nfmarkToClientDone = true; - if (getConn() != NULL && Comm::IsConnOpen(getConn()->clientConnection)) { + if (getConn() != NULL && Comm::IsConnOpen(getConn()->tcp)) { ACLFilledChecklist ch(NULL, request, NULL); ch.src_addr = request->client_addr; ch.my_addr = request->my_addr; nfmark_t mark = aclMapNfmark(Ip::Qos::TheConfig.nfmarkToClient, &ch); if (mark) - Ip::Qos::setSockNfmark(getConn()->clientConnection, mark); + Ip::Qos::setSockNfmark(getConn()->tcp, mark); } } @@ -2033,7 +2042,7 @@ debugs(85,3, HERE << "REQMOD body production failed"); if (request_satisfaction_mode) { // too late to recover or serve an error request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_CLT_REQMOD_RESP_BODY); - const Comm::ConnectionPointer c = getConn()->clientConnection; + const Comm::ConnectionPointer c = getConn()->tcp; Must(Comm::IsConnOpen(c)); c->close(); // drastic, but we may be writing a response already } else { @@ -2072,7 +2081,7 @@ ConnStateData * c = getConn(); calloutContext->error = clientBuildError(ERR_ICAP_FAILURE, Http::scInternalServerError, NULL, - c != NULL ? c->clientConnection->remote : noAddr, + c != NULL ? c->tcp->remote : noAddr, request ); #if USE_AUTH === modified file 'src/esi/Esi.cc' --- src/esi/Esi.cc 2013-10-25 00:13:46 +0000 +++ src/esi/Esi.cc 2013-10-29 02:33:43 +0000 @@ -1452,8 +1452,8 @@ /* don't honour range requests - for errors we send it all */ flags.error = 1; /* create an error object */ - // XXX: with the in-direction on remote IP. does the http->getConn()->clientConnection exist? - ErrorState * err = clientBuildError(errorpage, errorstatus, NULL, http->getConn()->clientConnection->remote, http->request); + // XXX: with the in-direction on remote IP. does the http->getConn()->tcp exist? + ErrorState * err = clientBuildError(errorpage, errorstatus, NULL, http->getConn()->tcp->remote, http->request); err->err_msg = errormessage; errormessage = NULL; rep = err->BuildHttpReply(); === modified file 'src/external_acl.cc' --- src/external_acl.cc 2014-02-08 13:36:42 +0000 +++ src/external_acl.cc 2014-02-17 11:20:40 +0000 @@ -1018,14 +1018,14 @@ #if USE_SQUID_EUI case _external_acl_format::EXT_ACL_SRCEUI48: - if (request->clientConnectionManager.valid() && request->clientConnectionManager->clientConnection != NULL && - request->clientConnectionManager->clientConnection->remoteEui48.encode(buf, sizeof(buf))) + if (request->clientConnectionManager.valid() && request->clientConnectionManager->tcp != NULL && + request->clientConnectionManager->tcp->remoteEui48.encode(buf, sizeof(buf))) str = buf; break; case _external_acl_format::EXT_ACL_SRCEUI64: - if (request->clientConnectionManager.valid() && request->clientConnectionManager->clientConnection != NULL && - request->clientConnectionManager->clientConnection->remoteEui64.encode(buf, sizeof(buf))) + if (request->clientConnectionManager.valid() && request->clientConnectionManager->tcp != NULL && + request->clientConnectionManager->tcp->remoteEui64.encode(buf, sizeof(buf))) str = buf; break; #endif @@ -1115,8 +1115,8 @@ case _external_acl_format::EXT_ACL_USER_CERT_RAW: - if (ch->conn() != NULL && Comm::IsConnOpen(ch->conn()->clientConnection)) { - SSL *ssl = fd_table[ch->conn()->clientConnection->fd].ssl; + if (ch->conn() != NULL && Comm::IsConnOpen(ch->conn()->tcp)) { + SSL *ssl = fd_table[ch->conn()->tcp->fd].ssl; if (ssl) str = sslGetUserCertificatePEM(ssl); @@ -1126,8 +1126,8 @@ case _external_acl_format::EXT_ACL_USER_CERTCHAIN_RAW: - if (ch->conn() != NULL && Comm::IsConnOpen(ch->conn()->clientConnection)) { - SSL *ssl = fd_table[ch->conn()->clientConnection->fd].ssl; + if (ch->conn() != NULL && Comm::IsConnOpen(ch->conn()->tcp)) { + SSL *ssl = fd_table[ch->conn()->tcp->fd].ssl; if (ssl) str = sslGetUserCertificateChainPEM(ssl); @@ -1137,8 +1137,8 @@ case _external_acl_format::EXT_ACL_USER_CERT: - if (ch->conn() != NULL && Comm::IsConnOpen(ch->conn()->clientConnection)) { - SSL *ssl = fd_table[ch->conn()->clientConnection->fd].ssl; + if (ch->conn() != NULL && Comm::IsConnOpen(ch->conn()->tcp)) { + SSL *ssl = fd_table[ch->conn()->tcp->fd].ssl; if (ssl) str = sslGetUserAttribute(ssl, format->header); @@ -1148,8 +1148,8 @@ case _external_acl_format::EXT_ACL_USER_CA_CERT: - if (ch->conn() != NULL && Comm::IsConnOpen(ch->conn()->clientConnection)) { - SSL *ssl = fd_table[ch->conn()->clientConnection->fd].ssl; + if (ch->conn() != NULL && Comm::IsConnOpen(ch->conn()->tcp)) { + SSL *ssl = fd_table[ch->conn()->tcp->fd].ssl; if (ssl) str = sslGetCAAttribute(ssl, format->header); === modified file 'src/format/Format.cc' --- src/format/Format.cc 2014-02-08 13:36:42 +0000 +++ src/format/Format.cc 2014-02-17 11:20:40 +0000 @@ -351,11 +351,11 @@ case LFT_CLIENT_EUI: #if USE_SQUID_EUI // TODO make the ACL checklist have a direct link to any TCP details. - if (al->request && al->request->clientConnectionManager.valid() && al->request->clientConnectionManager->clientConnection != NULL) { - if (al->request->clientConnectionManager->clientConnection->remote.isIPv4()) - al->request->clientConnectionManager->clientConnection->remoteEui48.encode(tmp, 1024); + if (al->request && al->request->clientConnectionManager.valid() && al->request->clientConnectionManager->tcp != NULL) { + if (al->request->clientConnectionManager->tcp->remote.isIPv4()) + al->request->clientConnectionManager->tcp->remoteEui48.encode(tmp, 1024); else - al->request->clientConnectionManager->clientConnection->remoteEui64.encode(tmp, 1024); + al->request->clientConnectionManager->tcp->remoteEui64.encode(tmp, 1024); out = tmp; } #else === modified file 'src/http.cc' --- src/http.cc 2013-12-06 23:52:26 +0000 +++ src/http.cc 2014-02-28 17:18:51 +0000 @@ -103,16 +103,20 @@ //Declared in HttpHeaderTools.cc void httpHdrAdd(HttpHeader *heads, HttpRequest *request, const AccessLogEntryPointer &al, HeaderWithAclList &headers_add); -HttpStateData::HttpStateData(FwdState *theFwdState) : AsyncJob("HttpStateData"), ServerStateData(theFwdState), +HttpStateData::HttpStateData(FwdState *theFwdState) : + AsyncJob("HttpStateData"), + AnyP::Agent(), + ServerStateData(theFwdState), lastChunk(0), header_bytes_read(0), reply_bytes_read(0), body_bytes_truncated(0), httpChunkDecoder(NULL) { - debugs(11,5,HERE << "HttpStateData " << this << " created"); + debugs(11, 5, "HttpStateData " << this << " created"); ignoreCacheControl = false; surrogateNoStore = false; - serverConnection = fwd->serverConnection(); - readBuf = new MemBuf; - readBuf->init(16*1024, 256*1024); + // XXX: there is no config option to set the HTTP server-side buffer size + // So for now use 16KB but allow growth up to 2x the larger of reply_header_max_size and read_ahead_gap + // which defaults to 16-128 KB + inBuf.init(16*1024, 2*max(static_cast(Config.maxReplyHeaderSize), Config.readAheadGap)); // reset peer response time stats for %hier.peer_http_request_sent.tv_sec = 0; @@ -136,12 +140,7 @@ #endif } - /* - * register the handler to free HTTP state data when the FD closes - */ - typedef CommCbMemFunT Dialer; - closeHandler = JobCallback(9, 5, Dialer, this, HttpStateData::httpStateConnClosed); - comm_add_close_handler(serverConnection->fd, closeHandler); + connectionInit(theFwdState->serverConnection()); } HttpStateData::~HttpStateData() @@ -150,42 +149,37 @@ * don't forget that ~ServerStateData() gets called automatically */ - if (!readBuf->isNull()) - readBuf->clean(); - - delete readBuf; - if (httpChunkDecoder) delete httpChunkDecoder; cbdataReferenceDone(_peer); - debugs(11,5, HERE << "HttpStateData " << this << " destroyed; " << serverConnection); + debugs(11, 5, "HttpStateData " << this << " destroyed; " << tcp); +} + +void +HttpStateData::swanSong() +{ + AnyP::Agent::swanSong(); } const Comm::ConnectionPointer & HttpStateData::dataConnection() const { - return serverConnection; -} - -void -HttpStateData::httpStateConnClosed(const CommCloseCbParams ¶ms) -{ - debugs(11, 5, "httpStateFree: FD " << params.fd << ", httpState=" << params.data); - mustStop("HttpStateData::httpStateConnClosed"); + return tcp; } void HttpStateData::httpTimeout(const CommTimeoutCbParams ¶ms) { - debugs(11, 4, HERE << serverConnection << ": '" << entry->url() << "'" ); + debugs(11, 4, tcp << ": '" << entry->url() << "'"); if (entry->store_status == STORE_PENDING) { fwd->fail(new ErrorState(ERR_READ_TIMEOUT, Http::scGateway_Timeout, fwd->request)); } - serverConnection->close(); + stopSending("HttpStateData timeout"); + stopReceiving("HttpStateData timeout"); } /// Remove an existing public store entry if the incoming response (to be @@ -680,7 +674,7 @@ if (Config.onoff.detect_broken_server_pconns && reply->bodySize(request->method) == -1 && !flags.chunked) { debugs(11, DBG_IMPORTANT, "keepaliveAccounting: Impossible keep-alive header from '" << entry->url() << "'" ); - // debugs(11, 2, "GOT HTTP REPLY HDR:\n---------\n" << readBuf->content() << "\n----------" ); + // debugs(11, 2, "GOT HTTP REPLY HDR:\n---------\n" << inBuf.content() << "\n----------" ); flags.keepalive_broken = true; } } @@ -715,7 +709,7 @@ assert(!flags.headers_parsed); - if (!readBuf->hasContent()) { + if (!inBuf.hasContent()) { ctx_exit(ctx); return; } @@ -723,9 +717,9 @@ Http::StatusCode error = Http::scNone; HttpReply *newrep = new HttpReply; - const bool parsed = newrep->parse(readBuf, eof, &error); + const bool parsed = newrep->parse(&inBuf, eof, &error); - if (!parsed && readBuf->contentSize() > 5 && strncmp(readBuf->content(), "HTTP/", 5) != 0 && strncmp(readBuf->content(), "ICY", 3) != 0) { + if (!parsed && inBuf.contentSize() > 5 && strncmp(inBuf.content(), "HTTP/", 5) != 0 && strncmp(inBuf.content(), "ICY", 3) != 0) { MemBuf *mb; HttpReply *tmprep = new HttpReply; tmprep->setHeaders(Http::scOkay, "Gatewaying", NULL, -1, -1, -1); @@ -736,7 +730,7 @@ delete tmprep; } else { if (!parsed && error > 0) { // unrecoverable parsing error - debugs(11, 3, "processReplyHeader: Non-HTTP-compliant header: '" << readBuf->content() << "'"); + debugs(11, 3, "processReplyHeader: Non-HTTP-compliant header: '" << inBuf.content() << "'"); flags.headers_parsed = true; // XXX: when sanityCheck is gone and Http::StatusLine is used to parse, // the sline should be already set the appropriate values during that parser stage @@ -755,11 +749,11 @@ return; } - debugs(11, 2, "HTTP Server " << serverConnection); - debugs(11, 2, "HTTP Server REPLY:\n---------\n" << readBuf->content() << "\n----------"); + debugs(11, 2, "HTTP Server " << tcp); + debugs(11, 2, "HTTP Server REPLY:\n---------\n" << inBuf.content() << "\n----------"); - header_bytes_read = headersEnd(readBuf->content(), readBuf->contentSize()); - readBuf->consume(header_bytes_read); + header_bytes_read = headersEnd(inBuf.content(), inBuf.contentSize()); + inBuf.consume(header_bytes_read); } newrep->removeStaleWarnings(); @@ -848,7 +842,12 @@ header_bytes_read = 0; reply_bytes_read = 0; - CallJobHere(11, 3, this, HttpStateData, HttpStateData::processReply); + if (inBuf.hasContent()) { + if(!processReadBuffer(inBuf)) + return; + } + + readSomeData(); } /** @@ -1071,7 +1070,7 @@ HttpStateData::ConnectionStatus HttpStateData::persistentConnStatus() const { - debugs(11, 3, HERE << serverConnection << " eof=" << eof); + debugs(11, 3, tcp << " eof=" << eof); if (eof) // already reached EOF return COMPLETE_NONPERSISTENT_MSG; @@ -1079,7 +1078,7 @@ I/O to avoid assertions. TODO: Change Comm API to handle callers that want more I/O after async closing (usually initiated by others). */ // XXX: add canReceive or s/canSend/canTalkToServer/ - if (!Comm::IsConnOpen(serverConnection)) + if (!Comm::IsConnOpen(tcp)) return COMPLETE_NONPERSISTENT_MSG; /** \par @@ -1122,89 +1121,52 @@ return statusIfComplete(); } -/* - * This is the callback after some data has been read from the network - */ -/* +/// handle I/O errors when reading void -HttpStateData::ReadReplyWrapper(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) +HttpStateData::noteTransportReadError(int xerrno) { - HttpStateData *httpState = static_cast(data); - assert (fd == httpState->serverConnection->fd); - // assert(buf == readBuf->content()); - PROF_start(HttpStateData_readReply); - httpState->readReply(len, flag, xerrno); - PROF_stop(HttpStateData_readReply); + ErrorState *err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request); + err->xerrno = xerrno; + fwd->fail(err); + flags.do_next_read = false; // XXX: should not be needed now. TCP conection is closed. } -*/ -/* XXX this function is too long! */ +// update I/O stats void -HttpStateData::readReply(const CommIoCbParams &io) +HttpStateData::updateByteCountersOnRead(size_t sz) { - int bin; - int clen; - int len = io.size; - - flags.do_next_read = false; - - debugs(11, 5, HERE << io.conn << ": len " << len << "."); - - // Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us - if (io.flag == COMM_ERR_CLOSING) { - debugs(11, 3, "http socket closing"); - return; - } - - if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { - abortTransaction("store entry aborted while reading reply"); - return; - } - - // handle I/O errors - if (io.flag != COMM_OK || len < 0) { - debugs(11, 2, HERE << io.conn << ": read failure: " << xstrerror() << "."); - - if (ignoreErrno(io.xerrno)) { - flags.do_next_read = true; - } else { - ErrorState *err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request); - err->xerrno = io.xerrno; - fwd->fail(err); - flags.do_next_read = false; - serverConnection->close(); - } - - return; - } - - // update I/O stats - if (len > 0) { - readBuf->appended(len); - reply_bytes_read += len; + reply_bytes_read += sz; + #if USE_DELAY_POOLS DelayId delayId = entry->mem_obj->mostBytesAllowed(); - delayId.bytesIn(len); + delayId.bytesIn(sz); #endif - kb_incr(&(statCounter.server.all.kbytes_in), len); - kb_incr(&(statCounter.server.http.kbytes_in), len); + kb_incr(&(statCounter.server.all.kbytes_in), sz); + kb_incr(&(statCounter.server.http.kbytes_in), sz); ++ IOStats.Http.reads; - for (clen = len - 1, bin = 0; clen; ++bin) + int bin = 0; + for (size_t clen = sz - 1; clen; ++bin) clen >>= 1; - ++ IOStats.Http.read_hist[bin]; + // XXX: this seems sort of wrong. maybe should be done + // once in the response parse code instead of every read. + // update peer response time stats (%hier.peer_http_request_sent; request->hier.peer_response_time = sent.tv_sec ? tvSubMsec(sent, current_time) : -1; - } +} - /** \par +// checks before stop sending after 0-sized read +const char * +HttpStateData::maybeFinishedWithTransport(MemBuf &aBuf) +{ + /* * Here the RFC says we should ignore whitespace between replies, but we can't as - * doing so breaks HTTP/0.9 replies beginning with witespace, and in addition + * doing so breaks HTTP/0.9 replies beginning with whitespace, and in addition * the response splitting countermeasures is extremely likely to trigger on this, * not allowing connection reuse in the first place. * @@ -1212,34 +1174,52 @@ * tolerance there is all about whitespace between requests and header tokens. */ - if (len == 0) { // reached EOF? - eof = 1; - flags.do_next_read = false; + // reached EOF? + eof = 1; + flags.do_next_read = false; - /* Bug 2879: Replies may terminate with \r\n then EOF instead of \r\n\r\n - * Ensure here that we have at minimum two \r\n when EOF is seen. - * TODO: Add eof parameter to headersEnd() and move this hack there. + /* Bug 2879: Replies may terminate with \r\n then EOF instead of \r\n\r\n + * Ensure here that we have at minimum two \r\n when EOF is seen. + * TODO: Add eof parameter to headersEnd() and move this hack there. + */ + if (aBuf.contentSize() && !flags.headers_parsed) { + /* + * Yes Henrik, there is a point to doing this. When we + * called httpProcessReplyHeader() before, we didn't find + * the end of headers, but now we are definately at EOF, so + * we want to process the reply headers. */ - if (readBuf->contentSize() && !flags.headers_parsed) { - /* - * Yes Henrik, there is a point to doing this. When we - * called httpProcessReplyHeader() before, we didn't find - * the end of headers, but now we are definately at EOF, so - * we want to process the reply headers. - */ - /* Fake an "end-of-headers" to work around such broken servers */ - readBuf->append("\r\n", 2); - } + /* Fake an "end-of-headers" to work around such broken servers */ + aBuf.append("\r\n", 2); + processReadBuffer(aBuf); } - processReply(); + // Now that the above hack has consumed any pending headers that + // can be consumed clear the remaining buffer. doneAll() depends + // on an empty buffer as well as doneWithServer(). + aBuf.clean(); + + // AnyP::Agent guarantees that no more is going to be read. + // whether anything may still be sent depends on us returning NULL below... + // XXX: doneWithServer() is the wrong thing to depend on long-term, so + // how do we identify pending writes? + + return doneWithServer() ? "doneWithServer" : NULL; } /// processes the already read and buffered response data, possibly after /// waiting for asynchronous 1xx control message processing -void -HttpStateData::processReply() +bool +HttpStateData::processReadBuffer(MemBuf &data) { + // XXX: this is equivalent to readMore on client-side + // and needs to be replaced with return results from the parse/process functions + flags.do_next_read = false; + + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { + abortTransaction("store entry aborted while reading reply"); + return false; + } if (flags.handling1xx) { // we came back after handling a 1xx response debugs(11, 5, HERE << "done with 1xx handling"); @@ -1253,7 +1233,7 @@ PROF_stop(HttpStateData_processReplyHeader); if (!continueAfterParsingHeader()) // parsing error or need more data - return; // TODO: send errors to ICAP + return flags.do_next_read; // TODO: send errors to ICAP adaptOrFinalizeReply(); // may write to, abort, or "close" the entry } @@ -1262,6 +1242,8 @@ PROF_start(HttpStateData_processReplyBody); processReplyBody(); // may call serverComplete() PROF_stop(HttpStateData_processReplyBody); + + return flags.do_next_read; } /** @@ -1271,13 +1253,13 @@ HttpStateData::continueAfterParsingHeader() { if (flags.handling1xx) { - debugs(11, 5, HERE << "wait for 1xx handling"); + debugs(11, 5, "wait for 1xx handling"); Must(!flags.headers_parsed); return false; } if (!flags.headers_parsed && !eof) { - debugs(11, 9, HERE << "needs more at " << readBuf->contentSize()); + debugs(11, 9, "needs more at " << inBuf.contentSize()); flags.do_next_read = true; /** \retval false If we have not finished parsing the headers and may get more data. * Schedules more reads to retrieve the missing data. @@ -1298,9 +1280,11 @@ if (s == Http::scInvalidHeader && v != Http::ProtocolVersion(0,9)) { debugs(11, DBG_IMPORTANT, "WARNING: HTTP: Invalid Response: Bad header encountered from " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() ); error = ERR_INVALID_RESP; + stopReceiving("invalid response"); } else if (s == Http::scHeaderTooLarge) { fwd->dontRetry(true); error = ERR_TOO_BIG; + stopReceiving("response too big"); } else { return true; // done parsing, got reply, and no error } @@ -1308,16 +1292,19 @@ // parsed headers but got no reply debugs(11, DBG_IMPORTANT, "WARNING: HTTP: Invalid Response: No reply at all for " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() ); error = ERR_INVALID_RESP; + stopReceiving("response missing"); } } else { assert(eof); - if (readBuf->hasContent()) { + if (inBuf.hasContent()) { error = ERR_INVALID_RESP; debugs(11, DBG_IMPORTANT, "WARNING: HTTP: Invalid Response: Headers did not parse at all for " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() ); + stopReceiving("response Headers did not parse"); } else { error = ERR_ZERO_SIZE_OBJECT; debugs(11, (request->flags.accelerated?DBG_IMPORTANT:2), "WARNING: HTTP: Invalid Response: No object data received for " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() ); + stopReceiving("response missing object data"); } } @@ -1325,7 +1312,7 @@ entry->reset(); fwd->fail(new ErrorState(error, Http::scBadGateway, fwd->request)); flags.do_next_read = false; - serverConnection->close(); + stopSending("response error"); return false; // quit on error } @@ -1351,8 +1338,11 @@ " clen=" << clen << '/' << vrep->content_length << " body_bytes_truncated=" << body_bytes_truncated << '+' << extras); - readBuf->truncate(extras); + inBuf.truncate(extras); body_bytes_truncated += extras; + stopReceiving("server sent too many bytes for the response"); + // XXX: should we abortTransaction() instead? + // this will prevent future response reads, but allow request to complete. } } @@ -1364,10 +1354,10 @@ HttpStateData::writeReplyBody() { truncateVirginBody(); // if needed - const char *data = readBuf->content(); - int len = readBuf->contentSize(); + const char *data = inBuf.content(); + int len = inBuf.contentSize(); addVirginReplyBody(data, len); - readBuf->consume(len); + inBuf.consume(len); } bool @@ -1381,7 +1371,7 @@ SQUID_ENTER_THROWING_CODE(); MemBuf decodedData; decodedData.init(); - const bool doneParsing = httpChunkDecoder->parse(readBuf,&decodedData); + const bool doneParsing = httpChunkDecoder->parse(&inBuf, &decodedData); len = decodedData.contentSize(); data=decodedData.content(); addVirginReplyBody(data, len); @@ -1421,7 +1411,7 @@ /* * At this point the reply headers have been parsed and consumed. - * That means header content has been removed from readBuf and + * That means header content has been removed from inBuf and * it contains only body data. */ if (entry->isAccepting()) { @@ -1442,13 +1432,13 @@ } else switch (persistentConnStatus()) { case INCOMPLETE_MSG: { - debugs(11, 5, "processReplyBody: INCOMPLETE_MSG from " << serverConnection); + debugs(11, 5, "INCOMPLETE_MSG from " << tcp); /* Wait for more data or EOF condition */ AsyncCall::Pointer nil; if (flags.keepalive_broken) { - commSetConnTimeout(serverConnection, 10, nil); + commSetConnTimeout(tcp, 10, nil); } else { - commSetConnTimeout(serverConnection, Config.Timeout.read, nil); + commSetConnTimeout(tcp, Config.Timeout.read, nil); } flags.do_next_read = true; @@ -1456,14 +1446,12 @@ break; case COMPLETE_PERSISTENT_MSG: - debugs(11, 5, "processReplyBody: COMPLETE_PERSISTENT_MSG from " << serverConnection); + debugs(11, 5, "COMPLETE_PERSISTENT_MSG from " << tcp); /* yes we have to clear all these! */ - commUnsetConnTimeout(serverConnection); + commUnsetConnTimeout(tcp); flags.do_next_read = false; - - comm_remove_close_handler(serverConnection->fd, closeHandler); - closeHandler = NULL; - fwd->unregister(serverConnection); + releaseConnection("done with persistent connection"); + fwd->unregister(tcp); if (request->flags.spoofClientIp) client_addr = request->client_addr; @@ -1475,18 +1463,18 @@ } if (ispinned && request->clientConnectionManager.valid()) { - request->clientConnectionManager->pinConnection(serverConnection, request, _peer, + request->clientConnectionManager->pinConnection(tcp, request, _peer, (request->flags.connectionAuth)); } else { - fwd->pconnPush(serverConnection, request->GetHost()); + fwd->pconnPush(tcp, request->GetHost()); } - serverConnection = NULL; + tcp = NULL; // prevent termination of HttpStateData closing the connection. serverComplete(); return; case COMPLETE_NONPERSISTENT_MSG: - debugs(11, 5, "processReplyBody: COMPLETE_NONPERSISTENT_MSG from " << serverConnection); + debugs(11, 5, "COMPLETE_NONPERSISTENT_MSG from " << tcp); serverComplete(); return; } @@ -1497,41 +1485,34 @@ void HttpStateData::maybeReadVirginBody() { +#if 0 // XXX: no longer needed? + // too late to read - if (!Comm::IsConnOpen(serverConnection) || fd_table[serverConnection->fd].closing()) + if (!Comm::IsConnOpen(tcp) || fd_table[tcp->fd].closing()) return; // we may need to grow the buffer if headers do not fit const int minRead = flags.headers_parsed ? 0 :1024; - const int read_size = replyBodySpace(*readBuf, minRead); - - debugs(11,9, HERE << (flags.do_next_read ? "may" : "wont") << - " read up to " << read_size << " bytes from " << serverConnection); - - /* - * why <2? Because delayAwareRead() won't actually read if - * you ask it to read 1 byte. The delayed read request - * just gets re-queued until the client side drains, then - * the I/O thread hangs. Better to not register any read - * handler until we get a notification from someone that - * its okay to read again. - */ - if (read_size < 2) - return; - - if (flags.do_next_read) { - flags.do_next_read = false; - typedef CommCbMemFunT Dialer; - entry->delayAwareRead(serverConnection, readBuf->space(read_size), read_size, - JobCallback(11, 5, Dialer, this, HttpStateData::readReply)); - } + const int read_size = replyBodySpace(inBuf, minRead); +#endif + + if (flags.do_next_read) + readSomeData(); +} + +bool +HttpStateData::maybeDelayRead(const AsyncCall::Pointer &call) +{ + flags.do_next_read = false; + entry->delayAwareRead(tcp, inBuf.space(), inBuf.spaceSize(), call); + return true; // always true for HTTP server connections. } /// called after writing the very last request byte (body, last-chunk, etc) void HttpStateData::wroteLast(const CommIoCbParams &io) { - debugs(11, 5, HERE << serverConnection << ": size " << io.size << ": errflag " << io.flag << "."); + debugs(11, 5, tcp << ": size " << io.size << ": errflag " << io.flag << "."); #if URL_CHECKSUM_DEBUG entry->mem_obj->checkUrlChecksum(); @@ -1550,7 +1531,9 @@ ErrorState *err = new ErrorState(ERR_WRITE_ERROR, Http::scBadGateway, fwd->request); err->xerrno = io.xerrno; fwd->fail(err); - serverConnection->close(); + // XXX: use abortTransaction() instead? + stopSending("write error"); + stopReceiving("write error"); return; } @@ -1573,7 +1556,7 @@ AsyncCall::Pointer timeoutCall = JobCallback(11, 5, TimeoutDialer, this, HttpStateData::httpTimeout); - commSetConnTimeout(serverConnection, Config.Timeout.read, timeoutCall); + commSetConnTimeout(tcp, Config.Timeout.read, timeoutCall); flags.request_sent = true; request->hier.peer_http_request_sent = current_time; } @@ -1582,20 +1565,21 @@ void HttpStateData::closeServer() { - debugs(11,5, HERE << "closing HTTP server " << serverConnection << " this " << this); + debugs(11,5, "closing HTTP server " << tcp << " this " << this); - if (Comm::IsConnOpen(serverConnection)) { - fwd->unregister(serverConnection); - comm_remove_close_handler(serverConnection->fd, closeHandler); - closeHandler = NULL; - serverConnection->close(); + if (Comm::IsConnOpen(tcp)) { + fwd->unregister(tcp); + releaseConnection("closeServer"); + stopSending("done"); + stopReceiving("done"); } } bool HttpStateData::doneWithServer() const { - return !Comm::IsConnOpen(serverConnection); + // XXX: use stoppedSending() && stoppedReading() instead? + return !Comm::IsConnOpen(tcp); } /* @@ -2150,10 +2134,10 @@ { MemBuf mb; - debugs(11, 5, HERE << serverConnection << ", request " << request << ", this " << this << "."); + debugs(11, 5, tcp << ", request " << request << ", this " << this << "."); - if (!Comm::IsConnOpen(serverConnection)) { - debugs(11,3, HERE << "cannot send request to closing " << serverConnection); + if (!Comm::IsConnOpen(tcp)) { + debugs(11,3, "cannot send request to closing " << tcp); assert(closeHandler != NULL); return false; } @@ -2161,7 +2145,7 @@ typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(11, 5, TimeoutDialer, this, HttpStateData::httpTimeout); - commSetConnTimeout(serverConnection, Config.Timeout.lifetime, timeoutCall); + commSetConnTimeout(tcp, Config.Timeout.lifetime, timeoutCall); flags.do_next_read = true; maybeReadVirginBody(); @@ -2226,10 +2210,9 @@ request->peer_host=_peer?_peer->host:NULL; buildRequestPrefix(&mb); - debugs(11, 2, "HTTP Server " << serverConnection); + debugs(11, 2, "HTTP Server " << tcp); debugs(11, 2, "HTTP Server REQUEST:\n---------\n" << mb.buf << "\n----------"); - - Comm::Write(serverConnection, &mb, requestSender); + sendSomeData(mb, requestSender); return true; } @@ -2309,8 +2292,8 @@ return false; } - if (!Comm::IsConnOpen(serverConnection)) { - debugs(11, 3, HERE << "ignoring broken POST for closed " << serverConnection); + if (!Comm::IsConnOpen(tcp)) { + debugs(11, 3, "ignoring broken POST for closed " << tcp); assert(closeHandler != NULL); return true; // prevent caller from proceeding as if nothing happened } @@ -2319,7 +2302,7 @@ typedef CommCbMemFunT Dialer; requestSender = JobCallback(11,5, Dialer, this, HttpStateData::wroteLast); - Comm::Write(serverConnection, "\r\n", 2, requestSender, NULL); + Comm::Write(tcp, "\r\n", 2, requestSender, NULL); return true; #else return false; @@ -2340,7 +2323,7 @@ typedef CommCbMemFunT Dialer; requestSender = JobCallback(11,5, Dialer, this, HttpStateData::wroteLast); - Comm::Write(serverConnection, "0\r\n\r\n", 5, requestSender, NULL); + Comm::Write(tcp, "0\r\n\r\n", 5, requestSender, NULL); return true; } @@ -2348,7 +2331,7 @@ HttpStateData::doneSendingRequestBody() { ServerStateData::doneSendingRequestBody(); - debugs(11,5, HERE << serverConnection); + debugs(11, 5, tcp); // do we need to write something after the last body byte? if (flags.chunked_request && finishingChunkedRequest()) @@ -2363,7 +2346,7 @@ void HttpStateData::handleMoreRequestBodyAvailable() { - if (eof || !Comm::IsConnOpen(serverConnection)) { + if (eof || !Comm::IsConnOpen(tcp)) { // XXX: we should check this condition in other callbacks then! // TODO: Check whether this can actually happen: We should unsubscribe // as a body consumer when the above condition(s) are detected. @@ -2381,7 +2364,8 @@ debugs(11, DBG_IMPORTANT, "http handleMoreRequestBodyAvailable: Likely proxy abuse detected '" << request->client_addr << "' -> '" << entry->url() << "'" ); if (virginReply()->sline.status() == Http::scInvalidHeader) { - serverConnection->close(); + stopSending("proxy abuse detected"); + stopReceiving("proxy abuse detected"); return; } } @@ -2396,7 +2380,7 @@ { ServerStateData::handleRequestBodyProducerAborted(); if (entry->isEmpty()) { - debugs(11, 3, "request body aborted: " << serverConnection); + debugs(11, 3, "request body aborted: " << tcp); // We usually get here when ICAP REQMOD aborts during body processing. // We might also get here if client-side aborts, but then our response // should not matter because either client-side will provide its own or @@ -2425,12 +2409,12 @@ void HttpStateData::abortTransaction(const char *reason) { - debugs(11,5, HERE << "aborting transaction for " << reason << - "; " << serverConnection << ", this " << this); + debugs(11,5, "aborting transaction for " << reason << "; " << tcp << ", this " << this); - if (Comm::IsConnOpen(serverConnection)) { - serverConnection->close(); - return; + if (Comm::IsConnOpen(tcp)) { + // when both called the TCP connection will be closed. + stopReceiving(reason); + stopSending(reason); } fwd->handleUnregisteredServerEnd(); === modified file 'src/http.h' --- src/http.h 2012-10-19 09:17:33 +0000 +++ src/http.h 2014-02-28 17:18:38 +0000 @@ -32,6 +32,7 @@ #ifndef SQUID_HTTP_H #define SQUID_HTTP_H +#include "anyp/Agent.h" #include "comm.h" #include "HttpStateFlags.h" #include "Server.h" @@ -40,7 +41,7 @@ class FwdState; class HttpHeader; -class HttpStateData : public ServerStateData +class HttpStateData : public AnyP::Agent, public ServerStateData { public: @@ -54,11 +55,11 @@ const HttpStateFlags &flags); virtual const Comm::ConnectionPointer & dataConnection() const; + /* should be private */ bool sendRequest(); void processReplyHeader(); void processReplyBody(); - void readReply(const CommIoCbParams &io); virtual void maybeReadVirginBody(); // read response data from the network // Determine whether the response is a cacheable representation @@ -72,24 +73,28 @@ int header_bytes_read; // to find end of response, int64_t reply_bytes_read; // without relying on StoreEntry int body_bytes_truncated; // positive when we read more than we wanted - MemBuf *readBuf; bool ignoreCacheControl; bool surrogateNoStore; void processSurrogateControl(HttpReply *); + // AsyncJob API + virtual bool doneAll() const {return AnyP::Agent::doneAll() && ServerStateData::doneAll() && doneWithServer();} + virtual void swanSong(); + + // AnyP::Agent API + virtual int64_t mayNeedToReadMore() const {return -1;} // XXX: whether more body or headers expected ?? how much? + virtual bool maybeDelayRead(const AsyncCall::Pointer &call); + virtual void updateByteCountersOnRead(size_t); + virtual void noteTransportReadError(int); + virtual bool processReadBuffer(MemBuf &); + virtual const char * maybeFinishedWithTransport(MemBuf &aBuf); + protected: - void processReply(); void proceedAfter1xx(); void handle1xx(HttpReply *msg); private: - /** - * The current server connection. - * Maybe open, closed, or NULL. - * Use doneWithServer() to check if the server is available for use. - */ - Comm::ConnectionPointer serverConnection; AsyncCall::Pointer closeHandler; enum ConnectionStatus { INCOMPLETE_MSG, @@ -108,7 +113,11 @@ virtual void haveParsedReplyHeaders(); virtual bool getMoreRequestBody(MemBuf &buf); virtual void closeServer(); // end communication with the server - virtual bool doneWithServer() const; // did we end communication? + /** Did we end communication? + * The current server connection may be open, closed, or NULL. + * Use doneWithServer() to check if the server is available for use. + */ + virtual bool doneWithServer() const; virtual void abortTransaction(const char *reason); // abnormal termination // consuming request body === modified file 'src/ident/AclIdent.cc' --- src/ident/AclIdent.cc 2013-05-13 22:48:23 +0000 +++ src/ident/AclIdent.cc 2013-10-13 09:12:24 +0000 @@ -86,9 +86,9 @@ ACLFilledChecklist *checklist = Filled(cl); if (checklist->rfc931[0]) { return data->match(checklist->rfc931); - } else if (checklist->conn() != NULL && checklist->conn()->clientConnection != NULL && checklist->conn()->clientConnection->rfc931[0]) { - return data->match(checklist->conn()->clientConnection->rfc931); - } else if (checklist->conn() != NULL && Comm::IsConnOpen(checklist->conn()->clientConnection)) { + } else if (checklist->conn() != NULL && checklist->conn()->tcp != NULL && checklist->conn()->tcp->rfc931[0]) { + return data->match(checklist->conn()->tcp->rfc931); + } else if (checklist->conn() != NULL && Comm::IsConnOpen(checklist->conn()->tcp)) { if (checklist->goAsync(IdentLookup::Instance())) { debugs(28, 3, "switching to ident lookup state"); return -1; @@ -135,9 +135,9 @@ ACLFilledChecklist *checklist = Filled(cl); const ConnStateData *conn = checklist->conn(); // check that ACLIdent::match() tested this lookup precondition - assert(conn && Comm::IsConnOpen(conn->clientConnection)); + assert(conn && Comm::IsConnOpen(conn->tcp)); debugs(28, 3, HERE << "Doing ident lookup" ); - Ident::Start(checklist->conn()->clientConnection, LookupDone, checklist); + Ident::Start(checklist->conn()->tcp, LookupDone, checklist); } void @@ -155,8 +155,8 @@ * Cache the ident result in the connection, to avoid redoing ident lookup * over and over on persistent connections */ - if (checklist->conn() != NULL && checklist->conn()->clientConnection != NULL && !checklist->conn()->clientConnection->rfc931[0]) - xstrncpy(checklist->conn()->clientConnection->rfc931, checklist->rfc931, USER_IDENT_SZ); + if (checklist->conn() != NULL && checklist->conn()->tcp != NULL && !checklist->conn()->tcp->rfc931[0]) + xstrncpy(checklist->conn()->tcp->rfc931, checklist->rfc931, USER_IDENT_SZ); checklist->resumeNonBlockingCheck(IdentLookup::Instance()); } === modified file 'src/peer_select.cc' --- src/peer_select.cc 2014-02-02 01:24:53 +0000 +++ src/peer_select.cc 2014-02-04 22:21:34 +0000 @@ -243,7 +243,7 @@ if (req->clientConnectionManager.valid()) { // construct a "result" adding the ORIGINAL_DST to the set instead of DIRECT Comm::ConnectionPointer p = new Comm::Connection(); - p->remote = req->clientConnectionManager->clientConnection->local; + p->remote = req->clientConnectionManager->tcp->local; p->peerType = fs->code; p->setPeer(fs->_peer); === modified file 'src/redirect.cc' --- src/redirect.cc 2013-11-23 00:58:42 +0000 +++ src/redirect.cc 2013-11-25 23:03:45 +0000 @@ -264,15 +264,15 @@ debugs(61, 5, HERE << "acl-user=" << (r->client_ident?r->client_ident:"NULL")); } - if (!r->client_ident && conn != NULL && conn->clientConnection != NULL && conn->clientConnection->rfc931[0]) { - r->client_ident = conn->clientConnection->rfc931; + if (!r->client_ident && conn != NULL && conn->tcp != NULL && conn->tcp->rfc931[0]) { + r->client_ident = conn->tcp->rfc931; debugs(61, 5, HERE << "ident-user=" << (r->client_ident?r->client_ident:"NULL")); } #if USE_SSL - if (!r->client_ident && conn != NULL && Comm::IsConnOpen(conn->clientConnection)) { - r->client_ident = sslGetUserEmail(fd_table[conn->clientConnection->fd].ssl); + if (!r->client_ident && conn != NULL && Comm::IsConnOpen(conn->tcp)) { + r->client_ident = sslGetUserEmail(fd_table[conn->tcp->fd].ssl); debugs(61, 5, HERE << "ssl-user=" << (r->client_ident?r->client_ident:"NULL")); } #endif @@ -314,8 +314,8 @@ tmpnoaddr.setNoAddr(); repContext->setReplyToError(ERR_GATEWAY_FAILURE, status, http->request->method, NULL, - http->getConn() != NULL && http->getConn()->clientConnection != NULL ? - http->getConn()->clientConnection->remote : tmpnoaddr, + http->getConn() != NULL && http->getConn()->tcp != NULL ? + http->getConn()->tcp->remote : tmpnoaddr, http->request, NULL, #if USE_AUTH === modified file 'src/stat.cc' --- src/stat.cc 2014-01-24 01:57:15 +0000 +++ src/stat.cc 2014-01-19 05:41:22 +0000 @@ -2019,16 +2019,16 @@ storeAppendPrintf(s, "Connection: %p\n", conn); if (conn != NULL) { - const int fd = conn->clientConnection->fd; + const int fd = conn->tcp->fd; storeAppendPrintf(s, "\tFD %d, read %" PRId64 ", wrote %" PRId64 "\n", fd, fd_table[fd].bytes_read, fd_table[fd].bytes_written); storeAppendPrintf(s, "\tFD desc: %s\n", fd_table[fd].desc); storeAppendPrintf(s, "\tin: buf %p, offset %ld, size %ld\n", - conn->in.buf, (long int) conn->in.notYetUsed, (long int) conn->in.allocatedSize); + conn->inBuf.content(), (long int) conn->inBuf.contentSize(), (long int) conn->inBuf.capacity); storeAppendPrintf(s, "\tremote: %s\n", - conn->clientConnection->remote.toUrl(buf,MAX_IPSTRLEN)); + conn->tcp->remote.toUrl(buf,MAX_IPSTRLEN)); storeAppendPrintf(s, "\tlocal: %s\n", - conn->clientConnection->local.toUrl(buf,MAX_IPSTRLEN)); + conn->tcp->local.toUrl(buf,MAX_IPSTRLEN)); storeAppendPrintf(s, "\tnrequests: %d\n", conn->nrequests); } @@ -2053,13 +2053,13 @@ p = http->request->extacl_user.termedBuf(); } - if (!p && conn != NULL && conn->clientConnection->rfc931[0]) - p = conn->clientConnection->rfc931; + if (!p && conn != NULL && conn->tcp->rfc931[0]) + p = conn->tcp->rfc931; #if USE_SSL - if (!p && conn != NULL && Comm::IsConnOpen(conn->clientConnection)) - p = sslGetUserEmail(fd_table[conn->clientConnection->fd].ssl); + if (!p && conn != NULL && Comm::IsConnOpen(conn->tcp)) + p = sslGetUserEmail(fd_table[conn->tcp->fd].ssl); #endif === modified file 'src/tests/stub_client_side.cc' --- src/tests/stub_client_side.cc 2014-01-05 19:49:23 +0000 +++ src/tests/stub_client_side.cc 2014-02-28 22:46:19 +0000 @@ -28,40 +28,29 @@ void ClientSocketContext::noteIoError(const int xerrno) STUB void ClientSocketContext::writeControlMsg(HttpControlMsg &msg) STUB -void ConnStateData::readSomeData() STUB -int ConnStateData::getAvailableBufferLength() const STUB_RETVAL(0) bool ConnStateData::areAllContextsForThisConnection() const STUB_RETVAL(false) void ConnStateData::freeAllContexts() STUB -void ConnStateData::notifyAllContexts(const int xerrno) STUB +void ConnStateData::noteTransportReadError(const int xerrno) STUB +bool ConnStateData::injectPrefixBytesXXX(const MemBuf &) STUB bool ConnStateData::clientParseRequests() STUB_RETVAL(false) void ConnStateData::readNextRequest() STUB -bool ConnStateData::maybeMakeSpaceAvailable() STUB_RETVAL(false) void ConnStateData::addContextToQueue(ClientSocketContext * context) STUB int ConnStateData::getConcurrentRequestCount() const STUB_RETVAL(0) bool ConnStateData::isOpen() const STUB_RETVAL(false) void ConnStateData::checkHeaderLimits() STUB void ConnStateData::sendControlMsg(HttpControlMsg msg) STUB -char *ConnStateData::In::addressToReadInto() const STUB_RETVAL(NULL) -int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0) #if USE_AUTH void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *cause) STUB #endif bool ConnStateData::transparent() const STUB_RETVAL(false) -bool ConnStateData::reading() const STUB_RETVAL(false) -void ConnStateData::stopReading() STUB -void ConnStateData::stopReceiving(const char *error) STUB -void ConnStateData::stopSending(const char *error) STUB void ConnStateData::expectNoForwarding() STUB void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB -bool ConnStateData::handleReadData(char *buf, size_t size) STUB_RETVAL(false) -bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false) +bool ConnStateData::processRequestBodyData(MemBuf &) STUB_RETVAL(false) void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth) STUB void ConnStateData::unpinConnection() STUB const Comm::ConnectionPointer ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *peer) STUB_RETVAL(NULL) void ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io) STUB -void ConnStateData::clientReadRequest(const CommIoCbParams &io) STUB -void ConnStateData::connStateClosed(const CommCloseCbParams &io) STUB void ConnStateData::requestTimeout(const CommTimeoutCbParams ¶ms) STUB void ConnStateData::swanSong() STUB void ConnStateData::quitAfterError(HttpRequest *request) STUB === added file 'src/tests/stub_libanyp.cc' --- src/tests/stub_libanyp.cc 1970-01-01 00:00:00 +0000 +++ src/tests/stub_libanyp.cc 2014-02-28 22:45:23 +0000 @@ -0,0 +1,39 @@ +#include "squid.h" + +#define STUB_API "any/libanyp.la" +#include "tests/STUB.h" + +#include "anyp/Agent.h" +void AnyP::Agent::readSomeData() STUB +void AnyP::Agent::stopReceiving(const char *) STUB +void AnyP::Agent::sendSomeData(MemBuf &, AsyncCall::Pointer &) STUB +void AnyP::Agent::stopSending(const char *error) STUB +void AnyP::Agent::stopReadingXXX() STUB +//AnyP::Agent::Agent(); +bool AnyP::Agent::doneAll() const STUB_RETVAL(false) +void AnyP::Agent::swanSong() STUB +void AnyP::Agent::connectionInit(const Comm::ConnectionPointer &c) STUB +void AnyP::Agent::releaseConnection(const char *reason) STUB +bool AnyP::Agent::maybeMakeSpaceAvailable() STUB_RETVAL(false) +void AnyP::Agent::readHandler(const CommIoCbParams &io) STUB + +#include "anyp/PortCfg.h" +//AnyP::PortCfg::PortCfg(); +//AnyP::PortCfg::~PortCfg(); +AnyP::PortCfg *AnyP::PortCfg::clone() const STUB_RETVAL(NULL) +#if USE_SSL +void AnyP::PortCfg::configureSslServerContext() STUB +#endif +void AnyP::PortCfg::setTransport(const char *) STUB +//int AnyP::PortCfg::NHttpSockets; +//int AnyP::PortCfg::HttpSockets[MAXTCPLISTENPORTS]; + +#include "anyp/ProtocolType.h" +const char *AnyP::ProtocolType::ProtocolType_str[]; + +// no definitions necessary for these two: +//#include "anyp/ProtocolVersion.h" +//#include "anyp/TrafficMode.h" + +#include "anypUriScheme.h" +char const *AnyP::UriScheme::c_str() const; === modified file 'src/tunnel.cc' --- src/tunnel.cc 2014-02-21 10:46:19 +0000 +++ src/tunnel.cc 2014-02-28 12:58:36 +0000 @@ -868,7 +868,7 @@ debugs(26, 4, HERE << "MISS access forbidden."); err = new ErrorState(ERR_FORWARDING_DENIED, Http::scForbidden, request); *status_ptr = Http::scForbidden; - errorSend(http->getConn()->clientConnection, err); + errorSend(http->getConn()->tcp, err); return; } } @@ -885,7 +885,7 @@ tunnelState->request = request; tunnelState->server.size_ptr = size_ptr; tunnelState->status_ptr = status_ptr; - tunnelState->client.conn = http->getConn()->clientConnection; + tunnelState->client.conn = http->getConn()->tcp; tunnelState->al = al; comm_add_close_handler(tunnelState->client.conn->fd,