=== modified file 'src/comm/AcceptLimiter.cc' --- src/comm/AcceptLimiter.cc 2009-12-31 02:35:01 +0000 +++ src/comm/AcceptLimiter.cc 2010-09-18 02:09:49 +0000 @@ -1,6 +1,7 @@ #include "config.h" #include "comm/AcceptLimiter.h" -#include "comm/ListenStateData.h" +#include "comm/ConnAcceptor.h" +#include "comm/Connection.h" #include "fde.h" Comm::AcceptLimiter Comm::AcceptLimiter::Instance_; @@ -11,22 +12,41 @@ } void -Comm::AcceptLimiter::defer(Comm::ListenStateData *afd) +Comm::AcceptLimiter::defer(Comm::ConnAcceptor *afd) { afd->isLimited++; - debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); + debugs(5, 5, HERE << afd->conn << " x" << afd->isLimited); deferred.push_back(afd); } void +Comm::AcceptLimiter::removeDead(const Comm::ConnAcceptor *afd) +{ + for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) { + if (deferred[i] == afd) { + deferred[i]->isLimited--; + deferred[i] = NULL; // fast. kick() will skip empty entries later. + debugs(5, 5, HERE << afd->conn << " x" << afd->isLimited); + } + } +} + +void Comm::AcceptLimiter::kick() { + // TODO: this could be optimized further with an iterator to search + // looking for first non-NULL, followed by dumping the first N + // with only one shift()/pop_ftron operation + debugs(5, 5, HERE << " size=" << deferred.size()); - if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { - debugs(5, 5, HERE << " doing one."); + while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */ - ListenStateData *temp = deferred.shift(); - temp->isLimited--; - temp->acceptNext(); + ConnAcceptor *temp = deferred.shift(); + if (temp != NULL) { + debugs(5, 5, HERE << " doing one."); + temp->isLimited--; + temp->acceptNext(); + break; + } } } === modified file 'src/comm/AcceptLimiter.h' --- src/comm/AcceptLimiter.h 2010-01-13 01:13:17 +0000 +++ src/comm/AcceptLimiter.h 2010-09-18 02:11:31 +0000 @@ -1,12 +1,12 @@ -#ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H -#define _SQUID_SRC_COMM_ACCEPT_LIMITER_H +#ifndef _SQUID_SRC_COMM_ACCEPTLIMITER_H +#define _SQUID_SRC_COMM_ACCEPTLIMITER_H #include "Array.h" namespace Comm { -class ListenStateData; +class ConnAcceptor; /** * FIFO Queue holding listener socket handlers which have been activated @@ -14,7 +14,8 @@ * But when doing so there were not enough FD available to handle the * new connection. These handlers are awaiting some FD to become free. * - * defer - used only by Comm layer ListenStateData adding themselves when FD are limited. + * defer - used only by Comm layer ConnAcceptor adding themselves when FD are limited. + * removeDead - used only by Comm layer ConnAcceptor to remove themselves when dying. * kick - used by Comm layer when FD are closed. */ class AcceptLimiter @@ -25,7 +26,10 @@ static AcceptLimiter &Instance(); /** delay accepting a new client connection. */ - void defer(Comm::ListenStateData *afd); + void defer(Comm::ConnAcceptor *afd); + + /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */ + void removeDead(const Comm::ConnAcceptor *afd); /** try to accept and begin processing any delayed client connections. */ void kick(); @@ -34,9 +38,9 @@ static AcceptLimiter Instance_; /** FIFO queue */ - Vector deferred; + Vector deferred; }; }; // namepace Comm -#endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */ +#endif /* _SQUID_SRC_COMM_ACCEPTLIMITER_H */ === renamed file 'src/comm/ListenStateData.cc' => 'src/comm/ConnAcceptor.cc' --- src/comm/ListenStateData.cc 2010-08-10 03:11:19 +0000 +++ src/comm/ConnAcceptor.cc 2010-09-18 01:56:02 +0000 @@ -33,15 +33,100 @@ */ #include "squid.h" +#include "base/TextException.h" #include "CommCalls.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" -#include "comm/ListenStateData.h" -#include "ConnectionDetail.h" +#include "comm/Connection.h" +#include "comm/ConnAcceptor.h" #include "fde.h" #include "protos.h" #include "SquidTime.h" +namespace Comm { + CBDATA_CLASS_INIT(ConnAcceptor); +}; + +Comm::ConnAcceptor::ConnAcceptor(const Comm::ConnectionPointer &newConn, const char *note, const Subscription::Pointer &aSub) : + AsyncJob("Comm::ConnAcceptor"), + errcode(0), + isLimited(0), + theCallSub(aSub), + conn(newConn) +{ + assert(newConn != NULL); + + /* open the conn if its not already open */ + if (!IsConnOpen(conn)) { + conn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note); + errcode = errno; + + if (!conn->isOpen()) { + debugs(5, DBG_CRITICAL, HERE << "comm_open failed: " << conn << " error: " << errcode); + conn = NULL; + return; + } + debugs(9, 3, HERE << "Unconnected data socket created on " << conn); + } + assert(IsConnOpen(newConn)); +} + +void +Comm::ConnAcceptor::subscribe(const Subscription::Pointer &aSub) +{ + debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << aSub); + unsubscribe("subscription change"); + theCallSub = aSub; +} + +void +Comm::ConnAcceptor::unsubscribe(const char *reason) +{ + debugs(5, 5, HERE << conn << " AsyncCall Subscription " << theCallSub << " removed: " << reason); + theCallSub = NULL; +} + +void +Comm::ConnAcceptor::start() +{ + debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << theCallSub); + + Must(IsConnOpen(conn)); + + setListen(); + + // if no error so far start accepting connections. + if (errcode == 0) + commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0); +} + +bool +Comm::ConnAcceptor::doneAll() const +{ + // stop when FD is closed + if (!IsConnOpen(conn)) { + return AsyncJob::doneAll(); + } + + // stop when handlers are gone + if (theCallSub == NULL) { + return AsyncJob::doneAll(); + } + + // open FD with handlers...keep accepting. + return false; +} + +void +Comm::ConnAcceptor::swanSong() +{ + debugs(5,5, HERE); + unsubscribe("swanSong"); + conn = NULL; + AcceptLimiter::Instance().removeDead(this); + AsyncJob::swanSong(); +} + /** * New-style listen and accept routines * @@ -50,11 +135,11 @@ * accept()ed some time later. */ void -Comm::ListenStateData::setListen() +Comm::ConnAcceptor::setListen() { errcode = 0; // reset local errno copy. - if (listen(fd, Squid_MaxFD >> 2) < 0) { - debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); + if (listen(conn->fd, Squid_MaxFD >> 2) < 0) { + debugs(50, DBG_CRITICAL, "ERROR: listen(" << conn << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); errcode = errno; return; } @@ -63,40 +148,22 @@ #ifdef SO_ACCEPTFILTER struct accept_filter_arg afa; bzero(&afa, sizeof(afa)); - debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd); + debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on " << conn); xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); - if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) - debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); + if (setsockopt(conn->fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) + debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); #elif defined(TCP_DEFER_ACCEPT) int seconds = 30; if (strncmp(Config.accept_filter, "data=", 5) == 0) seconds = atoi(Config.accept_filter + 5); - if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) - debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); + if (setsockopt(conn->fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) + debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); #else - debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS"); + debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS"); #endif } } -Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) : - fd(aFd), - theCallback(call), - mayAcceptMore(accept_many) -{ - assert(aFd >= 0); - debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call); - assert(isOpen(aFd)); - setListen(); - commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); -} - -Comm::ListenStateData::~ListenStateData() -{ - comm_close(fd); - fd = -1; -} - /** * This private callback is called whenever a filedescriptor is ready * to dupe itself and fob off an accept()ed connection @@ -107,23 +174,30 @@ * done later when enough sockets become available. */ void -Comm::ListenStateData::doAccept(int fd, void *data) +Comm::ConnAcceptor::doAccept(int fd, void *data) { - debugs(5, 2, HERE << "New connection on FD " << fd); - - assert(isOpen(fd)); - ListenStateData *afd = static_cast(data); - - if (!okToAccept()) { - AcceptLimiter::Instance().defer(afd); - } else { - afd->acceptNext(); + try { + debugs(5, 2, HERE << "New connection on FD " << fd); + + Must(isOpen(fd)); + ConnAcceptor *afd = static_cast(data); + + if (!okToAccept()) { + AcceptLimiter::Instance().defer(afd); + } else { + afd->acceptNext(); + } + commSetSelect(fd, COMM_SELECT_READ, Comm::ConnAcceptor::doAccept, afd, 0); + + } catch(TextException &e) { + fatalf("FATAL: error while accepting new client connection: %s\n", e.message); + } catch(...) { + fatal("FATAL: error while accepting new client connection: [unkown]\n"); } - commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0); } bool -Comm::ListenStateData::okToAccept() +Comm::ConnAcceptor::okToAccept() { static time_t last_warn = 0; @@ -139,7 +213,7 @@ } void -Comm::ListenStateData::acceptOne() +Comm::ConnAcceptor::acceptOne() { /* * We don't worry about running low on FDs here. Instead, @@ -148,42 +222,43 @@ */ /* Accept a new connection */ - ConnectionDetail connDetails; - int newfd = oldAccept(connDetails); + ConnectionPointer newConnDetails = new Connection(); + comm_err_t status = oldAccept(newConnDetails); /* Check for errors */ - if (newfd < 0) { + if (!newConnDetails->isOpen()) { - if (newfd == COMM_NOMESSAGE) { + if (status == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback); - commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); + debugs(5, 5, HERE << "try later: " << conn << " handler Subscription: " << theCallSub); + commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0); return; } // A non-recoverable error; notify the caller */ - debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback); - notify(-1, COMM_ERROR, connDetails); - mayAcceptMore = false; + debugs(5, 5, HERE << "non-recoverable error: " << conn << " handler Subscription: " << theCallSub); + notify(status, newConnDetails); + mustStop("Listener socket closed"); return; } - debugs(5, 5, HERE << "accepted: FD " << fd << - " newfd: " << newfd << " from: " << connDetails.peer << - " handler: " << theCallback); - notify(newfd, COMM_OK, connDetails); + debugs(5, 5, HERE << "Listener: " << conn << + " accepted new connection " << newConnDetails << + " handler Subscription: " << theCallSub); + notify(status, newConnDetails); } void -Comm::ListenStateData::acceptNext() +Comm::ConnAcceptor::acceptNext() { - assert(isOpen(fd)); - debugs(5, 2, HERE << "connection on FD " << fd); + Must(IsConnOpen(conn)); + debugs(5, 2, HERE << "connection on " << conn); acceptOne(); } +// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback(). void -Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails) +Comm::ConnAcceptor::notify(comm_err_t flag, const Comm::ConnectionPointer &newConnDetails) { // listener socket handlers just abandon the port with COMM_ERR_CLOSING // it should only happen when this object is deleted... @@ -191,89 +266,95 @@ return; } - if (theCallback != NULL) { - typedef CommAcceptCbParams Params; - Params ¶ms = GetCommParams(theCallback); - params.fd = fd; - params.nfd = newfd; - params.details = connDetails; + if (theCallSub != NULL) { + AsyncCall::Pointer call = theCallSub->callback(); + CommAcceptCbParams ¶ms = GetCommParams(call); + params.fd = conn->fd; + params.conn = newConnDetails; params.flag = flag; params.xerrno = errcode; - ScheduleCallHere(theCallback); - if (!mayAcceptMore) - theCallback = NULL; + ScheduleCallHere(call); } } /** * accept() and process - * Wait for an incoming connection on FD. + * Wait for an incoming connection on our listener socket. + * + * \retval COMM_OK success. details parameter filled. + * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in. + * \retval COMM_ERROR an outright failure occured. + * Or if this client has too many connections already. */ -int -Comm::ListenStateData::oldAccept(ConnectionDetail &details) +comm_err_t +Comm::ConnAcceptor::oldAccept(Comm::ConnectionPointer &details) { PROF_start(comm_accept); statCounter.syscalls.sock.accepts++; int sock; struct addrinfo *gai = NULL; - details.me.InitAddrInfo(gai); + details->local.InitAddrInfo(gai); errcode = 0; // reset local errno copy. - if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) { + if ((sock = accept(conn->fd, gai->ai_addr, &gai->ai_addrlen)) < 0) { errcode = errno; // store last accept errno locally. - details.me.FreeAddrInfo(gai); + details->local.FreeAddrInfo(gai); PROF_stop(comm_accept); if (ignoreErrno(errno)) { - debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 5, HERE << conn << ": " << xstrerror()); return COMM_NOMESSAGE; } else if (ENFILE == errno || EMFILE == errno) { - debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 3, HERE << conn << ": " << xstrerror()); return COMM_ERROR; } else { - debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 1, HERE << conn << ": " << xstrerror()); return COMM_ERROR; } } - details.peer = *gai; + Must(sock >= 0); + details->fd = sock; + details->remote = *gai; if ( Config.client_ip_max_connections >= 0) { - if (clientdbEstablished(details.peer, 0) > Config.client_ip_max_connections) { - debugs(50, DBG_IMPORTANT, "WARNING: " << details.peer << " attempting more than " << Config.client_ip_max_connections << " connections."); - details.me.FreeAddrInfo(gai); + if (clientdbEstablished(details->remote, 0) > Config.client_ip_max_connections) { + debugs(50, DBG_IMPORTANT, "WARNING: " << details->remote << " attempting more than " << Config.client_ip_max_connections << " connections."); + details->local.FreeAddrInfo(gai); return COMM_ERROR; } } - details.me.InitAddrInfo(gai); - - details.me.SetEmpty(); + // lookup the local-end details of this new connection + details->local.InitAddrInfo(gai); + details->local.SetEmpty(); getsockname(sock, gai->ai_addr, &gai->ai_addrlen); - details.me = *gai; - - commSetCloseOnExec(sock); + details->local = *gai; + details->local.FreeAddrInfo(gai); /* fdstat update */ + // XXX : these are not all HTTP requests. use a note about type and ip:port details-> + // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port" fd_open(sock, FD_SOCKET, "HTTP Request"); fdd_table[sock].close_file = NULL; fdd_table[sock].close_line = 0; fde *F = &fd_table[sock]; - details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN); - F->remote_port = details.peer.GetPort(); - F->local_addr.SetPort(details.me.GetPort()); - F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET; - details.me.FreeAddrInfo(gai); + details->remote.NtoA(F->ipaddr,MAX_IPSTRLEN); + F->remote_port = details->remote.GetPort(); + F->local_addr = details->local; + F->sock_family = details->local.IsIPv6()?AF_INET6:AF_INET; + // set socket flags + commSetCloseOnExec(sock); commSetNonBlocking(sock); /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */ - F->flags.transparent = fd_table[fd].flags.transparent; + F->flags.transparent = fd_table[conn->fd].flags.transparent; PROF_stop(comm_accept); - return sock; + return COMM_OK; } === renamed file 'src/comm/ListenStateData.h' => 'src/comm/ConnAcceptor.h' --- src/comm/ListenStateData.h 2010-08-09 10:48:17 +0000 +++ src/comm/ConnAcceptor.h 2010-09-10 11:31:04 +0000 @@ -1,40 +1,77 @@ -#ifndef SQUID_LISTENERSTATEDATA_H -#define SQUID_LISTENERSTATEDATA_H +#ifndef SQUID_COMM_CONNACCEPTOR_H +#define SQUID_COMM_CONNACCEPTOR_H #include "config.h" -#include "base/AsyncCall.h" -#include "comm.h" +#include "base/Subscription.h" +#include "CommCalls.h" +#include "comm/comm_err_t.h" +#include "comm/forward.h" + #if HAVE_MAP #include #endif -class ConnectionDetail; - namespace Comm { -class ListenStateData +class AcceptLimiter; + +/** + * Listens on a Comm::Connection for new incoming connections and + * emits an active Comm::Connection descriptor for the new client. + * + * Handles all event limiting required to quash inbound connection + * floods within the global FD limits of available Squid_MaxFD and + * client_ip_max_connections. + * + * Fills the emitted connection with all connection details able to + * be looked up. Currently these are the local/remote IP:port details + * and the listening socket transparent-mode flag. + */ +class ConnAcceptor : public AsyncJob { +private: + virtual void start(); + virtual bool doneAll() const; + virtual void swanSong(); public: - ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); - ListenStateData(const ListenStateData &r); // not implemented. - ~ListenStateData(); - - void subscribe(AsyncCall::Pointer &call); + ConnAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub); + ConnAcceptor(const ConnAcceptor &r); // not implemented. + + /** Subscribe a handler to receive calls back about new connections. + * Replaces any existing subscribed handler. + */ + void subscribe(const Subscription::Pointer &aSub); + + /** Remove the currently waiting callback subscription. + * Pending calls will remain scheduled. + */ + void unsubscribe(const char *reason); + + /** Try and accept another connection (synchronous). + * If one is pending already the subscribed callback handler will be scheduled + * to handle it before this method returns. + */ void acceptNext(); - void notify(int newfd, comm_err_t flag, const ConnectionDetail &details); - int fd; + /// Call the subscribed callback handler with details about a new connection. + void notify(comm_err_t flag, const Comm::ConnectionPointer &details); /// errno code of the last accept() or listen() action if one occurred. int errcode; - /// whether this socket is delayed and on the AcceptLimiter queue. - int32_t isLimited; - -private: - /// Method to test if there are enough file escriptors to open a new client connection +private: + friend class AcceptLimiter; + int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. + Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. + + /// conn being listened on for new connections + /// Reserved for read-only use. + ConnectionPointer conn; + +private: + /// Method to test if there are enough file descriptors to open a new client connection /// if not the accept() will be postponed static bool okToAccept(); @@ -42,14 +79,12 @@ static void doAccept(int fd, void *data); void acceptOne(); - int oldAccept(ConnectionDetail &details); - - AsyncCall::Pointer theCallback; - bool mayAcceptMore; - + comm_err_t oldAccept(Comm::ConnectionPointer &details); void setListen(); + + CBDATA_CLASS2(ConnAcceptor); }; }; // namespace Comm -#endif /* SQUID_LISTENERSTATEDATA_H */ +#endif /* SQUID_COMM_CONNACCEPTOR_H */ === added file 'src/comm/ConnOpener.cc' --- src/comm/ConnOpener.cc 1970-01-01 00:00:00 +0000 +++ src/comm/ConnOpener.cc 2010-09-05 04:12:56 +0000 @@ -0,0 +1,290 @@ +/* + * DEBUG: section 05 Socket Connection Opener + */ + +#include "config.h" +#include "base/TextException.h" +#include "comm/ConnOpener.h" +#include "comm/Connection.h" +#include "comm.h" +#include "fde.h" +#include "icmp/net_db.h" +#include "SquidTime.h" + +namespace Comm { + CBDATA_CLASS_INIT(ConnOpener); +}; + +Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) : + AsyncJob("Comm::ConnOpener"), + host_(NULL), + conn_(c), + callback_(handler), + totalTries_(0), + failRetries_(0), + connectTimeout_(ctimeout), + connStart_(0) +{} + +Comm::ConnOpener::~ConnOpener() +{ + safe_free(host_); +} + +bool +Comm::ConnOpener::doneAll() const +{ + // is the conn_ to be opened still waiting? + if (conn_ != NULL) { + return false; + } + + // is the callback still to be called? + if (callback_ != NULL) { + return false; + } + + return AsyncJob::doneAll(); +} + +void +Comm::ConnOpener::swanSong() +{ + // cancel any event watchers + // done here to get the "swanSong" mention in cancel debugging. + if (calls_.earlyAbort_ != NULL) { + calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong"); + calls_.earlyAbort_ = NULL; + } + if (calls_.timeout_ != NULL) { + calls_.timeout_->cancel("Comm::ConnOpener::swanSong"); + calls_.timeout_ = NULL; + } + + // recover what we can from the job + if (conn_ != NULL && conn_->isOpen()) { + // it never reached fully open, so abort the FD + conn_->close(); + fd_table[conn_->fd].flags.open = 0; + // inform the caller + doneConnecting(COMM_ERR_CONNECT, 0); + } + + AsyncJob::swanSong(); +} + +void +Comm::ConnOpener::setHost(const char * new_host) +{ + // unset and erase if already set. + if (host_ != NULL) + safe_free(host_); + + // set the new one if given. + if (new_host != NULL) + host_ = xstrdup(new_host); +} + +const char * +Comm::ConnOpener::getHost() const +{ + return host_; +} + +/** + * Connection attempt are completed. One way or the other. + * Pass the results back to the external handler. + */ +void +Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno) +{ + if (callback_ != NULL) { + typedef CommConnectCbParams Params; + Params ¶ms = GetCommParams(callback_); + params.conn = conn_; + params.flag = status; + params.xerrno = xerrno; + ScheduleCallHere(callback_); + callback_ = NULL; + } + + /* ensure cleared local state, we are done. */ + conn_ = NULL; +} + +void +Comm::ConnOpener::start() +{ + Must(conn_ != NULL); + + /* get a socket open ready for connecting with */ + if (!conn_->isOpen()) { +#if USE_IPV6 + /* outbound sockets have no need to be protocol agnostic. */ + if (conn_->remote.IsIPv4()) { + conn_->local.SetIPv4(); + } +#endif + conn_->fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, host_); + if (!conn_->isOpen()) { + doneConnecting(COMM_ERR_CONNECT, 0); + return; + } + + if (calls_.earlyAbort_ == NULL) { + typedef CommCbMemFunT Dialer; + calls_.earlyAbort_ = asyncCall(5, 4, "Comm::ConnOpener::earlyAbort", + Dialer(this, &Comm::ConnOpener::earlyAbort)); + comm_add_close_handler(conn_->fd, calls_.earlyAbort_); + } + + if (calls_.timeout_ == NULL) { + typedef CommCbMemFunT Dialer; + calls_.timeout_ = asyncCall(5, 4, "Comm::ConnOpener::timeout", + Dialer(this, &Comm::ConnOpener::timeout)); + debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_); + commSetTimeout(conn_->fd, connectTimeout_, calls_.timeout_); + } + + if (connStart_ == 0) { + connStart_ = squid_curtime; + } + } + + typedef CommCbMemFunT Dialer; + calls_.connect_ = asyncCall(5, 4, "Comm::ConnOpener::connect", + Dialer(this, &Comm::ConnOpener::connect)); + ScheduleCallHere(calls_.connect_); +} + +/** Make an FD connection attempt. + * Handles the case(s) when a partially setup connection gets closed early. + */ +void +Comm::ConnOpener::connect(const CommConnectCbParams &unused) +{ + Must(conn_ != NULL); + + totalTries_++; + + switch (comm_connect_addr(conn_->fd, conn_->remote) ) { + + case COMM_INPROGRESS: + // check for timeout FIRST. + if(squid_curtime - connStart_ > connectTimeout_) { + debugs(5, 5, HERE << conn_ << ": * - ERR took too long already."); + doneConnecting(COMM_TIMEOUT, errno); + return; + } else { + debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS"); + commSetSelect(conn_->fd, COMM_SELECT_WRITE, Comm::ConnOpener::ConnectRetry, this, 0); + } + break; + + case COMM_OK: + debugs(5, 5, HERE << conn_ << ": COMM_OK - connected"); + + /* + * stats.conn_open is used to account for the number of + * connections that we have open to the peer, so we can limit + * based on the max-conn option. We need to increment here, + * even if the connection may fail. + */ + if (conn_->getPeer()) + conn_->getPeer()->stats.conn_open++; + + lookupLocalAddress(); + + /* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to + * indicate the state of a raw fd object being passed around. + * Also, legacy code still depends on comm_local_port() with no access to Comm::Connection + * when those are done comm_local_port can become one of our member functions to do the below. + */ + fd_table[conn_->fd].flags.open = 1; + fd_table[conn_->fd].local_addr = conn_->local; + + if (host_ != NULL) + ipcacheMarkGoodAddr(host_, conn_->remote); + doneConnecting(COMM_OK, 0); + break; + + default: + debugs(5, 5, HERE << conn_ << ": * - try again"); + failRetries_++; + if (host_ != NULL) + ipcacheMarkBadAddr(host_, conn_->remote); +#if USE_ICMP + if (Config.onoff.test_reachability) + netdbDeleteAddrNetwork(conn_->remote); +#endif + + // check for timeout FIRST. + if(squid_curtime - connStart_ > connectTimeout_) { + debugs(5, 5, HERE << conn_ << ": * - ERR took too long already."); + doneConnecting(COMM_TIMEOUT, errno); + } else if (failRetries_ < Config.connect_retries) { + ScheduleCallHere(calls_.connect_); + } else { + // send ERROR back to the upper layer. + debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already."); + doneConnecting(COMM_ERR_CONNECT, errno); + } + } +} + +/** + * Lookup local-end address and port of the TCP link just opened. + * This ensure the connection local details are set correctly + */ +void +Comm::ConnOpener::lookupLocalAddress() +{ + struct addrinfo *addr = NULL; + conn_->local.InitAddrInfo(addr); + + if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) { + debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror()); + conn_->local.FreeAddrInfo(addr); + return; + } + + conn_->local = *addr; + conn_->local.FreeAddrInfo(addr); + debugs(5, 6, HERE << conn_); +} + +/** Abort connection attempt. + * Handles the case(s) when a partially setup connection gets closed early. + */ +void +Comm::ConnOpener::earlyAbort(const CommConnectCbParams &io) +{ + debugs(5, 3, HERE << io.conn); + doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better? +} + +/** + * Handles the case(s) when a partially setup connection gets timed out. + * NP: When commSetTimeout accepts generic CommCommonCbParams this can die. + */ +void +Comm::ConnOpener::timeout(const CommTimeoutCbParams &unused) +{ + ScheduleCallHere(calls_.connect_); +} + +/* Legacy Wrapper for the retry event after COMM_INPROGRESS + * TODO: As soon as comm IO accepts Async calls we can use a ConnOpener::connect call + */ +void +Comm::ConnOpener::ConnectRetry(int fd, void *data) +{ + ConnOpener *cs = static_cast(data); + + // Ew. we are now outside the all AsyncJob protections. + // get back inside by scheduling another call... + typedef CommCbMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(5, 4, "Comm::ConnOpener::connect", + Dialer(cs, &Comm::ConnOpener::connect)); + ScheduleCallHere(call); +} === added file 'src/comm/ConnOpener.h' --- src/comm/ConnOpener.h 1970-01-01 00:00:00 +0000 +++ src/comm/ConnOpener.h 2010-09-06 12:06:44 +0000 @@ -0,0 +1,72 @@ +#ifndef _SQUID_SRC_COMM_OPENERSTATEDATA_H +#define _SQUID_SRC_COMM_OPENERSTATEDATA_H + +#include "base/AsyncCall.h" +#include "base/AsyncJob.h" +#include "cbdata.h" +#include "CommCalls.h" +#include "comm/comm_err_t.h" +#include "comm/forward.h" + +namespace Comm { + +/** + * Async-opener of a Comm connection. + */ +class ConnOpener : public AsyncJob +{ +protected: + virtual void start(); + virtual void swanSong(); + +public: + virtual bool doneAll() const; + + ConnOpener(Comm::ConnectionPointer &, AsyncCall::Pointer &handler, time_t connect_timeout); + ~ConnOpener(); + + void setHost(const char *); ///< set the hostname note for this connection + const char * getHost() const; ///< get the hostname noted for this connection + +private: + // Undefined because two openers cannot share a connection + ConnOpener(const ConnOpener &); + ConnOpener & operator =(const ConnOpener &c); + + void connect(const CommConnectCbParams &unused); + void earlyAbort(const CommConnectCbParams &); + void timeout(const CommTimeoutCbParams &unused); + void doneConnecting(comm_err_t status, int xerrno); + static void ConnectRetry(int fd, void *data); + void lookupLocalAddress(); + +private: + char *host_; ///< domain name we are trying to connect to. + Comm::ConnectionPointer conn_; ///< single connection currently being opened. + AsyncCall::Pointer callback_; ///< handler to be called on connection completion. + + int totalTries_; ///< total number of connection attempts over all destinations so far. + int failRetries_; ///< number of retries current destination has been tried. + + /** + * time at which to abandon the connection. + * the connection-done callback will be passed COMM_TIMEOUT + */ + time_t connectTimeout_; + + /// time at which this series of connection attempts was started. + time_t connStart_; + + /// handles to calls which we may need to cancel. + struct Calls { + AsyncCall::Pointer connect_; + AsyncCall::Pointer earlyAbort_; + AsyncCall::Pointer timeout_; + } calls_; + + CBDATA_CLASS2(ConnOpener); +}; + +}; // namespace Comm + +#endif /* _SQUID_SRC_COMM_CONNOPENER_H */ === added file 'src/comm/Connection.cc' --- src/comm/Connection.cc 1970-01-01 00:00:00 +0000 +++ src/comm/Connection.cc 2010-08-13 09:20:32 +0000 @@ -0,0 +1,77 @@ +#include "config.h" +#include "cbdata.h" +#include "comm.h" +#include "comm/Connection.h" + +bool +Comm::IsConnOpen(const Comm::ConnectionPointer &conn) +{ + return conn != NULL && conn->isOpen(); +} + + +Comm::Connection::Connection() : + local(), + remote(), + peerType(HIER_NONE), + fd(-1), + tos(0), + flags(COMM_NONBLOCKING), + _peer(NULL) +{} + +Comm::Connection::~Connection() +{ + close(); + cbdataReferenceDone(_peer); +} + +Comm::ConnectionPointer +Comm::Connection::copyDetails() const +{ + ConnectionPointer c = new Comm::Connection; + + c->local = local; + c->remote = remote; + c->peerType = peerType; + c->tos = tos; + c->flags = flags; + + // ensure FD is not open in the new copy. + c->fd = -1; + + // ensure we have a cbdata reference to _peer not a straight ptr copy. + c->_peer = cbdataReference(_peer); + + return c; +} + +void +Comm::Connection::close() +{ + if (isOpen()) { + comm_close(fd); + fd = -1; + if (_peer) + _peer->stats.conn_open--; + } +} + +void +Comm::Connection::setPeer(peer *p) +{ + /* set to self. nothing to do. */ + if (_peer == p) + return; + + /* clear any previous ptr */ + if (_peer) { + cbdataReferenceDone(_peer); + _peer = NULL; + } + + /* set the new one (unless it is NULL */ + if (p) { + _peer = cbdataReference(p); + } +} === renamed file 'src/ConnectionDetail.h' => 'src/comm/Connection.h' --- src/ConnectionDetail.h 2010-05-26 03:06:02 +0000 +++ src/comm/Connection.h 2010-08-17 07:45:20 +0000 @@ -1,5 +1,6 @@ /* * DEBUG: section 05 Socket Functions + * AUTHOR: Amos Jeffries * AUTHOR: Robert Collins * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -30,23 +31,138 @@ * * * Copyright (c) 2003, Robert Collins + * Copyright (c) 2010, Amos Jeffries */ #ifndef _SQUIDCONNECTIONDETAIL_H_ #define _SQUIDCONNECTIONDETAIL_H_ +#include "config.h" +#include "comm/forward.h" +#include "hier_code.h" #include "ip/Address.h" - -class ConnectionDetail +#include "RefCount.h" + +#if HAVE_IOSFWD +#include +#endif +#if HAVE_OSTREAM +#include +#endif + +struct peer; + +namespace Comm { + +/* TODO: make these a struct of boolean flags members in the connection instead of a bitmap. + * we can't do that until all non-comm code uses Commm::Connection objects to create FD + * currently there is code still using comm_open() and comm_openex() synchronously!! + */ +#define COMM_UNSET 0x00 +#define COMM_NONBLOCKING 0x01 +#define COMM_NOCLOEXEC 0x02 +#define COMM_REUSEADDR 0x04 +#define COMM_TRANSPARENT 0x08 +#define COMM_DOBIND 0x10 + +/** + * Store data about the physical and logical attributes of a connection. + * + * Some link state can be infered from the data, however this is not an + * object for state data. But a semantic equivalent for FD with easily + * accessible cached properties not requiring repeated complex lookups. + * + * While the properties may be changed, this is for teh purpose of creating + * potential connection descriptors which may be opened. Properties should + * be considered read-only outside of the Comm layer code once the connection + * is open. + * + * These objects must not be passed around directly, + * but a Comm::ConnectionPointer must be passed instead. + */ +class Connection : public RefCountable { - -public: - - ConnectionDetail(); - - Ip::Address me; - - Ip::Address peer; +public: + /** standard empty connection creation */ + Connection(); + + /** Clear the connection properties and close any open socket. */ + ~Connection(); + + /** Copy an existing connections IP and properties. + * This excludes the FD. The new copy will be a closed connection. + */ + ConnectionPointer copyDetails() const; + + /** Close any open socket. */ + void close(); + + /** determine whether this object describes an active connection or not. */ + bool isOpen() const { return (fd >= 0); } + + /** retrieve the peer pointer for use. + * The caller is responsible for all CBDATA operations regarding the + * used of the pointer returned. + */ + peer * const getPeer() const { return _peer; } + + /** alter the stored peer pointer. + * Perform appropriate CBDATA operations for locking the peer pointer + */ + void setPeer(peer * p); + +private: + /** These objects may not be exactly duplicated. Use copyDetails() instead. */ + Connection(const Connection &c); + + /** These objects may not be exactly duplicated. Use copyDetails() instead. */ + Connection & operator =(const Connection &c); + +public: + /** Address/Port for the Squid end of a TCP link. */ + Ip::Address local; + + /** Address for the Remote end of a TCP link. */ + Ip::Address remote; + + /** Hierarchy code for this connection link */ + hier_code peerType; + + /** Socket used by this connection. -1 if no socket has been opened. */ + int fd; + + /** Quality of Service TOS values currently sent on this connection */ + int tos; + + /** COMM flags set on this connection */ + int flags; + +private: + /** cache_peer data object (if any) */ + peer *_peer; }; +}; // namespace Comm + + +// NP: Order and namespace here is very important. +// * The second define inlines the first. +// * Stream inheritance overloading is searched in the global scope first. + +inline std::ostream & +operator << (std::ostream &os, const Comm::Connection &conn) +{ + os << "FD " << conn.fd << " local=" << conn.local << + " remote=" << conn.remote << " flags=" << conn.flags; + return os; +} + +inline std::ostream & +operator << (std::ostream &os, const Comm::ConnectionPointer &conn) +{ + if (conn != NULL) + os << *conn; + return os; +} + #endif === modified file 'src/comm/Makefile.am' --- src/comm/Makefile.am 2009-12-31 02:35:01 +0000 +++ src/comm/Makefile.am 2010-08-15 07:52:51 +0000 @@ -1,13 +1,22 @@ include $(top_srcdir)/src/Common.am include $(top_srcdir)/src/TestHeaders.am -noinst_LTLIBRARIES = libcomm-listener.la +noinst_LTLIBRARIES = libcomm.la -## Library holding listener comm socket handlers -libcomm_listener_la_SOURCES= \ +## First group are listener comm socket handlers +## Second group are outbound connection setup handlers +## Third group are misc shared comm objects +libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ - ListenStateData.cc \ - ListenStateData.h \ - \ - comm_internal.h + ConnAcceptor.cc \ + ConnAcceptor.h \ + \ + ConnOpener.cc \ + ConnOpener.h \ + \ + Connection.cc \ + Connection.h \ + comm_err_t.h \ + comm_internal.h \ + forward.h === added file 'src/comm/comm_err_t.h' --- src/comm/comm_err_t.h 1970-01-01 00:00:00 +0000 +++ src/comm/comm_err_t.h 2010-05-15 14:57:24 +0000 @@ -0,0 +1,21 @@ +#ifndef _SQUID_COMM_COMM_ERR_T_H +#define _SQUID_COMM_COMM_ERR_T_H + +#include "config.h" + +typedef enum { + COMM_OK = 0, + COMM_ERROR = -1, + COMM_NOMESSAGE = -3, + COMM_TIMEOUT = -4, + COMM_SHUTDOWN = -5, + COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */ + COMM_INPROGRESS = -7, + COMM_ERR_CONNECT = -8, + COMM_ERR_DNS = -9, + COMM_ERR_CLOSING = -10, + COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */ + COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */ +} comm_err_t; + +#endif /* _SQUID_COMM_COMM_ERR_T_H */ === added file 'src/comm/forward.h' --- src/comm/forward.h 1970-01-01 00:00:00 +0000 +++ src/comm/forward.h 2010-07-24 05:23:58 +0000 @@ -0,0 +1,19 @@ +#ifndef _SQUID_COMM_FORWARD_H +#define _SQUID_COMM_FORWARD_H + +#include "Array.h" +#include "RefCount.h" + +namespace Comm { + +class Connection; + +typedef RefCount ConnectionPointer; + +typedef Vector ConnectionList; + +bool IsConnOpen(const Comm::ConnectionPointer &conn); + +}; // namespace Comm + +#endif /* _SQUID_COMM_FORWARD_H */