Author: Ales Rousskov Author: Rainer Weikusat Fixed several ConnOpener problems by relying on AsyncJob protections and comm_close(), while maintaining a tighter grip on various I/O and sleep states. * Connection descriptor was not closed when attempting to reconnect after failures. We now properly close on failures, sleep with descriptor closed, and then reopen. * Timeout handler was not cleaned up properly in some cases, causing memory leaks (for the handler Pointer) and possibly timeouts that were fired (for then-active handler) after the connection was passed to the initiator. * Comm close handler was not cleaned up properly. * statCounter.syscalls.sock.closes counter was not updated on FD closure. * Waiting pending accepts were not kicked on FD closure. * Connection timeout was enforced for each connection attempt instead of applying to all attempts taken together. and possibly other problems. The full extent of all side-effects of mishandled race conditions and state conflicts is probably unknown. TODO: Needs more testing, especially around corner cases. Does somebody need more specific callback cancellation reasons? Make connect_timeout documentation in squid.conf less ambiguous. Move prevalent conn_ debugging to the status() method? Polish Comm timeout handling to always reset .timeout on callback? Polish comm_close() to always reset "Select" state? Consider revising eventDelete() to delete between-I/O sleep timeout. === modified file 'src/comm/ConnOpener.cc' --- src/comm/ConnOpener.cc 2013-01-16 10:35:54 +0000 +++ src/comm/ConnOpener.cc 2013-01-25 17:15:30 +0000 @@ -16,343 +16,432 @@ #include "ip/tools.h" #include "SquidConfig.h" #include "SquidTime.h" #if HAVE_ERRNO_H #include #endif class CachePeer; CBDATA_NAMESPACED_CLASS_INIT(Comm, ConnOpener); Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) : AsyncJob("Comm::ConnOpener"), host_(NULL), temporaryFd_(-1), conn_(c), callback_(handler), totalTries_(0), failRetries_(0), - connectTimeout_(ctimeout), - connectStart_(0) + deadline_(squid_curtime + static_cast(ctimeout)) {} Comm::ConnOpener::~ConnOpener() { safe_free(host_); } bool Comm::ConnOpener::doneAll() const { // is the conn_ to be opened still waiting? if (conn_ == NULL) { return AsyncJob::doneAll(); } // is the callback still to be called? if (callback_ == NULL || callback_->canceled()) { return AsyncJob::doneAll(); } + // otherwise, we must be waiting for something + Must(temporaryFd_ >= 0 || calls_.sleep_); return false; } void Comm::ConnOpener::swanSong() { - // cancel any event watchers - // done here to get the "swanSong" mention in cancel debugging. - if (calls_.earlyAbort_ != NULL) { - calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong"); - calls_.earlyAbort_ = NULL; - } - if (calls_.timeout_ != NULL) { - calls_.timeout_->cancel("Comm::ConnOpener::swanSong"); - calls_.timeout_ = NULL; - } - if (callback_ != NULL) { - if (callback_->canceled()) - callback_ = NULL; - else - // inform the still-waiting caller we are dying - doneConnecting(COMM_ERR_CONNECT, 0); + // inform the still-waiting caller we are dying + sendAnswer(COMM_ERR_CONNECT, 0, "Comm::ConnOpener::swanSong"); } - // rollback what we can from the job state - if (temporaryFd_ >= 0) { - // doneConnecting() handles partial FD connection cleanup - doneConnecting(COMM_ERR_CONNECT, 0); - } + if (temporaryFd_ >= 0) + closeFd(); + + if (calls_.sleep_) + cancelSleep(); AsyncJob::swanSong(); } void Comm::ConnOpener::setHost(const char * new_host) { // unset and erase if already set. if (host_ != NULL) safe_free(host_); // set the new one if given. if (new_host != NULL) host_ = xstrdup(new_host); } const char * Comm::ConnOpener::getHost() const { return host_; } /** * Connection attempt are completed. One way or the other. * Pass the results back to the external handler. - * NP: on errors the earlyAbort call should be cancelled first with a reason. */ void -Comm::ConnOpener::doneConnecting(comm_err_t errFlag, int xerrno) +Comm::ConnOpener::sendAnswer(comm_err_t errFlag, int xerrno, const char *why) { // only mark the address good/bad AFTER connect is finished. if (host_ != NULL) { - if (xerrno == 0) + if (xerrno == 0) // XXX: should not we use errFlag instead? ipcacheMarkGoodAddr(host_, conn_->remote); else { ipcacheMarkBadAddr(host_, conn_->remote); #if USE_ICMP if (Config.onoff.test_reachability) netdbDeleteAddrNetwork(conn_->remote); #endif } } if (callback_ != NULL) { typedef CommConnectCbParams Params; Params ¶ms = GetCommParams(callback_); params.conn = conn_; params.flag = errFlag; params.xerrno = xerrno; ScheduleCallHere(callback_); callback_ = NULL; } - if (temporaryFd_ >= 0) { - debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_); - // it never reached fully open, so cleanup the FD handlers - // Note that comm_close() sequence does not happen for partially open FD - Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0); + // The job will stop without this call because nil callback_ makes + // doneAll() true, but this explicit call creates nicer debugging. + mustStop(why); +} + +/// cleans up this job I/O state without closing temporaryFd +/// required before closing temporaryFd or keeping it in conn_ +/// leaves FD bare so must only be called via closeFd() or keepFd() +void +Comm::ConnOpener::cleanFd() +{ + debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_); + + Must(temporaryFd_ >= 0); + fde &f = fd_table[temporaryFd_]; + + // Our write_handler was set without using Comm::Write API, so we cannot + // use a cancellable Pointer-free job callback and simply cancel it here. + if (f.write_handler) { + + /* XXX: We are about to remove write_handler, which was responsible + * for deleting write_data, so we have to delete write_data + * ourselves. Comm currently calls SetSelect handlers synchronously + * so if write_handler is set, we know it has not been called yet. + * ConnOpener converts that sync call into an async one, but only + * after deleting ptr, so that is not a problem. + */ + + delete static_cast(f.write_data); + f.write_data = NULL; + } + // Comm::DoSelect does not do this when calling and resetting write_handler + // (because it expects more writes to come?). We mimic that optimization by + // resetting Comm "Select" state only when the FD is actually closed. + // Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0); + + if (calls_.timeout_ != NULL) { + calls_.timeout_->cancel("Comm::ConnOpener::cleanFd"); + calls_.timeout_ = NULL; + } + // Comm checkTimeouts() and commCloseAllSockets() do not clear .timeout + // when calling timeoutHandler (XXX fix them), so we clear unconditionally. + f.timeoutHandler = NULL; + f.timeout = 0; + + if (calls_.earlyAbort_ != NULL) { + comm_remove_close_handler(temporaryFd_, calls_.earlyAbort_); calls_.earlyAbort_ = NULL; - if (calls_.timeout_ != NULL) { - calls_.timeout_->cancel("Comm::ConnOpener::doneConnecting"); - calls_.timeout_ = NULL; - } - fd_table[temporaryFd_].timeoutHandler = NULL; - fd_table[temporaryFd_].timeout = 0; - close(temporaryFd_); - fd_close(temporaryFd_); - temporaryFd_ = -1; } +} - /* ensure cleared local state, we are done. */ - conn_ = NULL; +/// cleans I/O state and ends I/O for temporaryFd_ +void +Comm::ConnOpener::closeFd() +{ + if (temporaryFd_ < 0) + return; + + cleanFd(); + + // comm_close() below uses COMMIO_FD_WRITECB(fd)->active() to clear Comm + // "Select" state. It will not clear ours. XXX: It should always clear + // because a callback may have been active but was called before comm_close + Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0); + + comm_close(temporaryFd_); + temporaryFd_ = -1; +} + +/// cleans I/O state and moves temporaryFd_ to the conn_ for long-term use +void +Comm::ConnOpener::keepFd() +{ + Must(conn_ != NULL); + Must(temporaryFd_ >= 0); + + cleanFd(); + + conn_->fd = temporaryFd_; + temporaryFd_ = -1; } void Comm::ConnOpener::start() { Must(conn_ != NULL); - /* get a socket open ready for connecting with */ + /* outbound sockets have no need to be protocol agnostic. */ + if (!(Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING) && conn_->remote.IsIPv4()) { + conn_->local.SetIPv4(); + } + + if (createFd()) + connect(); +} + +/// called at the end of Comm::ConnOpener::DelayedConnectRetry event +void +Comm::ConnOpener::restart() { + debugs(5, 5, conn_ << " restarting after sleep"); + calls_.sleep_ = false; + + if (createFd()) + connect(); +} + +/// Create a socket for the future connection or return false. +/// If false is returned, done() is guaranteed to return true and end the job. +bool +Comm::ConnOpener::createFd() +{ + Must(temporaryFd_ < 0); + + // our initators signal abort by cancelling their callbacks + if (callback_ == NULL || callback_->canceled()) + return false; + + temporaryFd_ = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_); if (temporaryFd_ < 0) { - /* outbound sockets have no need to be protocol agnostic. */ - if (!(Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING) && conn_->remote.IsIPv4()) { - conn_->local.SetIPv4(); - } - temporaryFd_ = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_); - if (temporaryFd_ < 0) { - doneConnecting(COMM_ERR_CONNECT, 0); - return; - } + sendAnswer(COMM_ERR_CONNECT, 0, "Comm::ConnOpener::createFd"); + return false; } typedef CommCbMemFunT abortDialer; calls_.earlyAbort_ = JobCallback(5, 4, abortDialer, this, Comm::ConnOpener::earlyAbort); comm_add_close_handler(temporaryFd_, calls_.earlyAbort_); typedef CommCbMemFunT timeoutDialer; calls_.timeout_ = JobCallback(5, 4, timeoutDialer, this, Comm::ConnOpener::timeout); - debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_); + debugs(5, 3, conn_ << " will timeout in " << (deadline_ - squid_curtime)); - // Update the fd_table directly because conn_ is not yet storing the FD + // Update the fd_table directly because commSetConnTimeout() needs open conn_ assert(temporaryFd_ < Squid_MaxFD); assert(fd_table[temporaryFd_].flags.open); typedef CommTimeoutCbParams Params; Params ¶ms = GetCommParams(calls_.timeout_); params.conn = conn_; fd_table[temporaryFd_].timeoutHandler = calls_.timeout_; - fd_table[temporaryFd_].timeout = squid_curtime + (time_t) connectTimeout_; + fd_table[temporaryFd_].timeout = deadline_; - connectStart_ = squid_curtime; - connect(); + return true; } void Comm::ConnOpener::connected() { - conn_->fd = temporaryFd_; - temporaryFd_ = -1; + Must(temporaryFd_ >= 0); + keepFd(); /* * stats.conn_open is used to account for the number of * connections that we have open to the CachePeer, so we can limit * based on the max-conn option. We need to increment here, * even if the connection may fail. */ if (CachePeer *peer=(conn_->getPeer())) ++peer->stats.conn_open; lookupLocalAddress(); /* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to * indicate the state of a raw fd object being passed around. * Also, legacy code still depends on comm_local_port() with no access to Comm::Connection * when those are done comm_local_port can become one of our member functions to do the below. */ - fd_table[conn_->fd].flags.open = 1; + Must(fd_table[conn_->fd].flags.open); fd_table[conn_->fd].local_addr = conn_->local; + + sendAnswer(COMM_OK, 0, "Comm::ConnOpener::connected"); } -/** Make an FD connection attempt. - * Handles the case(s) when a partially setup connection gets closed early. - */ +/// Make an FD connection attempt. void Comm::ConnOpener::connect() { Must(conn_ != NULL); - - // our parent Jobs signal abort by cancelling their callbacks. - if (callback_ == NULL || callback_->canceled()) - return; + Must(temporaryFd_ >= 0); ++ totalTries_; switch (comm_connect_addr(temporaryFd_, conn_->remote) ) { case COMM_INPROGRESS: - // check for timeout FIRST. - if (squid_curtime - connectStart_ > connectTimeout_) { - debugs(5, 5, HERE << conn_ << ": * - ERR took too long already."); - calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out"); - doneConnecting(COMM_TIMEOUT, errno); - return; - } else { - debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS"); - Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, new Pointer(this), 0); - } + debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS"); + Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, new Pointer(this), 0); break; case COMM_OK: debugs(5, 5, HERE << conn_ << ": COMM_OK - connected"); connected(); - doneConnecting(COMM_OK, 0); break; - default: + default: { + const int xerrno = errno; + ++failRetries_; + debugs(5, 7, conn_ << ": failure #" << failRetries_ << " <= " << + Config.connect_retries << ": " << xstrerr(xerrno)); - // check for timeout FIRST. - if (squid_curtime - connectStart_ > connectTimeout_) { - debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response."); - calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out"); - doneConnecting(COMM_TIMEOUT, errno); - } else if (failRetries_ < Config.connect_retries) { + if (failRetries_ < Config.connect_retries) { debugs(5, 5, HERE << conn_ << ": * - try again"); - eventAdd("Comm::ConnOpener::DelayedConnectRetry", Comm::ConnOpener::DelayedConnectRetry, new Pointer(this), 0.05, 0, false); + sleep(); return; } else { // send ERROR back to the upper layer. debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already."); - calls_.earlyAbort_->cancel("Comm::ConnOpener::connect failed"); - doneConnecting(COMM_ERR_CONNECT, errno); + sendAnswer(COMM_ERR_CONNECT, xerrno, "Comm::ConnOpener::connect"); } } + } +} + +/// Close and wait a little before trying to open and connect again. +void +Comm::ConnOpener::sleep() { + Must(!calls_.sleep_); + closeFd(); + calls_.sleep_ = true; + eventAdd("Comm::ConnOpener::DelayedConnectRetry", + Comm::ConnOpener::DelayedConnectRetry, + new Pointer(this), 0.05, 0, false); +} + +/// cleans up this job sleep state +void +Comm::ConnOpener::cancelSleep() +{ + if (calls_.sleep_) { + // It would be nice to delete the sleep event, but it might be out of + // the event queue and in the async queue already, so (a) we do not know + // whether we can safely delete the call ptr here and (b) eventDelete() + // will assert if the event went async. Thus, we let the event run so + // that it deletes the call ptr [after this job is gone]. Note that we + // are called only when the job ends so this "hanging event" will do + // nothing but deleting the call ptr. TODO: Revise eventDelete() API. + // eventDelete(Comm::ConnOpener::DelayedConnectRetry, calls_.sleep); + calls_.sleep_ = false; + debugs(5, 9, conn_ << " stops sleeping"); + } } /** * Lookup local-end address and port of the TCP link just opened. * This ensure the connection local details are set correctly */ void Comm::ConnOpener::lookupLocalAddress() { struct addrinfo *addr = NULL; conn_->local.InitAddrInfo(addr); if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) { debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror()); conn_->local.FreeAddrInfo(addr); return; } conn_->local = *addr; conn_->local.FreeAddrInfo(addr); debugs(5, 6, HERE << conn_); } /** Abort connection attempt. * Handles the case(s) when a partially setup connection gets closed early. */ void Comm::ConnOpener::earlyAbort(const CommCloseCbParams &io) { debugs(5, 3, HERE << io.conn); - doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better? + calls_.earlyAbort_ = NULL; + // NP: is closing or shutdown better? + sendAnswer(COMM_ERR_CLOSING, io.xerrno, "Comm::ConnOpener::earlyAbort"); } /** * Handles the case(s) when a partially setup connection gets timed out. * NP: When commSetConnTimeout accepts generic CommCommonCbParams this can die. */ void Comm::ConnOpener::timeout(const CommTimeoutCbParams &) { - connect(); + debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response."); + calls_.timeout_ = NULL; + sendAnswer(COMM_TIMEOUT, ETIMEDOUT, "Comm::ConnOpener::timeout"); } /* Legacy Wrapper for the retry event after COMM_INPROGRESS * XXX: As soon as Comm::SetSelect() accepts Async calls we can use a ConnOpener::connect call */ void Comm::ConnOpener::InProgressConnectRetry(int fd, void *data) { Pointer *ptr = static_cast(data); assert(ptr); if (ConnOpener *cs = ptr->valid()) { // Ew. we are now outside the all AsyncJob protections. // get back inside by scheduling another call... typedef NullaryMemFunT Dialer; AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect); ScheduleCallHere(call); } delete ptr; } /* Legacy Wrapper for the retry event with small delay after errors. - * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::connect call + * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::restart call */ void Comm::ConnOpener::DelayedConnectRetry(void *data) { Pointer *ptr = static_cast(data); assert(ptr); if (ConnOpener *cs = ptr->valid()) { // Ew. we are now outside the all AsyncJob protections. // get back inside by scheduling another call... typedef NullaryMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect); + AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::restart); ScheduleCallHere(call); } delete ptr; } === modified file 'src/comm/ConnOpener.h' --- src/comm/ConnOpener.h 2013-01-02 23:40:49 +0000 +++ src/comm/ConnOpener.h 2013-01-25 17:11:40 +0000 @@ -23,57 +23,64 @@ public: void noteAbort() { mustStop("externally aborted"); } typedef CbcPointer Pointer; virtual bool doneAll() const; ConnOpener(Comm::ConnectionPointer &, AsyncCall::Pointer &handler, time_t connect_timeout); ~ConnOpener(); void setHost(const char *); ///< set the hostname note for this connection const char * getHost() const; ///< get the hostname noted for this connection private: // Undefined because two openers cannot share a connection ConnOpener(const ConnOpener &); ConnOpener & operator =(const ConnOpener &c); void earlyAbort(const CommCloseCbParams &); void timeout(const CommTimeoutCbParams &); - void doneConnecting(comm_err_t errFlag, int xerrno); + void sendAnswer(comm_err_t errFlag, int xerrno, const char *why); static void InProgressConnectRetry(int fd, void *data); static void DelayedConnectRetry(void *data); void connect(); void connected(); void lookupLocalAddress(); + void sleep(); + void restart(); + + bool createFd(); + void closeFd(); + void keepFd(); + void cleanFd(); + + void cancelSleep(); + private: char *host_; ///< domain name we are trying to connect to. int temporaryFd_; ///< the FD being opened. Do NOT set conn_->fd until it is fully open. Comm::ConnectionPointer conn_; ///< single connection currently to be opened. AsyncCall::Pointer callback_; ///< handler to be called on connection completion. int totalTries_; ///< total number of connection attempts over all destinations so far. int failRetries_; ///< number of retries current destination has been tried. - /** - * time at which to abandon the connection. - * the connection-done callback will be passed COMM_TIMEOUT - */ - time_t connectTimeout_; - - /// time at which this series of connection attempts was started. - time_t connectStart_; + /// if we are not done by then, we will call back with COMM_TIMEOUT + time_t deadline_; /// handles to calls which we may need to cancel. struct Calls { AsyncCall::Pointer earlyAbort_; AsyncCall::Pointer timeout_; + /// Whether we are idling before retrying to connect; not yet a call + /// [that we can cancel], but it will probably become one eventually. + bool sleep_; } calls_; CBDATA_CLASS2(ConnOpener); }; }; // namespace Comm #endif /* _SQUID_SRC_COMM_CONNOPENER_H */