=== modified file 'src/CommCalls.cc' --- src/CommCalls.cc 2009-07-12 22:56:47 +0000 +++ src/CommCalls.cc 2011-01-09 01:36:41 +0000 @@ -130,6 +130,12 @@ { } +CommAcceptCbPtrFun::CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o): + CommDialerParamsT(o.params), + handler(o.handler) +{ +} + void CommAcceptCbPtrFun::dial() { === modified file 'src/CommCalls.h' --- src/CommCalls.h 2010-08-24 00:12:54 +0000 +++ src/CommCalls.h 2011-01-09 04:56:21 +0000 @@ -176,8 +176,11 @@ { public: typedef CommAcceptCbParams Params; + typedef RefCount Pointer; CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams); + CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o); + void dial(); virtual void print(std::ostream &os) const; @@ -259,11 +262,17 @@ class CommCbFunPtrCallT: public AsyncCall { public: + typedef RefCount > Pointer; typedef typename Dialer::Params Params; inline CommCbFunPtrCallT(int debugSection, int debugLevel, const char *callName, const Dialer &aDialer); + inline CommCbFunPtrCallT(const Pointer &p) : + AsyncCall(p->debugSection, p->debugLevel, p->name), + dialer(p->dialer) + {} + virtual CallDialer* getDialer() { return &dialer; } public: === modified file 'src/ProtoPort.cc' --- src/ProtoPort.cc 2010-11-18 08:01:53 +0000 +++ src/ProtoPort.cc 2011-01-09 00:58:29 +0000 @@ -3,15 +3,17 @@ */ #include "squid.h" +#include "comm.h" #include "ProtoPort.h" #if HAVE_LIMITS #include #endif -http_port_list::http_port_list(const char *aProtocol) +http_port_list::http_port_list(const char *aProtocol) : + listenFd(-1) #if USE_SSL - : - http(*this), dynamicCertMemCacheSize(std::numeric_limits::max()) + , http(*this) + , dynamicCertMemCacheSize(std::numeric_limits::max()) #endif { protocol = xstrdup(aProtocol); @@ -19,7 +21,10 @@ http_port_list::~http_port_list() { - delete listener; + if (listenFd >= 0) { + comm_close(listenFd); + listenFd = -1; + } safe_free(name); safe_free(defaultsite); === modified file 'src/ProtoPort.h' --- src/ProtoPort.h 2010-11-18 08:01:53 +0000 +++ src/ProtoPort.h 2011-01-09 00:55:34 +0000 @@ -4,9 +4,7 @@ #ifndef SQUID_PROTO_PORT_H #define SQUID_PROTO_PORT_H -//#include "typedefs.h" #include "cbdata.h" -#include "comm/ListenStateData.h" #if USE_SSL #include "ssl/gadgets.h" @@ -43,11 +41,11 @@ } tcp_keepalive; /** - * The FD listening socket handler. - * If not NULL we are actively listening for client requests. - * delete to close the socket. + * The FD listening socket. + * If >= 0 we are actively listening for client requests. + * use comm_close(listenFd) to stop. */ - Comm::ListenStateData *listener; + int listenFd; #if USE_SSL // XXX: temporary hack to ease move of SSL options to http_port === modified file 'src/base/AsyncCall.h' --- src/base/AsyncCall.h 2010-12-02 23:33:27 +0000 +++ src/base/AsyncCall.h 2011-01-09 04:58:01 +0000 @@ -45,6 +45,8 @@ friend class AsyncCallQueue; AsyncCall(int aDebugSection, int aDebugLevel, const char *aName); + AsyncCall(); + AsyncCall(const AsyncCall &); virtual ~AsyncCall(); void make(); // fire if we can; handles general call debugging @@ -122,6 +124,10 @@ const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName), dialer(aDialer) {} + AsyncCallT(const RefCount > &o): + AsyncCall(o->debugSection, o->debugLevel, o->name), + dialer(o->dialer) {} + CallDialer *getDialer() { return &dialer; } protected: === modified file 'src/client_side.cc' --- src/client_side.cc 2010-12-16 01:15:12 +0000 +++ src/client_side.cc 2011-01-10 00:46:31 +0000 @@ -92,8 +92,9 @@ #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" +#include "CommCalls.h" #include "comm/Write.h" -#include "comm/ListenStateData.h" +#include "comm/TcpAcceptor.h" #include "base/TextException.h" #include "ConnectionDetail.h" #include "eui/Config.h" @@ -135,13 +136,13 @@ #define comm_close comm_lingering_close #endif -/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call +/// dials clientListenerConnectionOpened call class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb { public: - typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg); - ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg): - handler(aHandler), portCfg(aPortCfg) {} + typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg, bool uses_ssl); + ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg, bool aSslFlag): + handler(aHandler), portCfg(aPortCfg), uses_ssl(aSslFlag) {} virtual void print(std::ostream &os) const { startPrint(os) << @@ -149,20 +150,17 @@ } virtual bool canDial(AsyncCall &) const { return true; } - virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); } + virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg, uses_ssl); } public: Handler handler; private: http_port_list *portCfg; ///< from Config.Sockaddr.http + bool uses_ssl; }; - -static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s); -#if USE_SSL -static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s); -#endif +static void clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl); /* our socket-related context */ @@ -3122,14 +3120,15 @@ /** Handle a new connection on HTTP socket. */ void -httpAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) { http_port_list *s = (http_port_list *)data; ConnStateData *connState = NULL; if (flag != COMM_OK) { - debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); + // This should not occur with TcpAcceptor. + // However its possible the call was still queued when the client disconnected + debugs(33, 1, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); return; } @@ -3368,15 +3367,16 @@ /** handle a new HTTPS connection */ static void -httpsAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) { https_port_list *s = (https_port_list *)data; SSL_CTX *sslContext = s->staticSslContext.get(); if (flag != COMM_OK) { + // This should not occur with TcpAcceptor. + // However its possible the call was still queued when the client disconnected errno = xerrno; - debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); + debugs(33, 1, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); return; } @@ -3625,13 +3625,16 @@ const int openFlags = COMM_NONBLOCKING | (s->spoof_client_ip ? COMM_TRANSPARENT : 0); - AsyncCall::Pointer callback = asyncCall(33,2, - "clientHttpConnectionOpened", - ListeningStartedDialer(&clientHttpConnectionOpened, s)); - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, - Ipc::fdnHttpSocket, callback); - - HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); + + AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, s, false)); + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall, sub); + + HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened } #if USE_SSL @@ -3644,27 +3647,24 @@ /// process clientHttpConnectionsOpen result static void -clientHttpConnectionOpened(int fd, int, http_port_list *s) +clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl) { - if (!OpenedHttpSocket(fd, "Cannot open HTTP Port")) + s->listenFd = fd; + if (!OpenedHttpSocket(s->listenFd, (uses_ssl?"Cannot open HTTPS Port":"Cannot open HTTP Port"))) return; Must(s); - - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)", - CommAcceptCbPtrFun(httpAccept, s)); - - s->listener = new Comm::ListenStateData(fd, call, true); - - debugs(1, 1, "Accepting " << + Must(s->listenFd >= 0); + + debugs(1, 1, "Accepting" << (s->intercepted ? " intercepted" : "") << (s->spoof_client_ip ? " spoofing" : "") << (s->sslBump ? " bumpy" : "") << (s->accel ? " accelerated" : "") - << " HTTP connections at " << s->s - << ", FD " << fd << "." ); + << " HTTP" << (uses_ssl?"S":"") << " connections at " + << " FD " << s->listenFd << " on " << s->s); - Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for + Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for } #if USE_SSL @@ -3686,35 +3686,22 @@ continue; } - AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened", - ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http)); - - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING, - Ipc::fdnHttpsSocket, call); + const int openFlags = COMM_NONBLOCKING | + (s->spoof_client_ip ? COMM_TRANSPARENT : 0); + + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); + + AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, &s->http, true)); + + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall, sub); HttpSockets[NHttpSockets++] = -1; } } - -/// process clientHttpsConnectionsOpen result -static void -clientHttpsConnectionOpened(int fd, int, http_port_list *s) -{ - if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port")) - return; - - Must(s); - - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)", - CommAcceptCbPtrFun(httpsAccept, s)); - - s->listener = new Comm::ListenStateData(fd, call, true); - - debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << "."); - - Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for -} - #endif void @@ -3733,19 +3720,19 @@ clientHttpConnectionsClose(void) { for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) { - if (s->listener) { - debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection"); - delete s->listener; - s->listener = NULL; + if (s->listenFd >= 0) { + debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection"); + comm_close(s->listenFd); + s->listenFd = -1; } } #if USE_SSL for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) { - if (s->listener) { - debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection"); - delete s->listener; - s->listener = NULL; + if (s->listenFd >= 0) { + debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection"); + comm_close(s->listenFd); + s->listenFd = -1; } } #endif === modified file 'src/comm.cc' --- src/comm.cc 2010-12-06 20:06:08 +0000 +++ src/comm.cc 2011-01-09 10:11:24 +0000 @@ -41,7 +41,7 @@ #include "comm/comm_internal.h" #include "comm/IoCallback.h" #include "comm/Write.h" -#include "comm/ListenStateData.h" +#include "comm/TcpAcceptor.h" #include "CommIO.h" #include "CommRead.h" #include "ConnectionDetail.h" @@ -143,7 +143,7 @@ bool isOpen(const int fd) { - return fd_table[fd].flags.open != 0; + return fd >= 0 && fd_table[fd].flags.open != 0; } /** === modified file 'src/comm/AcceptLimiter.cc' --- src/comm/AcceptLimiter.cc 2009-12-31 02:35:01 +0000 +++ src/comm/AcceptLimiter.cc 2011-01-10 00:52:51 +0000 @@ -1,6 +1,6 @@ #include "config.h" #include "comm/AcceptLimiter.h" -#include "comm/ListenStateData.h" +#include "comm/TcpAcceptor.h" #include "fde.h" Comm::AcceptLimiter Comm::AcceptLimiter::Instance_; @@ -11,7 +11,7 @@ } void -Comm::AcceptLimiter::defer(Comm::ListenStateData *afd) +Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd) { afd->isLimited++; debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); @@ -19,14 +19,33 @@ } void +Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd) +{ + for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) { + if (deferred[i] == afd) { + deferred[i]->isLimited--; + deferred[i] = NULL; // fast. kick() will skip empty entries later. + debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); + } + } +} + +void Comm::AcceptLimiter::kick() { + // TODO: this could be optimized further with an iterator to search + // looking for first non-NULL, followed by dumping the first N + // with only one shift()/pop_front operation + debugs(5, 5, HERE << " size=" << deferred.size()); - if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { - debugs(5, 5, HERE << " doing one."); + while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */ - ListenStateData *temp = deferred.shift(); - temp->isLimited--; - temp->acceptNext(); + TcpAcceptor *temp = deferred.shift(); + if (temp != NULL) { + debugs(5, 5, HERE << " doing one."); + temp->isLimited--; + temp->acceptNext(); + break; + } } } === modified file 'src/comm/AcceptLimiter.h' --- src/comm/AcceptLimiter.h 2010-01-13 01:13:17 +0000 +++ src/comm/AcceptLimiter.h 2011-01-08 14:09:20 +0000 @@ -6,7 +6,7 @@ namespace Comm { -class ListenStateData; +class TcpAcceptor; /** * FIFO Queue holding listener socket handlers which have been activated @@ -25,7 +25,10 @@ static AcceptLimiter &Instance(); /** delay accepting a new client connection. */ - void defer(Comm::ListenStateData *afd); + void defer(Comm::TcpAcceptor *afd); + + /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */ + void removeDead(const Comm::TcpAcceptor *afd); /** try to accept and begin processing any delayed client connections. */ void kick(); @@ -34,7 +37,7 @@ static AcceptLimiter Instance_; /** FIFO queue */ - Vector deferred; + Vector deferred; }; }; // namepace Comm === modified file 'src/comm/Makefile.am' --- src/comm/Makefile.am 2010-11-27 01:46:22 +0000 +++ src/comm/Makefile.am 2011-01-08 11:40:46 +0000 @@ -7,8 +7,8 @@ libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ - ListenStateData.cc \ - ListenStateData.h \ + TcpAcceptor.cc \ + TcpAcceptor.h \ \ IoCallback.cc \ IoCallback.h \ === renamed file 'src/comm/ListenStateData.cc' => 'src/comm/TcpAcceptor.cc' --- src/comm/ListenStateData.cc 2010-08-10 03:11:19 +0000 +++ src/comm/TcpAcceptor.cc 2011-01-09 09:51:14 +0000 @@ -33,15 +33,98 @@ */ #include "squid.h" +#include "base/TextException.h" #include "CommCalls.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" -#include "comm/ListenStateData.h" -#include "ConnectionDetail.h" +#include "comm/TcpAcceptor.h" #include "fde.h" #include "protos.h" #include "SquidTime.h" +namespace Comm { + CBDATA_CLASS_INIT(TcpAcceptor); +}; + +Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, + const char *note, const Subscription::Pointer &aSub) : + AsyncJob("Comm::TcpAcceptor"), + errcode(0), + isLimited(0), + theCallSub(aSub), + fd(listenFd), + local_addr(laddr), + newFd_(-1) +{ + /* open the conn if its not already open */ + if (fd < 0) { + fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, local_addr, flags, note); + errcode = errno; + + if (fd < 0) { + debugs(5, DBG_CRITICAL, HERE << "comm_open failed: FD " << fd << ", " << local_addr << " error: " << errcode); + return; + } + debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd << ", " << local_addr); + } +} + +void +Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) +{ + debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << aSub); + unsubscribe("subscription change"); + theCallSub = aSub; +} + +void +Comm::TcpAcceptor::unsubscribe(const char *reason) +{ + debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription " << theCallSub << " removed: " << reason); + theCallSub = NULL; +} + +void +Comm::TcpAcceptor::start() +{ + debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << theCallSub); + + Must(isOpen(fd)); + + setListen(); + + // if no error so far start accepting connections. + if (errcode == 0) + commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); +} + +bool +Comm::TcpAcceptor::doneAll() const +{ + // stop when FD is closed + if (!isOpen(fd)) { + return AsyncJob::doneAll(); + } + + // stop when handlers are gone + if (theCallSub == NULL) { + return AsyncJob::doneAll(); + } + + // open FD with handlers...keep accepting. + return false; +} + +void +Comm::TcpAcceptor::swanSong() +{ + debugs(5,5, HERE); + unsubscribe("swanSong"); + fd = -1; + AcceptLimiter::Instance().removeDead(this); + AsyncJob::swanSong(); +} + /** * New-style listen and accept routines * @@ -50,11 +133,11 @@ * accept()ed some time later. */ void -Comm::ListenStateData::setListen() +Comm::TcpAcceptor::setListen() { errcode = 0; // reset local errno copy. if (listen(fd, Squid_MaxFD >> 2) < 0) { - debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); + debugs(50, DBG_CRITICAL, "ERROR: listen(FD " << fd << ", " << local_addr << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); errcode = errno; return; } @@ -66,37 +149,19 @@ debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd); xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) - debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); + debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); #elif defined(TCP_DEFER_ACCEPT) int seconds = 30; if (strncmp(Config.accept_filter, "data=", 5) == 0) seconds = atoi(Config.accept_filter + 5); if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) - debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); + debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); #else - debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS"); + debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS"); #endif } } -Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) : - fd(aFd), - theCallback(call), - mayAcceptMore(accept_many) -{ - assert(aFd >= 0); - debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call); - assert(isOpen(aFd)); - setListen(); - commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); -} - -Comm::ListenStateData::~ListenStateData() -{ - comm_close(fd); - fd = -1; -} - /** * This private callback is called whenever a filedescriptor is ready * to dupe itself and fob off an accept()ed connection @@ -107,23 +172,30 @@ * done later when enough sockets become available. */ void -Comm::ListenStateData::doAccept(int fd, void *data) +Comm::TcpAcceptor::doAccept(int fd, void *data) { - debugs(5, 2, HERE << "New connection on FD " << fd); - - assert(isOpen(fd)); - ListenStateData *afd = static_cast(data); - - if (!okToAccept()) { - AcceptLimiter::Instance().defer(afd); - } else { - afd->acceptNext(); + try { + debugs(5, 2, HERE << "New connection on FD " << fd); + + Must(isOpen(fd)); + TcpAcceptor *afd = static_cast(data); + + if (!okToAccept()) { + AcceptLimiter::Instance().defer(afd); + } else { + afd->acceptNext(); + } + commSetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0); + + } catch(const TextException &e) { + fatalf("FATAL: error while accepting new client connection: %s\n", e.message); + } catch(...) { + fatal("FATAL: error while accepting new client connection: [unkown]\n"); } - commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0); } bool -Comm::ListenStateData::okToAccept() +Comm::TcpAcceptor::okToAccept() { static time_t last_warn = 0; @@ -139,7 +211,7 @@ } void -Comm::ListenStateData::acceptOne() +Comm::TcpAcceptor::acceptOne() { /* * We don't worry about running low on FDs here. Instead, @@ -148,42 +220,44 @@ */ /* Accept a new connection */ - ConnectionDetail connDetails; - int newfd = oldAccept(connDetails); + ConnectionDetail newConnDetails; + comm_err_t status = oldAccept(newConnDetails); /* Check for errors */ - if (newfd < 0) { + if (!isOpen(newFd_)) { - if (newfd == COMM_NOMESSAGE) { + if (status == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback); + debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub); commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); return; } // A non-recoverable error; notify the caller */ - debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback); - notify(-1, COMM_ERROR, connDetails); - mayAcceptMore = false; + debugs(5, 5, HERE << "non-recoverable error: FD " << fd << ", " << local_addr << " handler Subscription: " << theCallSub); + notify(status, newConnDetails); + mustStop("Listener socket closed"); return; } - debugs(5, 5, HERE << "accepted: FD " << fd << - " newfd: " << newfd << " from: " << connDetails.peer << - " handler: " << theCallback); - notify(newfd, COMM_OK, connDetails); + debugs(5, 5, HERE << "Listener: FD " << fd << + " accepted new connection from " << newConnDetails.peer << + " handler Subscription: " << theCallSub); + notify(status, newConnDetails); } void -Comm::ListenStateData::acceptNext() +Comm::TcpAcceptor::acceptNext() { - assert(isOpen(fd)); + Must(isOpen(fd)); debugs(5, 2, HERE << "connection on FD " << fd); acceptOne(); } +// XXX: obsolete comment? +// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback(). void -Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails) +Comm::TcpAcceptor::notify(comm_err_t flag, const ConnectionDetail &connDetails) { // listener socket handlers just abandon the port with COMM_ERR_CLOSING // it should only happen when this object is deleted... @@ -191,26 +265,33 @@ return; } - if (theCallback != NULL) { - typedef CommAcceptCbParams Params; - Params ¶ms = GetCommParams(theCallback); + if (theCallSub != NULL) { + AsyncCall::Pointer call = theCallSub->callback(); + CommAcceptCbParams ¶ms = GetCommParams(call); params.fd = fd; - params.nfd = newfd; + params.nfd = newFd_; params.details = connDetails; params.flag = flag; params.xerrno = errcode; - ScheduleCallHere(theCallback); - if (!mayAcceptMore) - theCallback = NULL; + ScheduleCallHere(call); } + + // drop the temporary recent accepted socket FD details. + // this prevents information crossover on calls. + newFd_ = -1; } /** * accept() and process - * Wait for an incoming connection on FD. + * Wait for an incoming connection on our listener socket. + * + * \retval COMM_OK success. details parameter filled. + * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in. + * \retval COMM_ERROR an outright failure occured. + * Or if this client has too many connections already. */ -int -Comm::ListenStateData::oldAccept(ConnectionDetail &details) +comm_err_t +Comm::TcpAcceptor::oldAccept(ConnectionDetail &details) { PROF_start(comm_accept); statCounter.syscalls.sock.accepts++; @@ -238,6 +319,8 @@ } } + Must(sock >= 0); + newFd_ = sock; details.peer = *gai; if ( Config.client_ip_max_connections >= 0) { @@ -248,15 +331,16 @@ } } + // lookup the local-end details of this new connection details.me.InitAddrInfo(gai); - details.me.SetEmpty(); getsockname(sock, gai->ai_addr, &gai->ai_addrlen); details.me = *gai; - - commSetCloseOnExec(sock); + details.me.FreeAddrInfo(gai); /* fdstat update */ + // XXX : these are not all HTTP requests. use a note about type and ip:port details-> + // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port" fd_open(sock, FD_SOCKET, "HTTP Request"); fdd_table[sock].close_file = NULL; @@ -265,15 +349,16 @@ fde *F = &fd_table[sock]; details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN); F->remote_port = details.peer.GetPort(); - F->local_addr.SetPort(details.me.GetPort()); + F->local_addr = details.me; F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET; - details.me.FreeAddrInfo(gai); + // set socket flags + commSetCloseOnExec(sock); commSetNonBlocking(sock); /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */ F->flags.transparent = fd_table[fd].flags.transparent; PROF_stop(comm_accept); - return sock; + return COMM_OK; } === renamed file 'src/comm/ListenStateData.h' => 'src/comm/TcpAcceptor.h' --- src/comm/ListenStateData.h 2010-11-27 01:58:38 +0000 +++ src/comm/TcpAcceptor.h 2011-01-09 00:24:09 +0000 @@ -1,39 +1,89 @@ -#ifndef SQUID_LISTENERSTATEDATA_H -#define SQUID_LISTENERSTATEDATA_H +#ifndef SQUID_COMM_TCPACCEPTOR_H +#define SQUID_COMM_TCPACCEPTOR_H #include "base/AsyncCall.h" -#include "comm.h" +#include "base/Subscription.h" +#include "CommCalls.h" +#include "comm_err_t.h" +#include "comm/TcpAcceptor.h" +#include "ip/Address.h" + #if HAVE_MAP #include #endif -class ConnectionDetail; - namespace Comm { -class ListenStateData +class AcceptLimiter; + +/** + * Listens on an FD for new incoming connections and + * emits an active FD descriptor for the new client. + * + * Handles all event limiting required to quash inbound connection + * floods within the global FD limits of available Squid_MaxFD and + * client_ip_max_connections. + * + * Fills the emitted connection with all connection details able to + * be looked up. Currently these are the local/remote IP:port details + * and the listening socket transparent-mode flag. + */ +class TcpAcceptor : public AsyncJob { +private: + virtual void start(); + virtual bool doneAll() const; + virtual void swanSong(); public: - ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); - ListenStateData(const ListenStateData &r); // not implemented. - ~ListenStateData(); - - void subscribe(AsyncCall::Pointer &call); + TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, + const char *note, const Subscription::Pointer &aSub); + + TcpAcceptor(const TcpAcceptor &r); // not implemented. + + /** Subscribe a handler to receive calls back about new connections. + * Replaces any existing subscribed handler. + */ + void subscribe(const Subscription::Pointer &aSub); + + /** Remove the currently waiting callback subscription. + * Pending calls will remain scheduled. + */ + void unsubscribe(const char *reason); + + /** Try and accept another connection (synchronous). + * If one is pending already the subscribed callback handler will be scheduled + * to handle it before this method returns. + */ void acceptNext(); - void notify(int newfd, comm_err_t flag, const ConnectionDetail &details); - int fd; + /// Call the subscribed callback handler with details about a new connection. + void notify(comm_err_t flags, const ConnectionDetail &newConnDetails); /// errno code of the last accept() or listen() action if one occurred. int errcode; - /// whether this socket is delayed and on the AcceptLimiter queue. - int32_t isLimited; - -private: - /// Method to test if there are enough file escriptors to open a new client connection +private: + friend class AcceptLimiter; + int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. + Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. + +public: + /// conn being listened on for new connections + /// Reserved for read-only use. + // NP: public only until we can hide it behind connection handles + int fd; + +private: + /// IP Address and port being listened on + Ip::Address local_addr; + + /// temporary holder for newely accepted client FD + int newFd_; + +private: + /// Method to test if there are enough file descriptors to open a new client connection /// if not the accept() will be postponed static bool okToAccept(); @@ -41,14 +91,12 @@ static void doAccept(int fd, void *data); void acceptOne(); - int oldAccept(ConnectionDetail &details); - - AsyncCall::Pointer theCallback; - bool mayAcceptMore; - + comm_err_t oldAccept(ConnectionDetail &newConnDetails); void setListen(); + + CBDATA_CLASS2(TcpAcceptor); }; } // namespace Comm -#endif /* SQUID_LISTENERSTATEDATA_H */ +#endif /* SQUID_COMM_TCPACCEPTOR_H */ === modified file 'src/ftp.cc' --- src/ftp.cc 2010-12-13 11:31:14 +0000 +++ src/ftp.cc 2011-01-10 00:42:23 +0000 @@ -34,8 +34,9 @@ #include "squid.h" #include "comm.h" +#include "CommCalls.h" +#include "comm/TcpAcceptor.h" #include "comm/Write.h" -#include "comm/ListenStateData.h" #include "compat/strtoll.h" #include "ConnectionDetail.h" #include "errorpage.h" @@ -153,13 +154,11 @@ void clear(); /// just resets fd and close handler. does not close active connections. - int fd; /// channel descriptor; \todo: remove because the closer has it - - /** Current listening socket handler. delete on shutdown or abort. - * FTP stores a copy of the FD in the field fd above. - * Use close() to properly close the channel. - */ - Comm::ListenStateData *listener; + int fd; /// channel descriptor + + Ip::Address local; ///< The local IP address:port this channel is using + + int flags; ///< socket flags used when opening. private: AsyncCall::Pointer closer; /// Comm close handler callback @@ -245,6 +244,12 @@ void completedListing(void); void dataComplete(); void dataRead(const CommIoCbParams &io); + + /// ignore timeout on CTRL channel. set read timeout on DATA channel. + void switchTimeoutToDataChannel(); + /// create a data channel acceptor and start listening. + void listenForDataChannel(const int fd, const char *note); + int checkAuth(const HttpHeader * req_hdr); void checkUrlpath(); void buildTitleUrl(); @@ -451,10 +456,10 @@ void FtpStateData::dataClosed(const CommCloseCbParams &io) { - if (data.listener) { - delete data.listener; - data.listener = NULL; - data.fd = -1; + debugs(9, 4, HERE); + if (data.fd >= 0) { + comm_close(data.fd); + // NP clear() does the: data.fd = -1; } data.clear(); failed(ERR_FTP_FAILURE, 0); @@ -606,6 +611,28 @@ } void +FtpStateData::switchTimeoutToDataChannel() +{ + commSetTimeout(ctrl.fd, -1, NULL, NULL); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout); + commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); +} + +void +FtpStateData::listenForDataChannel(const int fd, const char *note) +{ + typedef CommCbMemFunT AcceptDialer; + typedef AsyncCallT AcceptCall; + RefCount call = static_cast(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection)); + Subscription::Pointer sub = new CallSubscription(call); + Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(fd, data.local, data.flags, note, sub); + data.fd = tmp->fd; // Ensure we have a copy of the FD opened for listening. + AsyncJob::Start(tmp); +} + +void FtpStateData::ftpTimeout(const CommTimeoutCbParams &io) { debugs(9, 4, "ftpTimeout: FD " << io.fd << ": '" << entry->url() << "'" ); @@ -1066,10 +1093,16 @@ usable = end - sbuf; - debugs(9, 3, HERE << "usable = " << usable); + debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes."); if (usable == 0) { - debugs(9, 3, HERE << "didn't find end for " << entry->url() ); + if (buf[0] == '\0' && len == 1) { + debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?"); + data.readBuf->consume(len); + } else { + debugs(9, 3, HERE << "didn't find end for " << entry->url()); + debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << buf << "'"); + } xfree(sbuf); return; } @@ -1673,7 +1706,7 @@ * establish one on the control socket. */ - if (data.fd > -1) { + if (data.fd >= 0) { AsyncCall::Pointer nullCall = NULL; commSetTimeout(data.fd, -1, nullCall); } @@ -2718,27 +2751,21 @@ static int ftpOpenListenSocket(FtpStateData * ftpState, int fallback) { - int fd; - Ip::Address addr; struct addrinfo *AI = NULL; - int on = 1; int x = 0; /// Close old data channels, if any. We may open a new one below. - ftpState->data.close(); + if (!(ftpState->data.flags & COMM_REUSEADDR)) + ftpState->data.close(); /* * Set up a listen socket on the same local address as the * control connection. */ - - addr.InitAddrInfo(AI); - + ftpState->data.local.InitAddrInfo(AI); x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen); - - addr = *AI; - - addr.FreeAddrInfo(AI); + ftpState->data.local = *AI; + ftpState->data.local.FreeAddrInfo(AI); if (x) { debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror()); @@ -2750,38 +2777,19 @@ * used for both control and data. */ if (fallback) { + int on = 1; setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); + ftpState->ctrl.flags |= COMM_REUSEADDR; + ftpState->data.flags |= COMM_REUSEADDR; + ftpState->data.fd = ftpState->ctrl.fd; } else { /* if not running in fallback mode a new port needs to be retrieved */ - addr.SetPort(0); - } - - fd = comm_open(SOCK_STREAM, - IPPROTO_TCP, - addr, - COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0), - ftpState->entry->url()); - debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd ); - - if (fd < 0) { - debugs(9, DBG_CRITICAL, HERE << "comm_open failed"); - return -1; - } - - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false); - - if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) { - comm_close(fd); - return -1; - } - - ftpState->data.opened(fd, ftpState->dataCloser()); - ftpState->data.port = comm_local_port(fd); - ftpState->data.host = NULL; - return fd; + ftpState->data.local.SetPort(0); + ftpState->data.flags = COMM_NONBLOCKING; + } + + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->entry->url()); + return ftpState->data.fd; } /// \ingroup ServerProtocolFTPInternal @@ -2870,6 +2878,7 @@ debugs(9, 3, HERE); ftpState->flags.pasv_supported = 0; fd = ftpOpenListenSocket(ftpState, 0); + debugs(9, 3, "Listening for FTP data connection with FD " << fd); Ip::Address::InitAddrInfo(AI); @@ -2922,77 +2931,68 @@ */ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) { - char ntoapeer[MAX_IPSTRLEN]; - debugs(9, 3, "ftpAcceptDataConnection"); - - // one connection accepted. the handler has stopped listening. drop our local pointer to it. - data.listener = NULL; + debugs(9, 3, HERE); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { abortTransaction("entry aborted when accepting data conn"); return; } + if (io.flag != COMM_OK) { + data.close(); + debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno)); + /** \todo Need to send error message on control channel*/ + ftpFail(this); + return; + } + + /* data listening conn is no longer even open. abort. */ + if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) { + data.clear(); // ensure that it's cleared and not just closed. + return; + } + /** \par * When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being * made by the remote client which is connected to the FTP control socket. + * Or the one which we were told to listen for by control channel messages (may differ under NAT). * This prevents third-party hacks, but also third-party load balancing handshakes. */ if (Config.Ftp.sanitycheck) { + char ntoapeer[MAX_IPSTRLEN]; io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN); - if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) { + if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 && + strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) { debugs(9, DBG_IMPORTANT, "FTP data connection from unexpected server (" << io.details.peer << "), expecting " << - fd_table[ctrl.fd].ipaddr); + fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr); - /* close the bad soures connection down ASAP. */ + /* close the bad sources connection down ASAP. */ comm_close(io.nfd); - /* we are ony accepting once, so need to re-open the listener socket. */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, this, FtpStateData::ftpAcceptDataConnection); - data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); + /* drop the bad connection (io) by ignoring the attempt. */ return; } } - if (io.flag != COMM_OK) { - debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno)); - /** \todo XXX Need to set error message */ - ftpFail(this); - return; - } - /**\par - * Replace the Listen socket with the accepted data socket */ + * Replace the Listening socket with the accepted data socket */ data.close(); data.opened(io.nfd, dataCloser()); data.port = io.details.peer.GetPort(); - io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN); + data.host = xstrdup(fd_table[io.nfd].ipaddr); debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " << "FD " << io.nfd << " to " << io.details.peer << " FD table says: " << "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " << "data-peer= " << fd_table[data.fd].ipaddr); - - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, this, FtpStateData::ftpTimeout); - commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); - - /*\todo XXX We should have a flag to track connect state... - * host NULL -> not connected, port == local port - * host set -> connected, port == remote port - */ - /* Restart state (SENT_NLST/LIST/RETR) */ - FTP_SM_FUNCS[state] (this); + assert(haveControlChannel("ftpAcceptDataConnection")); + assert(ctrl.message == NULL); + + // Ctrl channel operations will determine what happens to this data connection } /// \ingroup ServerProtocolFTPInternal @@ -3087,11 +3087,7 @@ /*\par * When client code is 150 with a hostname, Accept data channel. */ debugs(9, 3, "ftpReadStor: accepting data channel"); - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, this, FtpStateData::ftpAcceptDataConnection); - - data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); + listenForDataChannel(data.fd, data.host); } else { debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code); ftpFail(this); @@ -3211,34 +3207,16 @@ if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ - /* XXX what about Config.Timeout.read? */ + debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")"); + ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); return; } else if (code == 150) { /* Accept data channel */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - - ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, ftpState,FtpStateData::ftpTimeout); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); + debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")"); + ftpState->switchTimeoutToDataChannel(); + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); return; } else if (!ftpState->flags.tried_nlst && code > 300) { ftpSendNlst(ftpState); @@ -3274,32 +3252,13 @@ if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ debugs(9, 3, HERE << "reading data channel"); - /* XXX what about Config.Timeout.read? */ + ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); } else if (code == 150) { /* Accept data channel */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, ftpState,FtpStateData::ftpTimeout); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); + ftpState->switchTimeoutToDataChannel(); + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); } else if (code >= 300) { if (!ftpState->flags.try_slash_hack) { /* Try this as a directory missing trailing slash... */ @@ -3952,6 +3911,13 @@ fd = aFd; closer = aCloser; comm_add_close_handler(fd, closer); + + // grab the local IP address:port details for this connection + struct addrinfo *AI = NULL; + local.InitAddrInfo(AI); + getsockname(aFd, AI->ai_addr, &AI->ai_addrlen); + local = *AI; + local.FreeAddrInfo(AI); } /// planned close: removes the close handler and calls comm_close @@ -3959,15 +3925,11 @@ FtpChannel::close() { // channels with active listeners will be closed when the listener handler dies. - if (listener) { - delete listener; - listener = NULL; - comm_remove_close_handler(fd, closer); - closer = NULL; - fd = -1; - } else if (fd >= 0) { - comm_remove_close_handler(fd, closer); - closer = NULL; + if (fd >= 0) { + if (closer != NULL) { + comm_remove_close_handler(fd, closer); + closer = NULL; + } comm_close(fd); // we do not expect to be called back fd = -1; } === modified file 'src/htcp.cc' --- src/htcp.cc 2010-12-13 11:31:14 +0000 +++ src/htcp.cc 2011-01-09 08:28:49 +0000 @@ -1516,12 +1516,13 @@ AsyncCall::Pointer call = asyncCall(31, 2, "htcpIncomingConnectionOpened", HtcpListeningStartedDialer(&htcpIncomingConnectionOpened)); + Subscription::Pointer nilSub; Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, incomingAddr, COMM_NONBLOCKING, - Ipc::fdnInHtcpSocket, call); + Ipc::fdnInHtcpSocket, call, nilSub); if (!Config.Addrs.udp_outgoing.IsNoAddr()) { Ip::Address outgoingAddr = Config.Addrs.udp_outgoing; === modified file 'src/icp_v2.cc' --- src/icp_v2.cc 2010-12-13 11:31:14 +0000 +++ src/icp_v2.cc 2011-01-09 08:30:19 +0000 @@ -698,11 +698,13 @@ "icpIncomingConnectionOpened", IcpListeningStartedDialer(&icpIncomingConnectionOpened, addr)); + Subscription::Pointer nilSub; + Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, addr, COMM_NONBLOCKING, - Ipc::fdnInIcpSocket, call); + Ipc::fdnInIcpSocket, call, nilSub); addr.SetEmpty(); // clear for next use. addr = Config.Addrs.udp_outgoing; === modified file 'src/ipc/SharedListen.cc' --- src/ipc/SharedListen.cc 2010-10-28 18:52:59 +0000 +++ src/ipc/SharedListen.cc 2011-01-09 00:21:05 +0000 @@ -150,5 +150,6 @@ Must(cbd); cbd->fd = fd; cbd->errNo = response.errNo; + cbd->handlerSubscription = por.params.handlerSubscription; ScheduleCallHere(por.callback); } === modified file 'src/ipc/SharedListen.h' --- src/ipc/SharedListen.h 2010-07-06 23:09:44 +0000 +++ src/ipc/SharedListen.h 2011-01-09 00:18:50 +0000 @@ -9,6 +9,7 @@ #define SQUID_IPC_SHARED_LISTEN_H #include "base/AsyncCall.h" +#include "base/Subscription.h" namespace Ipc { @@ -28,6 +29,9 @@ Ip::Address addr; ///< will be memset and memcopied int flags; int fdNote; ///< index into fd_note() comment strings + + /// handler to subscribe to Comm::TcpAcceptor + Subscription::Pointer handlerSubscription; }; class TypedMsgHdr; === modified file 'src/ipc/StartListening.cc' --- src/ipc/StartListening.cc 2010-07-06 23:09:44 +0000 +++ src/ipc/StartListening.cc 2011-01-10 00:52:21 +0000 @@ -6,8 +6,9 @@ */ #include "config.h" +#include "base/TextException.h" #include "comm.h" -#include "base/TextException.h" +#include "comm/TcpAcceptor.h" #include "ipc/SharedListen.h" #include "ipc/StartListening.h" @@ -25,34 +26,41 @@ return os << "(FD " << fd << ", err=" << errNo; } - -void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, - int flags, FdNoteId fdNote, AsyncCall::Pointer &callback) +void +Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, int flags, + FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub) { - OpenListenerParams p; - p.sock_type = sock_type; - p.proto = proto; - p.addr = addr; - p.flags = flags; - p.fdNote = fdNote; - - if (UsingSmp()) { // if SMP is on, share + if (UsingSmp()) { // if SMP is on, share + OpenListenerParams p; + p.sock_type = sock_type; + p.proto = proto; + p.addr = addr; + p.flags = flags; + p.fdNote = fdNote; + p.handlerSubscription = sub; Ipc::JoinSharedListen(p, callback); return; // wait for the call back } + StartListeningCb *cbd = dynamic_cast(callback->getDialer()); + Must(cbd); + enter_suid(); - const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags, - FdNote(p.fdNote)); - const int errNo = (sock >= 0) ? 0 : errno; + if (sock_type == SOCK_STREAM) { + // TCP: setup a job to handle accept() with subscribed handler + Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(cbd->fd, addr, flags, FdNote(fdNote), sub); + cbd->fd = tmp->fd; + AsyncJob::Start(tmp); + } else if (sock_type == SOCK_DGRAM) { + // UDP: setup the listener socket, but do not set a subscriber + // TODO: create a UDP sbscription so packet event calls get scheduled and queued Async. + cbd->fd = comm_open_listener(sock_type, proto, addr, flags, FdNote(fdNote)); + } else { + fatalf("Invalid Socket Type (%d)",sock_type); + } + cbd->errNo = cbd->fd >= 0 ? 0 : errno; leave_suid(); - debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr); - - StartListeningCb *cbd = - dynamic_cast(callback->getDialer()); - Must(cbd); - cbd->fd = sock; - cbd->errNo = errNo; + debugs(54, 3, HERE << "opened listen FD " << cbd->fd << " on " << addr); ScheduleCallHere(callback); } === modified file 'src/ipc/StartListening.h' --- src/ipc/StartListening.h 2010-11-21 04:40:05 +0000 +++ src/ipc/StartListening.h 2011-01-09 00:30:35 +0000 @@ -11,6 +11,7 @@ #include "ip/forward.h" #include "ipc/FdNotes.h" #include "base/AsyncCall.h" +#include "base/Subscription.h" #if HAVE_IOSFWD #include @@ -32,12 +33,15 @@ public: int fd; ///< opened listening socket or -1 int errNo; ///< errno value from the comm_open_listener() call + + /// The subscription we will pass on to the Comm::TcpAcceptor + Subscription::Pointer handlerSubscription; }; /// Depending on whether SMP is on, either ask Coordinator to send us /// the listening FD or call comm_open_listener() directly. -extern void StartListening(int sock_type, int proto, Ip::Address &addr, - int flags, FdNoteId fdNote, AsyncCall::Pointer &callback); +extern void StartListening(int sock_type, int proto, Ip::Address &addr, int flags, + FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub); } // namespace Ipc; === modified file 'src/snmp_core.cc' --- src/snmp_core.cc 2010-12-13 11:31:14 +0000 +++ src/snmp_core.cc 2011-01-09 08:32:43 +0000 @@ -318,12 +318,13 @@ AsyncCall::Pointer call = asyncCall(49, 2, "snmpIncomingConnectionOpened", SnmpListeningStartedDialer(&snmpIncomingConnectionOpened)); + Subscription::Pointer nilSub; Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, Config.Addrs.snmp_incoming, COMM_NONBLOCKING, - Ipc::fdnInSnmpSocket, call); + Ipc::fdnInSnmpSocket, call, nilSub); if (!Config.Addrs.snmp_outgoing.IsNoAddr()) { Config.Addrs.snmp_outgoing.SetPort(Config.Port.snmp); @@ -339,12 +340,13 @@ AsyncCall::Pointer call = asyncCall(49, 2, "snmpOutgoingConnectionOpened", SnmpListeningStartedDialer(&snmpOutgoingConnectionOpened)); + Subscription::Pointer nilSub; Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, Config.Addrs.snmp_outgoing, COMM_NONBLOCKING, - Ipc::fdnOutSnmpSocket, call); + Ipc::fdnOutSnmpSocket, call, nilSub); } } }