=== modified file 'src/comm/Makefile.am' --- src/comm/Makefile.am 2012-03-29 09:22:41 +0000 +++ src/comm/Makefile.am 2013-10-16 12:14:31 +0000 @@ -23,6 +23,8 @@ ModSelectWin32.cc \ TcpAcceptor.cc \ TcpAcceptor.h \ + TcpReceiver.cc \ + TcpReceiver.h \ UdpOpenDialer.h \ Write.cc \ Write.h \ === added file 'src/comm/TcpReceiver.cc' --- src/comm/TcpReceiver.cc 1970-01-01 00:00:00 +0000 +++ src/comm/TcpReceiver.cc 2014-01-05 13:21:33 +0000 @@ -0,0 +1,263 @@ +/* + * DEBUG: section 05 TCP Read + * + * - level 2 minor TCP errors + * - level 3 duplicate reasons for halting I/O (bugs? only need to halt once) + * - level 4 reasons for errors and halting I/O + * - level 5 common I/O and buffer activity + */ +#include "squid.h" +#include "comm.h" +#include "comm/TcpReceiver.h" +#include "Debug.h" +#include "fd.h" +#include "fde.h" +#include "StatCounters.h" +#include "tools.h" + +Comm::TcpReceiver::TcpReceiver(const Comm::ConnectionPointer &c) : + AsyncJob("Comm::TcpReceiver"), + tcp(c), + stoppedReceiving_(NULL), + stoppedSending_(NULL), + closed_(), + reader_() +{} + +void +Comm::TcpReceiver::tcpConnectionInit() +{ + typedef CommCbMemFunT Dialer; + closed_ = JobCallback(33, 5, Dialer, this, Comm::TcpReceiver::tcpConnectionClosed); + comm_add_close_handler(tcp->fd, closed_); +} + +bool +Comm::TcpReceiver::doneAll() const +{ + return stoppedSending() && stoppedReceiving() && !inBuf.hasContent() && AsyncJob::doneAll(); +} + +void +Comm::TcpReceiver::swanSong() +{ + if (closed_ != NULL) + closed_->cancel("Comm::TcpReceiver::swanSong"); + + if (Comm::IsConnOpen(tcp)) + tcp->close(); +} + +void +Comm::TcpReceiver::releaseTcpConnection(const char *reason) +{ + // Used by server-side to release the connection before + // storing it in Pconn pool + comm_remove_close_handler(tcp->fd, closed_); + closed_->cancel(reason); + stopReading(); +} + +void +Comm::TcpReceiver::stopReading() +{ + /* NP: This is a hack needed to allow TunnelStateData + * to take control of a socket despite any scheduled read. + */ + if (reading()) { + comm_read_cancel(tcp->fd, reader_); + reader_ = NULL; + } +} + +void +Comm::TcpReceiver::stopReceiving(const char *error) +{ + debugs(5, 4, "receiving error (" << tcp << "): " << error << + "; old sending error: " << (stoppedSending() ? stoppedSending_ : "none")); + + if (const char *oldError = stoppedReceiving()) { + debugs(5, 3, "already stopped receiving: " << oldError); + return; // nothing has changed as far as this connection is concerned + } + + stoppedReceiving_ = error; + + if (const char *sendError = stoppedSending()) { + debugs(5, 3, "closing because also stopped sending: " << sendError); + closed_->cancel("graceful close"); + tcp->close(); + } +} + +void +Comm::TcpReceiver::stopSending(const char *error) +{ + debugs(5, 4, "sending error (" << tcp << "): " << error << + "; old receiving error: " << + (stoppedReceiving() ? stoppedReceiving_ : "none")); + + if (const char *oldError = stoppedSending()) { + debugs(5, 3, "already stopped sending: " << oldError); + return; // nothing has changed as far as this connection is concerned + } + stoppedSending_ = error; + + if (!stoppedReceiving()) { + if (const int64_t expecting = mayNeedToReadMore()) { + debugs(5, 5, "must still read " << expecting << + " bytes with " << inBuf.contentSize() << " unused"); + return; // wait for the receiver to finish reading + } + } + closed_->cancel("graceful close"); + tcp->close(); +} + +bool +Comm::TcpReceiver::maybeMakeSpaceAvailable() +{ + /* + * why <2? Because delayAwareRead() won't actually read if + * you ask it to read 1 byte. The delayed read(2) request + * just gets re-queued until the client side drains, then + * the I/O thread hangs. Better to not register any read + * handler until we get a notification from someone that + * its okay to read again if the buffer cannot grow. + */ + + if (inBuf.spaceSize() < 2) { + if (!inBuf.hasPotentialSpace()) { + debugs(5, 5, "buffer full: " << inBuf.contentSize() << " of " << (inBuf.max_capacity-1) << " bytes"); + return false; + } + (void)inBuf.space(inBuf.contentSize()*2); + debugs(5, 5, "growing buffer: content-size=" << inBuf.contentSize() << " capacity=" << inBuf.capacity); + } + + // in case the grow operation above failed for any reason. + return (inBuf.spaceSize() > 1); +} + +void +Comm::TcpReceiver::readSomeData() +{ + // one read() at a time + if (reading()) + return; + + // useless to read() after aborting read() + if (stoppedReceiving()) + return; + + // useless to try when there is no buffer space available + if (!maybeMakeSpaceAvailable()) + return; + + debugs(5, 5, tcp << ": reading... buffer space " << inBuf.spaceSize() << " bytes."); + + typedef CommCbMemFunT Dialer; + reader_ = JobCallback(33, 5, Dialer, this, Comm::TcpReceiver::readIoHandler); + if (!maybeDelayRead(reader_)) + comm_read(tcp, inBuf.space(), inBuf.spaceSize(), reader_); +} + +/// identifies whether the read() event was due to a network error happening +bool +Comm::TcpReceiver::readWasError(comm_err_t flag, int size, int xerrno) const +{ + if (flag != COMM_OK) { + debugs(5, 2, tcp << ": got flag " << flag); + return true; + } + + if (size < 0) { + if (!ignoreErrno(xerrno)) { + debugs(5, 2, tcp << " read failure: " << xstrerr(xerrno)); + return true; + } else if (!inBuf.hasContent()) { + debugs(5, 2, tcp << ": no data to process (" << xstrerr(xerrno) << ")"); + } + } + + return false; +} + +void +Comm::TcpReceiver::readIoHandler(const CommIoCbParams &io) +{ + debugs(5, 5, io.conn << " size " << io.size); + Must(reading()); + reader_ = NULL; + + /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ + if (io.flag == COMM_ERR_CLOSING) { + debugs(5, 5, io.conn << " closing Bailout."); + return; + } + + assert(Comm::IsConnOpen(tcp)); + assert(io.conn->fd == tcp->fd); + + /* + * Don't reset the timeout value here. The timeout value will be + * set to Config.Timeout.request by httpAccept() and + * clientWriteComplete(), and should apply to the request as a + * whole, not individual read() calls. Plus, it breaks our + * lame half-close detection + */ + if (readWasError(io.flag, io.size, io.xerrno)) { + noteTcpReadError(io.xerrno); + io.conn->close(); + return; + } + + if (io.flag == COMM_OK) { + if (io.size > 0) { + updateByteCountersOnRead(io.size); + inBuf.append(io.buf, io.size); + + } else if (io.size == 0) { + debugs(5, 5, io.conn << " closed?"); + stopReceiving("zero sized read(2) result"); + + // if the connection is still possibly sending + // the child class may be able to stop immediately + if (const char *reason = maybeFinishedWithTcp()) { + stopSending(reason); // will close connection + return; + } + + // if already stopped sending, the above will close the connection + if (stoppedSending()) + return; + + /* It might be half-closed, we can't tell */ + fd_table[io.conn->fd].flags.socket_eof = true; + commMarkHalfClosed(io.conn->fd); + fd_note(io.conn->fd, "half-closed"); + } + } + + bool mayReadMore = true; + // pass handling on to child instance code + if (inBuf.hasContent()) + mayReadMore = processReadBuffer(inBuf); + + if (mayReadMore && !maybeMakeSpaceAvailable()) { + stopReceiving("full read buffer - but processing does not free any space"); + mayReadMore = false; + } + + // schedule another read() - unless aborted by processing actions + if (mayReadMore) + readSomeData(); +} + +/* This is a handler normally called by comm_close() */ +void +Comm::TcpReceiver::tcpConnectionClosed(const CommCloseCbParams &io) +{ + stopReceiving("TCP connection closed"); + stopSending("TCP connection closed"); +} === added file 'src/comm/TcpReceiver.h' --- src/comm/TcpReceiver.h 1970-01-01 00:00:00 +0000 +++ src/comm/TcpReceiver.h 2014-01-05 13:21:41 +0000 @@ -0,0 +1,132 @@ +#ifndef SQUID_SRC_COMM_TCPRECEIVER_H +#define SQUID_SRC_COMM_TCPRECEIVER_H + +#include "base/AsyncCall.h" +#include "base/AsyncJob.h" +#include "comm/Connection.h" +#include "comm_err_t.h" +#include "MemBuf.h" + +class CommIoCbParams; + +namespace Comm { + +class TcpReceiver : virtual public AsyncJob +{ +public: + explicit TcpReceiver(const Comm::ConnectionPointer &c); + virtual ~TcpReceiver() {} + + // AsyncJob API + virtual bool doneAll() const; + virtual void swanSong(); + + /// initialize the TCP connection event handlers + /// close(2) callback etc. + void tcpConnectionInit(); + + /// releases TCP connection event handlers without closing it + void releaseTcpConnection(const char *reason); + + /// whether a read(2) operation is currently underway + bool reading() const {return reader_!=NULL;} + + /** Hack to cancel a read if one is scheduled, without blocking future socket use. + * \note Avoid using this method when possible. If the read(2) is done but + * AsyncCall is still queued the read(2) bytes will be lost permanently. + */ + void stopReading(); + + /// note receiving error and close as soon as we have done with writing as well + void stopReceiving(const char *error); + + /// true if we stopped receiving data + const char *stoppedReceiving() const { return stoppedReceiving_; } + + /// note response sending error and close as soon as we read the request + void stopSending(const char *error); + + /// true if we stopped sending the response + const char *stoppedSending() const { return stoppedSending_; } + + /** Called when sending has stopped to check if more read(2)s may be required. + * + * \retval >0 Number of bytes expected still to arrive. + * \retval -1 More data still expected to arrive, unknown number of bytes at this time. + * \retval 0 No more bytes expected right now. + */ + virtual int64_t mayNeedToReadMore() const = 0; + + /// called when buffer may be used to receive new network data + bool maybeMakeSpaceAvailable(); + + /** + * Called before scheduling a read(2) operation in case the + * child class uses delay_pools to slow read(2) I/O down. + * \return true if this read has been deferred. + */ + // TODO: make the delaying part of TcpReceivers task + virtual bool maybeDelayRead(const AsyncCall::Pointer &call) {return false;} + + /** called when there is new buffered data to process. + * + * If the processing requires further read(2) to be halted temporarily it + * may return false. The processor is then responsible for ensuring that + * readSomeData() is called when read(2) calls are to be resumed. + * + * \retval true if additional read(2) should be scheduled by the caller. + * \retval false if read(2) is to be suspended. + */ + virtual bool processReadBuffer(MemBuf &) = 0; + + /** Called when there is an error performing read(2) + * so the child class can perform any cleanup or error handling. + * The TCP connection will be closed immediately after this method + * completes. + */ + virtual void noteTcpReadError(int) {} + + /// Called when there has been a successful read(2). + /// The child class is responsible for data counting. + virtual void updateByteCountersOnRead(size_t) = 0; + + /// Attempt to read some data. + /// Will call processReadBuffer() when there is data to process. + void readSomeData(); + + /// callback to handle TCP read(2) input + void readIoHandler(const CommIoCbParams &io); + + /** + * called when TCP 0-size read(2) occurs to ask the child class + * whether it is able to stop sending yet. + * + * \return a reason for stopping I/O, + * or NULL to continue I/O with client half-closed. + */ + virtual const char * maybeFinishedWithTcp() = 0; + + Comm::ConnectionPointer tcp; + + MemBuf inBuf; + +private: + bool readWasError(comm_err_t flag, int size, int xerrno) const; + void tcpConnectionClosed(const CommCloseCbParams &io); + + /// the reason why we no longer read(2) or nil + const char *stoppedReceiving_; + + /// the reason why we no longer write(2) or nil + const char *stoppedSending_; + + /// callback to stop traffic processing when FD closes + AsyncCall::Pointer closed_; + + ///< set when we are reading + AsyncCall::Pointer reader_; +}; + +} // namespace Comm + +#endif /* SQUID_SRC_COMM_TCPRECEIVER_H */