=== modified file 'src/CommCalls.h' --- src/CommCalls.h 2010-08-24 00:12:54 +0000 +++ src/CommCalls.h 2011-01-22 12:50:29 +0000 @@ -176,6 +176,7 @@ { public: typedef CommAcceptCbParams Params; + typedef RefCount Pointer; CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams); void dial(); @@ -259,11 +260,19 @@ class CommCbFunPtrCallT: public AsyncCall { public: + typedef RefCount > Pointer; typedef typename Dialer::Params Params; inline CommCbFunPtrCallT(int debugSection, int debugLevel, const char *callName, const Dialer &aDialer); + inline CommCbFunPtrCallT(const CommCbFunPtrCallT &o) : + AsyncCall(o.debugSection, o.debugLevel, o.name), + dialer(o.dialer) + {} + + ~CommCbFunPtrCallT() {} + virtual CallDialer* getDialer() { return &dialer; } public: @@ -272,6 +281,9 @@ protected: inline virtual bool canFire(); inline virtual void fire(); + +private: + CommCbFunPtrCallT & operator=(const CommCbFunPtrCallT &); // not defined. not permitted. }; // Conveninece wrapper: It is often easier to call a templated function than === modified file 'src/ProtoPort.cc' --- src/ProtoPort.cc 2010-11-18 08:01:53 +0000 +++ src/ProtoPort.cc 2011-01-09 00:58:29 +0000 @@ -3,15 +3,17 @@ */ #include "squid.h" +#include "comm.h" #include "ProtoPort.h" #if HAVE_LIMITS #include #endif -http_port_list::http_port_list(const char *aProtocol) +http_port_list::http_port_list(const char *aProtocol) : + listenFd(-1) #if USE_SSL - : - http(*this), dynamicCertMemCacheSize(std::numeric_limits::max()) + , http(*this) + , dynamicCertMemCacheSize(std::numeric_limits::max()) #endif { protocol = xstrdup(aProtocol); @@ -19,7 +21,10 @@ http_port_list::~http_port_list() { - delete listener; + if (listenFd >= 0) { + comm_close(listenFd); + listenFd = -1; + } safe_free(name); safe_free(defaultsite); === modified file 'src/ProtoPort.h' --- src/ProtoPort.h 2010-11-18 08:01:53 +0000 +++ src/ProtoPort.h 2011-01-09 00:55:34 +0000 @@ -4,9 +4,7 @@ #ifndef SQUID_PROTO_PORT_H #define SQUID_PROTO_PORT_H -//#include "typedefs.h" #include "cbdata.h" -#include "comm/ListenStateData.h" #if USE_SSL #include "ssl/gadgets.h" @@ -43,11 +41,11 @@ } tcp_keepalive; /** - * The FD listening socket handler. - * If not NULL we are actively listening for client requests. - * delete to close the socket. + * The FD listening socket. + * If >= 0 we are actively listening for client requests. + * use comm_close(listenFd) to stop. */ - Comm::ListenStateData *listener; + int listenFd; #if USE_SSL // XXX: temporary hack to ease move of SSL options to http_port === modified file 'src/base/AsyncCall.h' --- src/base/AsyncCall.h 2010-12-02 23:33:27 +0000 +++ src/base/AsyncCall.h 2011-01-22 12:49:57 +0000 @@ -84,6 +84,10 @@ private: const char *isCanceled; // set to the cancelation reason by cancel() + + // not implemented to prevent nil calls from being passed around and unknowingly scheduled, for now. + AsyncCall(); + AsyncCall(const AsyncCall &); }; inline @@ -122,6 +126,12 @@ const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName), dialer(aDialer) {} + AsyncCallT(const AsyncCallT &o): + AsyncCall(o.debugSection, o.debugLevel, o.name), + dialer(o.dialer) {} + + ~AsyncCallT() {} + CallDialer *getDialer() { return &dialer; } protected: @@ -132,6 +142,9 @@ virtual void fire() { dialer.dial(*this); } Dialer dialer; + +private: + AsyncCallT & operator=(const AsyncCallT &); // not defined. call assignments not permitted. }; template === modified file 'src/base/Subscription.h' --- src/base/Subscription.h 2010-10-07 07:53:45 +0000 +++ src/base/Subscription.h 2011-01-22 08:40:46 +0000 @@ -42,7 +42,7 @@ public: /// Must be passed an object. nil pointers are not permitted. explicit CallSubscription(const RefCount &aCall) : call(aCall) { assert(aCall != NULL); } - virtual AsyncCall::Pointer callback() const { return new Call_(call); } + virtual AsyncCall::Pointer callback() const { return new Call_(*call); } private: const RefCount call; ///< gets copied to create callback calls === modified file 'src/client_side.cc' --- src/client_side.cc 2011-01-10 09:43:43 +0000 +++ src/client_side.cc 2011-01-21 15:27:31 +0000 @@ -96,9 +96,10 @@ #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" +#include "CommCalls.h" +#include "comm/Loops.h" #include "comm/Write.h" -#include "comm/ListenStateData.h" -#include "comm/Loops.h" +#include "comm/TcpAcceptor.h" #include "ConnectionDetail.h" #include "eui/Config.h" #include "fde.h" @@ -108,6 +109,7 @@ #include "ident/Config.h" #include "ident/Ident.h" #include "ip/Intercept.h" +#include "ipc/FdNotes.h" #include "ipc/StartListening.h" #include "MemBuf.h" #include "MemObject.h" @@ -134,13 +136,13 @@ #define comm_close comm_lingering_close #endif -/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call +/// dials clientListenerConnectionOpened call class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb { public: - typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg); - ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg): - handler(aHandler), portCfg(aPortCfg) {} + typedef void (*Handler)(int fd, int flags, int errNo, http_port_list *portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub); + ListeningStartedDialer(Handler aHandler, int openFlags, http_port_list *aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub): + handler(aHandler), portCfg(aPortCfg), portTypeNote(note), commOpenListenerFlags(openFlags), sub(aSub) {} virtual void print(std::ostream &os) const { startPrint(os) << @@ -148,20 +150,19 @@ } virtual bool canDial(AsyncCall &) const { return true; } - virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); } + virtual void dial(AsyncCall &) { (handler)(fd, commOpenListenerFlags, errNo, portCfg, portTypeNote, sub); } public: Handler handler; private: - http_port_list *portCfg; ///< from Config.Sockaddr.http + http_port_list *portCfg; ///< from Config.Sockaddr.http + Ipc::FdNoteId portTypeNote; ///< Type of IPC socket being opened + int commOpenListenerFlags; ///< flags used by comm_open_listener + Subscription::Pointer sub; ///< The handler to be subscribed for this connetion listener }; - -static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s); -#if USE_SSL -static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s); -#endif +static void clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub); /* our socket-related context */ @@ -3121,14 +3122,14 @@ /** Handle a new connection on HTTP socket. */ void -httpAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) { http_port_list *s = (http_port_list *)data; ConnStateData *connState = NULL; if (flag != COMM_OK) { - debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); + // Its possible the call was still queued when the client disconnected + debugs(33, 2, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); return; } @@ -3367,15 +3368,14 @@ /** handle a new HTTPS connection */ static void -httpsAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) { https_port_list *s = (https_port_list *)data; SSL_CTX *sslContext = s->staticSslContext.get(); if (flag != COMM_OK) { - errno = xerrno; - debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); + // Its possible the call was still queued when the client disconnected + debugs(33, 2, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); return; } @@ -3558,7 +3558,7 @@ /// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed static bool -OpenedHttpSocket(int fd, const char *msgIfFail) +OpenedHttpSocket(int fd, const Ipc::FdNoteId portType) { if (fd < 0) { Must(NHttpSockets > 0); // we tried to open some @@ -3566,7 +3566,7 @@ Must(HttpSockets[NHttpSockets] < 0); // no extra fds received if (!NHttpSockets) // we could not open any listen sockets at all - fatal(msgIfFail); + fatalf("Unable to open %s",FdNote(portType)); return false; } @@ -3622,13 +3622,16 @@ const int openFlags = COMM_NONBLOCKING | (s->spoof_client_ip ? COMM_TRANSPARENT : 0); - AsyncCall::Pointer callback = asyncCall(33,2, - "clientHttpConnectionOpened", - ListeningStartedDialer(&clientHttpConnectionOpened, s)); - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, - Ipc::fdnHttpSocket, callback); - - HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); + + AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, s, Ipc::fdnHttpSocket, sub)); + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall); + + HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened } #if USE_SSL @@ -3641,27 +3644,27 @@ /// process clientHttpConnectionsOpen result static void -clientHttpConnectionOpened(int fd, int, http_port_list *s) +clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub) { - if (!OpenedHttpSocket(fd, "Cannot open HTTP Port")) + s->listenFd = fd; + if (!OpenedHttpSocket(s->listenFd, portTypeNote)) return; Must(s); - - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)", - CommAcceptCbPtrFun(httpAccept, s)); - - s->listener = new Comm::ListenStateData(fd, call, true); - - debugs(1, 1, "Accepting " << + Must(s->listenFd >= 0); + + // TCP: setup a job to handle accept() with subscribed handler + AsyncJob::Start(new Comm::TcpAcceptor(s->listenFd, s->s, flags, FdNote(portTypeNote), sub)); + + debugs(1, 1, "Accepting" << (s->intercepted ? " intercepted" : "") << (s->spoof_client_ip ? " spoofing" : "") << (s->sslBump ? " bumpy" : "") << (s->accel ? " accelerated" : "") - << " HTTP connections at " << s->s - << ", FD " << fd << "." ); + << FdNote(portTypeNote) << " connections at " + << " FD " << s->listenFd << " on " << s->s); - Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for + Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for } #if USE_SSL @@ -3683,35 +3686,23 @@ continue; } - AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened", - ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http)); - - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING, - Ipc::fdnHttpsSocket, call); + const int openFlags = COMM_NONBLOCKING | + (s->spoof_client_ip ? COMM_TRANSPARENT : 0); + + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); + + AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, + &s->http, Ipc::fdnHttpsSocket, sub)); + + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall); HttpSockets[NHttpSockets++] = -1; } } - -/// process clientHttpsConnectionsOpen result -static void -clientHttpsConnectionOpened(int fd, int, http_port_list *s) -{ - if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port")) - return; - - Must(s); - - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)", - CommAcceptCbPtrFun(httpsAccept, s)); - - s->listener = new Comm::ListenStateData(fd, call, true); - - debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << "."); - - Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for -} - #endif void @@ -3730,19 +3721,19 @@ clientHttpConnectionsClose(void) { for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) { - if (s->listener) { - debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection"); - delete s->listener; - s->listener = NULL; + if (s->listenFd >= 0) { + debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection"); + comm_close(s->listenFd); + s->listenFd = -1; } } #if USE_SSL for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) { - if (s->listener) { - debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection"); - delete s->listener; - s->listener = NULL; + if (s->listenFd >= 0) { + debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection"); + comm_close(s->listenFd); + s->listenFd = -1; } } #endif === modified file 'src/comm.cc' --- src/comm.cc 2011-01-10 09:43:43 +0000 +++ src/comm.cc 2011-01-10 12:31:49 +0000 @@ -40,9 +40,9 @@ #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" #include "comm/IoCallback.h" +#include "comm/Loops.h" #include "comm/Write.h" -#include "comm/ListenStateData.h" -#include "comm/Loops.h" +#include "comm/TcpAcceptor.h" #include "CommIO.h" #include "CommRead.h" #include "ConnectionDetail.h" @@ -144,7 +144,7 @@ bool isOpen(const int fd) { - return fd_table[fd].flags.open != 0; + return fd >= 0 && fd_table[fd].flags.open != 0; } /** === modified file 'src/comm/AcceptLimiter.cc' --- src/comm/AcceptLimiter.cc 2009-12-31 02:35:01 +0000 +++ src/comm/AcceptLimiter.cc 2011-01-10 00:52:51 +0000 @@ -1,6 +1,6 @@ #include "config.h" #include "comm/AcceptLimiter.h" -#include "comm/ListenStateData.h" +#include "comm/TcpAcceptor.h" #include "fde.h" Comm::AcceptLimiter Comm::AcceptLimiter::Instance_; @@ -11,7 +11,7 @@ } void -Comm::AcceptLimiter::defer(Comm::ListenStateData *afd) +Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd) { afd->isLimited++; debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); @@ -19,14 +19,33 @@ } void +Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd) +{ + for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) { + if (deferred[i] == afd) { + deferred[i]->isLimited--; + deferred[i] = NULL; // fast. kick() will skip empty entries later. + debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); + } + } +} + +void Comm::AcceptLimiter::kick() { + // TODO: this could be optimized further with an iterator to search + // looking for first non-NULL, followed by dumping the first N + // with only one shift()/pop_front operation + debugs(5, 5, HERE << " size=" << deferred.size()); - if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { - debugs(5, 5, HERE << " doing one."); + while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */ - ListenStateData *temp = deferred.shift(); - temp->isLimited--; - temp->acceptNext(); + TcpAcceptor *temp = deferred.shift(); + if (temp != NULL) { + debugs(5, 5, HERE << " doing one."); + temp->isLimited--; + temp->acceptNext(); + break; + } } } === modified file 'src/comm/AcceptLimiter.h' --- src/comm/AcceptLimiter.h 2010-01-13 01:13:17 +0000 +++ src/comm/AcceptLimiter.h 2011-01-08 14:09:20 +0000 @@ -6,7 +6,7 @@ namespace Comm { -class ListenStateData; +class TcpAcceptor; /** * FIFO Queue holding listener socket handlers which have been activated @@ -25,7 +25,10 @@ static AcceptLimiter &Instance(); /** delay accepting a new client connection. */ - void defer(Comm::ListenStateData *afd); + void defer(Comm::TcpAcceptor *afd); + + /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */ + void removeDead(const Comm::TcpAcceptor *afd); /** try to accept and begin processing any delayed client connections. */ void kick(); @@ -34,7 +37,7 @@ static AcceptLimiter Instance_; /** FIFO queue */ - Vector deferred; + Vector deferred; }; }; // namepace Comm === modified file 'src/comm/Makefile.am' --- src/comm/Makefile.am 2011-01-10 09:43:43 +0000 +++ src/comm/Makefile.am 2011-01-10 12:32:06 +0000 @@ -7,8 +7,6 @@ libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ - ListenStateData.cc \ - ListenStateData.h \ Loops.h \ ModDevPoll.cc \ ModEpoll.cc \ @@ -16,6 +14,8 @@ ModPoll.cc \ ModSelect.cc \ ModSelectWin32.cc \ + TcpAcceptor.cc \ + TcpAcceptor.h \ \ IoCallback.cc \ IoCallback.h \ === renamed file 'src/comm/ListenStateData.cc' => 'src/comm/TcpAcceptor.cc' --- src/comm/ListenStateData.cc 2011-01-10 09:43:43 +0000 +++ src/comm/TcpAcceptor.cc 2011-01-22 13:38:25 +0000 @@ -33,16 +33,104 @@ */ #include "squid.h" +#include "base/TextException.h" #include "CommCalls.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" -#include "comm/ListenStateData.h" #include "comm/Loops.h" +#include "comm/TcpAcceptor.h" #include "ConnectionDetail.h" #include "fde.h" #include "protos.h" #include "SquidTime.h" +namespace Comm { + CBDATA_CLASS_INIT(TcpAcceptor); +}; + +Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, + const char *note, const Subscription::Pointer &aSub) : + AsyncJob("Comm::TcpAcceptor"), + errcode(0), + fd(listenFd), + isLimited(0), + theCallSub(aSub), + local_addr(laddr) +{} + +void +Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) +{ + debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << aSub); + unsubscribe("subscription change"); + theCallSub = aSub; +} + +void +Comm::TcpAcceptor::unsubscribe(const char *reason) +{ + debugs(5, 5, HERE << status() << " AsyncCall Subscription " << theCallSub << " removed: " << reason); + theCallSub = NULL; +} + +void +Comm::TcpAcceptor::start() +{ + debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << theCallSub); + + Must(isOpen(fd)); + + setListen(); + + // if no error so far start accepting connections. + if (errcode == 0) + SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); +} + +bool +Comm::TcpAcceptor::doneAll() const +{ + // stop when FD is closed + if (!isOpen(fd)) { + return AsyncJob::doneAll(); + } + + // stop when handlers are gone + if (theCallSub == NULL) { + return AsyncJob::doneAll(); + } + + // open FD with handlers...keep accepting. + return false; +} + +void +Comm::TcpAcceptor::swanSong() +{ + debugs(5,5, HERE); + unsubscribe("swanSong"); + fd = -1; + AcceptLimiter::Instance().removeDead(this); + AsyncJob::swanSong(); +} + +const char * +Comm::TcpAcceptor::status() const +{ + static char ipbuf[MAX_IPSTRLEN] = {'\0'}; + if (ipbuf[0] == '\0') + local_addr.ToHostname(ipbuf, MAX_IPSTRLEN); + + static MemBuf buf; + buf.reset(); + buf.Printf(" FD %d, %s",fd, ipbuf); + + const char *jobStatus = AsyncJob::status(); + buf.append(jobStatus, strlen(jobStatus)); + + return buf.content(); +} + /** * New-style listen and accept routines * @@ -51,11 +139,11 @@ * accept()ed some time later. */ void -Comm::ListenStateData::setListen() +Comm::TcpAcceptor::setListen() { errcode = 0; // reset local errno copy. if (listen(fd, Squid_MaxFD >> 2) < 0) { - debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); + debugs(50, DBG_CRITICAL, "ERROR: listen(" << status() << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); errcode = errno; return; } @@ -67,37 +155,19 @@ debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd); xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) - debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); + debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); #elif defined(TCP_DEFER_ACCEPT) int seconds = 30; if (strncmp(Config.accept_filter, "data=", 5) == 0) seconds = atoi(Config.accept_filter + 5); if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) - debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); + debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); #else - debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS"); + debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS"); #endif } } -Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) : - fd(aFd), - theCallback(call), - mayAcceptMore(accept_many) -{ - assert(aFd >= 0); - debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call); - assert(isOpen(aFd)); - setListen(); - SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); -} - -Comm::ListenStateData::~ListenStateData() -{ - comm_close(fd); - fd = -1; -} - /** * This private callback is called whenever a filedescriptor is ready * to dupe itself and fob off an accept()ed connection @@ -108,23 +178,30 @@ * done later when enough sockets become available. */ void -Comm::ListenStateData::doAccept(int fd, void *data) +Comm::TcpAcceptor::doAccept(int fd, void *data) { - debugs(5, 2, HERE << "New connection on FD " << fd); - - assert(isOpen(fd)); - ListenStateData *afd = static_cast(data); - - if (!okToAccept()) { - AcceptLimiter::Instance().defer(afd); - } else { - afd->acceptNext(); + try { + debugs(5, 2, HERE << "New connection on FD " << fd); + + Must(isOpen(fd)); + TcpAcceptor *afd = static_cast(data); + + if (!okToAccept()) { + AcceptLimiter::Instance().defer(afd); + } else { + afd->acceptNext(); + } + SetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0); + + } catch(const std::exception &e) { + fatalf("FATAL: error while accepting new client connection: %s\n", e.what()); + } catch(...) { + fatal("FATAL: error while accepting new client connection: [unkown]\n"); } - SetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0); } bool -Comm::ListenStateData::okToAccept() +Comm::TcpAcceptor::okToAccept() { static time_t last_warn = 0; @@ -140,7 +217,7 @@ } void -Comm::ListenStateData::acceptOne() +Comm::TcpAcceptor::acceptOne() { /* * We don't worry about running low on FDs here. Instead, @@ -149,42 +226,45 @@ */ /* Accept a new connection */ - ConnectionDetail connDetails; - int newfd = oldAccept(connDetails); + ConnectionDetail newConnDetails; + int newFd = -1; + const comm_err_t flag = oldAccept(newConnDetails, &newFd); /* Check for errors */ - if (newfd < 0) { + if (!isOpen(newFd)) { - if (newfd == COMM_NOMESSAGE) { + if (flag == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback); + debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub); SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); return; } // A non-recoverable error; notify the caller */ - debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback); - notify(-1, COMM_ERROR, connDetails); - mayAcceptMore = false; + debugs(5, 5, HERE << "non-recoverable error:" << status() << " handler Subscription: " << theCallSub); + notify(flag, newConnDetails, newFd); + mustStop("Listener socket closed"); return; } - debugs(5, 5, HERE << "accepted: FD " << fd << - " newfd: " << newfd << " from: " << connDetails.peer << - " handler: " << theCallback); - notify(newfd, COMM_OK, connDetails); + debugs(5, 5, HERE << "Listener: FD " << fd << + " accepted new connection from " << newConnDetails.peer << + " handler Subscription: " << theCallSub); + notify(flag, newConnDetails, newFd); } void -Comm::ListenStateData::acceptNext() +Comm::TcpAcceptor::acceptNext() { - assert(isOpen(fd)); + Must(isOpen(fd)); debugs(5, 2, HERE << "connection on FD " << fd); acceptOne(); } +// XXX: obsolete comment? +// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback(). void -Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails) +Comm::TcpAcceptor::notify(const comm_err_t flag, const ConnectionDetail &connDetails, int newFd) const { // listener socket handlers just abandon the port with COMM_ERR_CLOSING // it should only happen when this object is deleted... @@ -192,26 +272,29 @@ return; } - if (theCallback != NULL) { - typedef CommAcceptCbParams Params; - Params ¶ms = GetCommParams(theCallback); + if (theCallSub != NULL) { + AsyncCall::Pointer call = theCallSub->callback(); + CommAcceptCbParams ¶ms = GetCommParams(call); params.fd = fd; - params.nfd = newfd; + params.nfd = newFd; params.details = connDetails; params.flag = flag; params.xerrno = errcode; - ScheduleCallHere(theCallback); - if (!mayAcceptMore) - theCallback = NULL; + ScheduleCallHere(call); } } /** * accept() and process - * Wait for an incoming connection on FD. + * Wait for an incoming connection on our listener socket. + * + * \retval COMM_OK success. details parameter filled. + * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in. + * \retval COMM_ERROR an outright failure occured. + * Or if this client has too many connections already. */ -int -Comm::ListenStateData::oldAccept(ConnectionDetail &details) +comm_err_t +Comm::TcpAcceptor::oldAccept(ConnectionDetail &details, int *newFd) { PROF_start(comm_accept); statCounter.syscalls.sock.accepts++; @@ -228,17 +311,19 @@ PROF_stop(comm_accept); if (ignoreErrno(errno)) { - debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 5, HERE << status() << ": " << xstrerror()); return COMM_NOMESSAGE; } else if (ENFILE == errno || EMFILE == errno) { - debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 3, HERE << status() << ": " << xstrerror()); return COMM_ERROR; } else { - debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 1, HERE << status() << ": " << xstrerror()); return COMM_ERROR; } } + Must(sock >= 0); + *newFd = sock; details.peer = *gai; if ( Config.client_ip_max_connections >= 0) { @@ -249,15 +334,16 @@ } } + // lookup the local-end details of this new connection details.me.InitAddrInfo(gai); - details.me.SetEmpty(); getsockname(sock, gai->ai_addr, &gai->ai_addrlen); details.me = *gai; - - commSetCloseOnExec(sock); + details.me.FreeAddrInfo(gai); /* fdstat update */ + // XXX : these are not all HTTP requests. use a note about type and ip:port details-> + // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port" fd_open(sock, FD_SOCKET, "HTTP Request"); fdd_table[sock].close_file = NULL; @@ -266,15 +352,16 @@ fde *F = &fd_table[sock]; details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN); F->remote_port = details.peer.GetPort(); - F->local_addr.SetPort(details.me.GetPort()); + F->local_addr = details.me; F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET; - details.me.FreeAddrInfo(gai); + // set socket flags + commSetCloseOnExec(sock); commSetNonBlocking(sock); /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */ F->flags.transparent = fd_table[fd].flags.transparent; PROF_stop(comm_accept); - return sock; + return COMM_OK; } === renamed file 'src/comm/ListenStateData.h' => 'src/comm/TcpAcceptor.h' --- src/comm/ListenStateData.h 2010-11-27 01:58:38 +0000 +++ src/comm/TcpAcceptor.h 2011-01-22 13:38:50 +0000 @@ -1,39 +1,86 @@ -#ifndef SQUID_LISTENERSTATEDATA_H -#define SQUID_LISTENERSTATEDATA_H +#ifndef SQUID_COMM_TCPACCEPTOR_H +#define SQUID_COMM_TCPACCEPTOR_H #include "base/AsyncCall.h" -#include "comm.h" +#include "base/Subscription.h" +#include "CommCalls.h" +#include "comm_err_t.h" +#include "comm/TcpAcceptor.h" +#include "ip/Address.h" + #if HAVE_MAP #include #endif -class ConnectionDetail; - namespace Comm { -class ListenStateData +class AcceptLimiter; + +/** + * Listens on an FD for new incoming connections and + * emits an active FD descriptor for the new client. + * + * Handles all event limiting required to quash inbound connection + * floods within the global FD limits of available Squid_MaxFD and + * client_ip_max_connections. + * + * Fills the emitted connection with all connection details able to + * be looked up. Currently these are the local/remote IP:port details + * and the listening socket transparent-mode flag. + */ +class TcpAcceptor : public AsyncJob { +private: + virtual void start(); + virtual bool doneAll() const; + virtual void swanSong(); + virtual const char *status() const; + + TcpAcceptor(const TcpAcceptor &); // not implemented. public: - ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); - ListenStateData(const ListenStateData &r); // not implemented. - ~ListenStateData(); - - void subscribe(AsyncCall::Pointer &call); + TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, + const char *note, const Subscription::Pointer &aSub); + + /** Subscribe a handler to receive calls back about new connections. + * Unsubscribes any existing subscribed handler. + */ + void subscribe(const Subscription::Pointer &aSub); + + /** Remove the currently waiting callback subscription. + * Already scheduled callbacks remain scheduled. + */ + void unsubscribe(const char *reason); + + /** Try and accept another connection (synchronous). + * If one is pending already the subscribed callback handler will be scheduled + * to handle it before this method returns. + */ void acceptNext(); - void notify(int newfd, comm_err_t flag, const ConnectionDetail &details); - int fd; + /// Call the subscribed callback handler with details about a new connection. + void notify(const comm_err_t flags, const ConnectionDetail &newConnDetails, const int newFd) const; /// errno code of the last accept() or listen() action if one occurred. int errcode; - /// whether this socket is delayed and on the AcceptLimiter queue. - int32_t isLimited; + /// conn being listened on for new connections + /// Reserved for read-only use. + // NP: public only until we can hide it behind connection handles + int fd; + +protected: + friend class AcceptLimiter; + int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. private: - /// Method to test if there are enough file escriptors to open a new client connection + Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. + + /// IP Address and port being listened on + Ip::Address local_addr; + + /// Method to test if there are enough file descriptors to open a new client connection /// if not the accept() will be postponed static bool okToAccept(); @@ -41,14 +88,12 @@ static void doAccept(int fd, void *data); void acceptOne(); - int oldAccept(ConnectionDetail &details); - - AsyncCall::Pointer theCallback; - bool mayAcceptMore; - + comm_err_t oldAccept(ConnectionDetail &newConnDetails, int *fd); void setListen(); + + CBDATA_CLASS2(TcpAcceptor); }; } // namespace Comm -#endif /* SQUID_LISTENERSTATEDATA_H */ +#endif /* SQUID_COMM_TCPACCEPTOR_H */ === modified file 'src/ftp.cc' --- src/ftp.cc 2011-01-14 14:10:21 +0000 +++ src/ftp.cc 2011-01-22 13:26:24 +0000 @@ -34,8 +34,9 @@ #include "squid.h" #include "comm.h" +#include "CommCalls.h" +#include "comm/TcpAcceptor.h" #include "comm/Write.h" -#include "comm/ListenStateData.h" #include "compat/strtoll.h" #include "ConnectionDetail.h" #include "errorpage.h" @@ -153,13 +154,11 @@ void clear(); /// just resets fd and close handler. does not close active connections. - int fd; /// channel descriptor; \todo: remove because the closer has it - - /** Current listening socket handler. delete on shutdown or abort. - * FTP stores a copy of the FD in the field fd above. - * Use close() to properly close the channel. - */ - Comm::ListenStateData *listener; + int fd; /// channel descriptor + + Ip::Address local; ///< The local IP address:port this channel is using + + int flags; ///< socket flags used when opening. private: AsyncCall::Pointer closer; /// Comm close handler callback @@ -245,6 +244,12 @@ void completedListing(void); void dataComplete(); void dataRead(const CommIoCbParams &io); + + /// ignore timeout on CTRL channel. set read timeout on DATA channel. + void switchTimeoutToDataChannel(); + /// create a data channel acceptor and start listening. + void listenForDataChannel(const int fd, const char *note); + int checkAuth(const HttpHeader * req_hdr); void checkUrlpath(); void buildTitleUrl(); @@ -443,6 +448,7 @@ void FtpStateData::ctrlClosed(const CommCloseCbParams &io) { + debugs(9, 4, HERE); ctrl.clear(); deleteThis("FtpStateData::ctrlClosed"); } @@ -451,10 +457,10 @@ void FtpStateData::dataClosed(const CommCloseCbParams &io) { - if (data.listener) { - delete data.listener; - data.listener = NULL; - data.fd = -1; + debugs(9, 4, HERE); + if (data.fd >= 0) { + comm_close(data.fd); + // NP clear() does the: data.fd = -1; } data.clear(); failed(ERR_FTP_FAILURE, 0); @@ -606,6 +612,46 @@ } void +FtpStateData::switchTimeoutToDataChannel() +{ + commSetTimeout(ctrl.fd, -1, NULL, NULL); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout); + commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); +} + +void +FtpStateData::listenForDataChannel(const int fd, const char *note) +{ + assert(data.fd < 0); + + typedef CommCbMemFunT AcceptDialer; + typedef AsyncCallT AcceptCall; + RefCount call = static_cast(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection)); + Subscription::Pointer sub = new CallSubscription(call); + + /* open the conn if its not already open */ + int newFd = fd; + if (newFd < 0) { + newFd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, data.local, data.flags, note); + if (newFd < 0) { + debugs(5, DBG_CRITICAL, HERE << "comm_open_listener failed:" << data.local << " error: " << errno); + return; + } + debugs(9, 3, HERE << "Unconnected data socket created on FD " << newFd << ", " << data.local); + } + + assert(newFd >= 0); + Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(newFd, data.local, data.flags, note, sub); + AsyncJob::Start(tmp); + + // Ensure we have a copy of the FD opened for listening and a close handler on it. + data.opened(newFd, dataCloser()); + switchTimeoutToDataChannel(); +} + +void FtpStateData::ftpTimeout(const CommTimeoutCbParams &io) { debugs(9, 4, "ftpTimeout: FD " << io.fd << ": '" << entry->url() << "'" ); @@ -1066,10 +1112,16 @@ usable = end - sbuf; - debugs(9, 3, HERE << "usable = " << usable); + debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes."); if (usable == 0) { - debugs(9, 3, HERE << "didn't find end for " << entry->url() ); + if (buf[0] == '\0' && len == 1) { + debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?"); + data.readBuf->consume(len); + } else { + debugs(9, 3, HERE << "didn't find end for " << entry->url()); + debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << rfc1738_do_escape(buf,0) << "'"); + } xfree(sbuf); return; } @@ -1138,7 +1190,13 @@ * status code after the data command. FtpStateData was being * deleted in the middle of dataRead(). */ - scheduleReadControlReply(0); + /* AYJ: 2011-01-13: 226 status possibly waiting in the ctrl buffer. + * The connection will hang if we DONT send buffered_ok. + * This happens on all transfers which can be completly sent by the + * server before the 150 started status message is read in by Squid. + * ie all transfers of about one packet hang. + */ + scheduleReadControlReply(1); } void @@ -1674,7 +1732,7 @@ * establish one on the control socket. */ - if (data.fd > -1) { + if (data.fd >= 0) { AsyncCall::Pointer nullCall = NULL; commSetTimeout(data.fd, -1, nullCall); } @@ -2722,27 +2780,24 @@ static int ftpOpenListenSocket(FtpStateData * ftpState, int fallback) { - int fd; - Ip::Address addr; struct addrinfo *AI = NULL; - int on = 1; int x = 0; /// Close old data channels, if any. We may open a new one below. - ftpState->data.close(); + if ((ftpState->data.flags & COMM_REUSEADDR)) + // NP: in fact it points to the control channel. just clear it. + ftpState->data.clear(); + else + ftpState->data.close(); /* * Set up a listen socket on the same local address as the * control connection. */ - - addr.InitAddrInfo(AI); - + ftpState->data.local.InitAddrInfo(AI); x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen); - - addr = *AI; - - addr.FreeAddrInfo(AI); + ftpState->data.local = *AI; + ftpState->data.local.FreeAddrInfo(AI); if (x) { debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror()); @@ -2754,38 +2809,18 @@ * used for both control and data. */ if (fallback) { + int on = 1; setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); + ftpState->ctrl.flags |= COMM_REUSEADDR; + ftpState->data.flags |= COMM_REUSEADDR; } else { /* if not running in fallback mode a new port needs to be retrieved */ - addr.SetPort(0); - } - - fd = comm_open(SOCK_STREAM, - IPPROTO_TCP, - addr, - COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0), - ftpState->entry->url()); - debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd ); - - if (fd < 0) { - debugs(9, DBG_CRITICAL, HERE << "comm_open failed"); - return -1; - } - - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false); - - if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) { - comm_close(fd); - return -1; - } - - ftpState->data.opened(fd, ftpState->dataCloser()); - ftpState->data.port = comm_local_port(fd); - ftpState->data.host = NULL; - return fd; + ftpState->data.local.SetPort(0); + ftpState->data.flags = COMM_NONBLOCKING; + } + + ftpState->listenForDataChannel((fallback?ftpState->ctrl.fd:-1), ftpState->entry->url()); + return ftpState->data.fd; } /// \ingroup ServerProtocolFTPInternal @@ -2881,6 +2916,7 @@ debugs(9, 3, HERE); ftpState->flags.pasv_supported = 0; fd = ftpOpenListenSocket(ftpState, 0); + debugs(9, 3, "Listening for FTP data connection with FD " << fd); Ip::Address::InitAddrInfo(AI); @@ -2933,77 +2969,68 @@ */ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) { - char ntoapeer[MAX_IPSTRLEN]; - debugs(9, 3, "ftpAcceptDataConnection"); - - // one connection accepted. the handler has stopped listening. drop our local pointer to it. - data.listener = NULL; + debugs(9, 3, HERE); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { abortTransaction("entry aborted when accepting data conn"); return; } + if (io.flag != COMM_OK) { + data.close(); + debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno)); + /** \todo Need to send error message on control channel*/ + ftpFail(this); + return; + } + + /* data listening conn is no longer even open. abort. */ + if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) { + data.clear(); // ensure that it's cleared and not just closed. + return; + } + /** \par * When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being * made by the remote client which is connected to the FTP control socket. + * Or the one which we were told to listen for by control channel messages (may differ under NAT). * This prevents third-party hacks, but also third-party load balancing handshakes. */ if (Config.Ftp.sanitycheck) { + char ntoapeer[MAX_IPSTRLEN]; io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN); - if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) { + if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 && + strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) { debugs(9, DBG_IMPORTANT, "FTP data connection from unexpected server (" << io.details.peer << "), expecting " << - fd_table[ctrl.fd].ipaddr); + fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr); - /* close the bad soures connection down ASAP. */ + /* close the bad sources connection down ASAP. */ comm_close(io.nfd); - /* we are ony accepting once, so need to re-open the listener socket. */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, this, FtpStateData::ftpAcceptDataConnection); - data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); + /* drop the bad connection (io) by ignoring the attempt. */ return; } } - if (io.flag != COMM_OK) { - debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno)); - /** \todo XXX Need to set error message */ - ftpFail(this); - return; - } - /**\par - * Replace the Listen socket with the accepted data socket */ + * Replace the Listening socket with the accepted data socket */ data.close(); data.opened(io.nfd, dataCloser()); data.port = io.details.peer.GetPort(); - io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN); + data.host = xstrdup(fd_table[io.nfd].ipaddr); debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " << "FD " << io.nfd << " to " << io.details.peer << " FD table says: " << "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " << "data-peer= " << fd_table[data.fd].ipaddr); - - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, this, FtpStateData::ftpTimeout); - commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); - - /*\todo XXX We should have a flag to track connect state... - * host NULL -> not connected, port == local port - * host set -> connected, port == remote port - */ - /* Restart state (SENT_NLST/LIST/RETR) */ - FTP_SM_FUNCS[state] (this); + assert(haveControlChannel("ftpAcceptDataConnection")); + assert(ctrl.message == NULL); + + // Ctrl channel operations will determine what happens to this data connection } /// \ingroup ServerProtocolFTPInternal @@ -3075,34 +3102,17 @@ return; } - /*\par - * When client status is 125, or 150 without a hostname, Begin data transfer. */ + /* When client status is 125, or 150 without a hostname, Begin data transfer. */ debugs(9, 3, HERE << "starting data transfer"); + switchTimeoutToDataChannel(); sendMoreRequestBody(); - /** \par - * Cancel the timeout on the Control socket and - * establish one on the data socket. - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, this, FtpStateData::ftpTimeout); - - commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); - state = WRITING_DATA; debugs(9, 3, HERE << "writing data channel"); } else if (code == 150) { /*\par - * When client code is 150 with a hostname, Accept data channel. */ + * When client code is 150 without a hostname, Accept data channel. */ debugs(9, 3, "ftpReadStor: accepting data channel"); - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, this, FtpStateData::ftpAcceptDataConnection); - - data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); + listenForDataChannel(data.fd, data.host); } else { debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code); ftpFail(this); @@ -3222,34 +3232,15 @@ if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ - /* XXX what about Config.Timeout.read? */ + debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")"); + ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); return; } else if (code == 150) { /* Accept data channel */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - - ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, ftpState,FtpStateData::ftpTimeout); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); + debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")"); + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); return; } else if (!ftpState->flags.tried_nlst && code > 300) { ftpSendNlst(ftpState); @@ -3285,32 +3276,12 @@ if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ debugs(9, 3, HERE << "reading data channel"); - /* XXX what about Config.Timeout.read? */ + ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); } else if (code == 150) { /* Accept data channel */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, ftpState,FtpStateData::ftpTimeout); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); } else if (code >= 300) { if (!ftpState->flags.try_slash_hack) { /* Try this as a directory missing trailing slash... */ @@ -3965,6 +3936,13 @@ fd = aFd; closer = aCloser; comm_add_close_handler(fd, closer); + + // grab the local IP address:port details for this connection + struct addrinfo *AI = NULL; + local.InitAddrInfo(AI); + getsockname(aFd, AI->ai_addr, &AI->ai_addrlen); + local = *AI; + local.FreeAddrInfo(AI); } /// planned close: removes the close handler and calls comm_close @@ -3972,15 +3950,11 @@ FtpChannel::close() { // channels with active listeners will be closed when the listener handler dies. - if (listener) { - delete listener; - listener = NULL; - comm_remove_close_handler(fd, closer); - closer = NULL; - fd = -1; - } else if (fd >= 0) { - comm_remove_close_handler(fd, closer); - closer = NULL; + if (fd >= 0) { + if (closer != NULL) { + comm_remove_close_handler(fd, closer); + closer = NULL; + } comm_close(fd); // we do not expect to be called back fd = -1; }