Author: Ales Rousskov <rousskov@measurement-factory.com>
Author: Rainer Weikusat <rweikusat@mobileactivedefense.com>
Fixed several ConnOpener problems by relying on AsyncJob protections and
comm_close(), while maintaining a tighter grip on various I/O and sleep states.

* Connection descriptor was not closed when attempting to reconnect after
  failures. We now properly close on failures, sleep with descriptor closed,
  and then reopen.

* Timeout handler was not cleaned up properly in some cases, causing memory
  leaks (for the handler Pointer) and possibly timeouts that were fired (for
  then-active handler) after the connection was passed to the initiator.

* Comm close handler was not cleaned up properly.

* statCounter.syscalls.sock.closes counter was not updated on FD closure.

* Waiting pending accepts were not kicked on FD closure.

* Connection timeout was enforced for each connection attempt instead of 
  applying to all attempts taken together.

and possibly other problems. The full extent of all side-effects of mishandled
race conditions and state conflicts is probably unknown.


TODO: Needs more testing, especially around corner cases.
      Does somebody need more specific callback cancellation reasons?
      Make connect_timeout documentation in squid.conf less ambiguous.
      Move prevalent conn_ debugging to the status() method?
      Polish Comm timeout handling to always reset .timeout on callback?
      Polish comm_close() to always reset "Select" state?
      Consider revising eventDelete() to delete between-I/O sleep timeout.

=== modified file 'src/comm/ConnOpener.cc'
--- src/comm/ConnOpener.cc	2013-01-16 10:35:54 +0000
+++ src/comm/ConnOpener.cc	2013-01-25 17:15:30 +0000
@@ -16,343 +16,432 @@
 #include "ip/tools.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
 
 #if HAVE_ERRNO_H
 #include <errno.h>
 #endif
 
 class CachePeer;
 
 CBDATA_NAMESPACED_CLASS_INIT(Comm, ConnOpener);
 
 Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) :
         AsyncJob("Comm::ConnOpener"),
         host_(NULL),
         temporaryFd_(-1),
         conn_(c),
         callback_(handler),
         totalTries_(0),
         failRetries_(0),
-        connectTimeout_(ctimeout),
-        connectStart_(0)
+        deadline_(squid_curtime + static_cast<time_t>(ctimeout))
 {}
 
 Comm::ConnOpener::~ConnOpener()
 {
     safe_free(host_);
 }
 
 bool
 Comm::ConnOpener::doneAll() const
 {
     // is the conn_ to be opened still waiting?
     if (conn_ == NULL) {
         return AsyncJob::doneAll();
     }
 
     // is the callback still to be called?
     if (callback_ == NULL || callback_->canceled()) {
         return AsyncJob::doneAll();
     }
 
+    // otherwise, we must be waiting for something
+    Must(temporaryFd_ >= 0 || calls_.sleep_);
     return false;
 }
 
 void
 Comm::ConnOpener::swanSong()
 {
-    // cancel any event watchers
-    // done here to get the "swanSong" mention in cancel debugging.
-    if (calls_.earlyAbort_ != NULL) {
-        calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong");
-        calls_.earlyAbort_ = NULL;
-    }
-    if (calls_.timeout_ != NULL) {
-        calls_.timeout_->cancel("Comm::ConnOpener::swanSong");
-        calls_.timeout_ = NULL;
-    }
-
     if (callback_ != NULL) {
-        if (callback_->canceled())
-            callback_ = NULL;
-        else
-            // inform the still-waiting caller we are dying
-            doneConnecting(COMM_ERR_CONNECT, 0);
+        // inform the still-waiting caller we are dying
+        sendAnswer(COMM_ERR_CONNECT, 0, "Comm::ConnOpener::swanSong");
     }
 
-    // rollback what we can from the job state
-    if (temporaryFd_ >= 0) {
-        // doneConnecting() handles partial FD connection cleanup
-        doneConnecting(COMM_ERR_CONNECT, 0);
-    }
+    if (temporaryFd_ >= 0)
+        closeFd();
+
+    if (calls_.sleep_)
+        cancelSleep();
 
     AsyncJob::swanSong();
 }
 
 void
 Comm::ConnOpener::setHost(const char * new_host)
 {
     // unset and erase if already set.
     if (host_ != NULL)
         safe_free(host_);
 
     // set the new one if given.
     if (new_host != NULL)
         host_ = xstrdup(new_host);
 }
 
 const char *
 Comm::ConnOpener::getHost() const
 {
     return host_;
 }
 
 /**
  * Connection attempt are completed. One way or the other.
  * Pass the results back to the external handler.
- * NP: on errors the earlyAbort call should be cancelled first with a reason.
  */
 void
-Comm::ConnOpener::doneConnecting(comm_err_t errFlag, int xerrno)
+Comm::ConnOpener::sendAnswer(comm_err_t errFlag, int xerrno, const char *why)
 {
     // only mark the address good/bad AFTER connect is finished.
     if (host_ != NULL) {
-        if (xerrno == 0)
+        if (xerrno == 0) // XXX: should not we use errFlag instead?
             ipcacheMarkGoodAddr(host_, conn_->remote);
         else {
             ipcacheMarkBadAddr(host_, conn_->remote);
 #if USE_ICMP
             if (Config.onoff.test_reachability)
                 netdbDeleteAddrNetwork(conn_->remote);
 #endif
         }
     }
 
     if (callback_ != NULL) {
         typedef CommConnectCbParams Params;
         Params &params = GetCommParams<Params>(callback_);
         params.conn = conn_;
         params.flag = errFlag;
         params.xerrno = xerrno;
         ScheduleCallHere(callback_);
         callback_ = NULL;
     }
 
-    if (temporaryFd_ >= 0) {
-        debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
-        // it never reached fully open, so cleanup the FD handlers
-        // Note that comm_close() sequence does not happen for partially open FD
-        Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0);
+    // The job will stop without this call because nil callback_ makes
+    // doneAll() true, but this explicit call creates nicer debugging.
+    mustStop(why);
+}
+
+/// cleans up this job I/O state without closing temporaryFd
+/// required before closing temporaryFd or keeping it in conn_
+/// leaves FD bare so must only be called via closeFd() or keepFd()
+void
+Comm::ConnOpener::cleanFd()
+{
+    debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
+
+    Must(temporaryFd_ >= 0);
+    fde &f = fd_table[temporaryFd_];
+
+    // Our write_handler was set without using Comm::Write API, so we cannot
+    // use a cancellable Pointer-free job callback and simply cancel it here.
+    if (f.write_handler) {
+
+        /* XXX: We are about to remove write_handler, which was responsible
+         * for deleting write_data, so we have to delete write_data
+         * ourselves. Comm currently calls SetSelect handlers synchronously
+         * so if write_handler is set, we know it has not been called yet.
+         * ConnOpener converts that sync call into an async one, but only
+         * after deleting ptr, so that is not a problem.
+         */
+
+        delete static_cast<Pointer*>(f.write_data);
+        f.write_data = NULL;
+    }
+    // Comm::DoSelect does not do this when calling and resetting write_handler
+    // (because it expects more writes to come?). We mimic that optimization by
+    // resetting Comm "Select" state only when the FD is actually closed.
+    // Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0);
+ 
+    if (calls_.timeout_ != NULL) {
+        calls_.timeout_->cancel("Comm::ConnOpener::cleanFd");
+        calls_.timeout_ = NULL;
+    }
+    // Comm checkTimeouts() and commCloseAllSockets() do not clear .timeout 
+    // when calling timeoutHandler (XXX fix them), so we clear unconditionally.
+    f.timeoutHandler = NULL;
+    f.timeout = 0;
+
+    if (calls_.earlyAbort_ != NULL) {
+        comm_remove_close_handler(temporaryFd_, calls_.earlyAbort_);
         calls_.earlyAbort_ = NULL;
-        if (calls_.timeout_ != NULL) {
-            calls_.timeout_->cancel("Comm::ConnOpener::doneConnecting");
-            calls_.timeout_ = NULL;
-        }
-        fd_table[temporaryFd_].timeoutHandler = NULL;
-        fd_table[temporaryFd_].timeout = 0;
-        close(temporaryFd_);
-        fd_close(temporaryFd_);
-        temporaryFd_ = -1;
     }
+}
 
-    /* ensure cleared local state, we are done. */
-    conn_ = NULL;
+/// cleans I/O state and ends I/O for temporaryFd_
+void
+Comm::ConnOpener::closeFd()
+{
+    if (temporaryFd_ < 0)
+        return;
+
+    cleanFd();
+
+    // comm_close() below uses COMMIO_FD_WRITECB(fd)->active() to clear Comm
+    // "Select" state. It will not clear ours. XXX: It should always clear
+    // because a callback may have been active but was called before comm_close
+    Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0);
+
+    comm_close(temporaryFd_);
+    temporaryFd_ = -1;
+}
+
+/// cleans I/O state and moves temporaryFd_ to the conn_ for long-term use
+void
+Comm::ConnOpener::keepFd()
+{
+    Must(conn_ != NULL);
+    Must(temporaryFd_ >= 0);
+
+    cleanFd();
+
+    conn_->fd = temporaryFd_;
+    temporaryFd_ = -1;
 }
 
 void
 Comm::ConnOpener::start()
 {
     Must(conn_ != NULL);
 
-    /* get a socket open ready for connecting with */
+    /* outbound sockets have no need to be protocol agnostic. */
+    if (!(Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING) && conn_->remote.IsIPv4()) {
+        conn_->local.SetIPv4();
+    }
+
+    if (createFd())
+        connect();
+}
+
+/// called at the end of Comm::ConnOpener::DelayedConnectRetry event
+void
+Comm::ConnOpener::restart() {
+    debugs(5, 5, conn_ << " restarting after sleep");
+    calls_.sleep_ = false;
+
+    if (createFd())
+        connect();
+}
+
+/// Create a socket for the future connection or return false. 
+/// If false is returned, done() is guaranteed to return true and end the job.
+bool
+Comm::ConnOpener::createFd()
+{
+    Must(temporaryFd_ < 0);
+
+    // our initators signal abort by cancelling their callbacks
+    if (callback_ == NULL || callback_->canceled())
+        return false;
+
+    temporaryFd_ = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_);
     if (temporaryFd_ < 0) {
-        /* outbound sockets have no need to be protocol agnostic. */
-        if (!(Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING) && conn_->remote.IsIPv4()) {
-            conn_->local.SetIPv4();
-        }
-        temporaryFd_ = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_);
-        if (temporaryFd_ < 0) {
-            doneConnecting(COMM_ERR_CONNECT, 0);
-            return;
-        }
+        sendAnswer(COMM_ERR_CONNECT, 0, "Comm::ConnOpener::createFd");
+        return false;
     }
 
     typedef CommCbMemFunT<Comm::ConnOpener, CommCloseCbParams> abortDialer;
     calls_.earlyAbort_ = JobCallback(5, 4, abortDialer, this, Comm::ConnOpener::earlyAbort);
     comm_add_close_handler(temporaryFd_, calls_.earlyAbort_);
 
     typedef CommCbMemFunT<Comm::ConnOpener, CommTimeoutCbParams> timeoutDialer;
     calls_.timeout_ = JobCallback(5, 4, timeoutDialer, this, Comm::ConnOpener::timeout);
-    debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_);
+    debugs(5, 3, conn_ << " will timeout in " << (deadline_ - squid_curtime));
 
-    // Update the fd_table directly because conn_ is not yet storing the FD
+    // Update the fd_table directly because commSetConnTimeout() needs open conn_
     assert(temporaryFd_ < Squid_MaxFD);
     assert(fd_table[temporaryFd_].flags.open);
     typedef CommTimeoutCbParams Params;
     Params &params = GetCommParams<Params>(calls_.timeout_);
     params.conn = conn_;
     fd_table[temporaryFd_].timeoutHandler = calls_.timeout_;
-    fd_table[temporaryFd_].timeout = squid_curtime + (time_t) connectTimeout_;
+    fd_table[temporaryFd_].timeout = deadline_;
 
-    connectStart_ = squid_curtime;
-    connect();
+    return true;
 }
 
 void
 Comm::ConnOpener::connected()
 {
-    conn_->fd = temporaryFd_;
-    temporaryFd_ = -1;
+    Must(temporaryFd_ >= 0);
+    keepFd();
 
     /*
      * stats.conn_open is used to account for the number of
      * connections that we have open to the CachePeer, so we can limit
      * based on the max-conn option.  We need to increment here,
      * even if the connection may fail.
      */
     if (CachePeer *peer=(conn_->getPeer()))
         ++peer->stats.conn_open;
 
     lookupLocalAddress();
 
     /* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to
      *       indicate the state of a raw fd object being passed around.
      *       Also, legacy code still depends on comm_local_port() with no access to Comm::Connection
      *       when those are done comm_local_port can become one of our member functions to do the below.
      */
-    fd_table[conn_->fd].flags.open = 1;
+    Must(fd_table[conn_->fd].flags.open);
     fd_table[conn_->fd].local_addr = conn_->local;
+
+    sendAnswer(COMM_OK, 0, "Comm::ConnOpener::connected");
 }
 
-/** Make an FD connection attempt.
- * Handles the case(s) when a partially setup connection gets closed early.
- */
+/// Make an FD connection attempt.
 void
 Comm::ConnOpener::connect()
 {
     Must(conn_ != NULL);
-
-    // our parent Jobs signal abort by cancelling their callbacks.
-    if (callback_ == NULL || callback_->canceled())
-        return;
+    Must(temporaryFd_ >= 0);
 
     ++ totalTries_;
 
     switch (comm_connect_addr(temporaryFd_, conn_->remote) ) {
 
     case COMM_INPROGRESS:
-        // check for timeout FIRST.
-        if (squid_curtime - connectStart_ > connectTimeout_) {
-            debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
-            calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out");
-            doneConnecting(COMM_TIMEOUT, errno);
-            return;
-        } else {
-            debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
-            Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, new Pointer(this), 0);
-        }
+        debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
+        Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, new Pointer(this), 0);
         break;
 
     case COMM_OK:
         debugs(5, 5, HERE << conn_ << ": COMM_OK - connected");
         connected();
-        doneConnecting(COMM_OK, 0);
         break;
 
-    default:
+    default: {
+        const int xerrno = errno;
+
         ++failRetries_;
+        debugs(5, 7, conn_ << ": failure #" << failRetries_ << " <= " <<
+               Config.connect_retries << ": " << xstrerr(xerrno));
 
-        // check for timeout FIRST.
-        if (squid_curtime - connectStart_ > connectTimeout_) {
-            debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response.");
-            calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out");
-            doneConnecting(COMM_TIMEOUT, errno);
-        } else if (failRetries_ < Config.connect_retries) {
+        if (failRetries_ < Config.connect_retries) {
             debugs(5, 5, HERE << conn_ << ": * - try again");
-            eventAdd("Comm::ConnOpener::DelayedConnectRetry", Comm::ConnOpener::DelayedConnectRetry, new Pointer(this), 0.05, 0, false);
+            sleep();
             return;
         } else {
             // send ERROR back to the upper layer.
             debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already.");
-            calls_.earlyAbort_->cancel("Comm::ConnOpener::connect failed");
-            doneConnecting(COMM_ERR_CONNECT, errno);
+            sendAnswer(COMM_ERR_CONNECT, xerrno, "Comm::ConnOpener::connect");
         }
     }
+    }
+}
+
+/// Close and wait a little before trying to open and connect again.
+void
+Comm::ConnOpener::sleep() {
+    Must(!calls_.sleep_);
+    closeFd();
+    calls_.sleep_ = true;
+    eventAdd("Comm::ConnOpener::DelayedConnectRetry",
+             Comm::ConnOpener::DelayedConnectRetry,
+             new Pointer(this), 0.05, 0, false);
+}
+
+/// cleans up this job sleep state
+void
+Comm::ConnOpener::cancelSleep()
+{
+    if (calls_.sleep_) {
+       // It would be nice to delete the sleep event, but it might be out of
+       // the event queue and in the async queue already, so (a) we do not know
+       // whether we can safely delete the call ptr here and (b) eventDelete()
+       // will assert if the event went async. Thus, we let the event run so
+       // that it deletes the call ptr [after this job is gone]. Note that we
+       // are called only when the job ends so this "hanging event" will do
+       // nothing but deleting the call ptr.  TODO: Revise eventDelete() API.
+       // eventDelete(Comm::ConnOpener::DelayedConnectRetry, calls_.sleep);
+       calls_.sleep_ = false;
+       debugs(5, 9, conn_ << " stops sleeping");
+    }
 }
 
 /**
  * Lookup local-end address and port of the TCP link just opened.
  * This ensure the connection local details are set correctly
  */
 void
 Comm::ConnOpener::lookupLocalAddress()
 {
     struct addrinfo *addr = NULL;
     conn_->local.InitAddrInfo(addr);
 
     if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) {
         debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror());
         conn_->local.FreeAddrInfo(addr);
         return;
     }
 
     conn_->local = *addr;
     conn_->local.FreeAddrInfo(addr);
     debugs(5, 6, HERE << conn_);
 }
 
 /** Abort connection attempt.
  * Handles the case(s) when a partially setup connection gets closed early.
  */
 void
 Comm::ConnOpener::earlyAbort(const CommCloseCbParams &io)
 {
     debugs(5, 3, HERE << io.conn);
-    doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better?
+    calls_.earlyAbort_ = NULL;
+    // NP: is closing or shutdown better?
+    sendAnswer(COMM_ERR_CLOSING, io.xerrno, "Comm::ConnOpener::earlyAbort");
 }
 
 /**
  * Handles the case(s) when a partially setup connection gets timed out.
  * NP: When commSetConnTimeout accepts generic CommCommonCbParams this can die.
  */
 void
 Comm::ConnOpener::timeout(const CommTimeoutCbParams &)
 {
-    connect();
+    debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response.");
+    calls_.timeout_ = NULL;
+    sendAnswer(COMM_TIMEOUT, ETIMEDOUT, "Comm::ConnOpener::timeout");
 }
 
 /* Legacy Wrapper for the retry event after COMM_INPROGRESS
  * XXX: As soon as Comm::SetSelect() accepts Async calls we can use a ConnOpener::connect call
  */
 void
 Comm::ConnOpener::InProgressConnectRetry(int fd, void *data)
 {
     Pointer *ptr = static_cast<Pointer*>(data);
     assert(ptr);
     if (ConnOpener *cs = ptr->valid()) {
         // Ew. we are now outside the all AsyncJob protections.
         // get back inside by scheduling another call...
         typedef NullaryMemFunT<Comm::ConnOpener> Dialer;
         AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect);
         ScheduleCallHere(call);
     }
     delete ptr;
 }
 
 /* Legacy Wrapper for the retry event with small delay after errors.
- * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::connect call
+ * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::restart call
  */
 void
 Comm::ConnOpener::DelayedConnectRetry(void *data)
 {
     Pointer *ptr = static_cast<Pointer*>(data);
     assert(ptr);
     if (ConnOpener *cs = ptr->valid()) {
         // Ew. we are now outside the all AsyncJob protections.
         // get back inside by scheduling another call...
         typedef NullaryMemFunT<Comm::ConnOpener> Dialer;
-        AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect);
+        AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::restart);
         ScheduleCallHere(call);
     }
     delete ptr;
 }

=== modified file 'src/comm/ConnOpener.h'
--- src/comm/ConnOpener.h	2013-01-02 23:40:49 +0000
+++ src/comm/ConnOpener.h	2013-01-25 17:11:40 +0000
@@ -23,57 +23,64 @@
 public:
     void noteAbort() { mustStop("externally aborted"); }
 
     typedef CbcPointer<ConnOpener> Pointer;
 
     virtual bool doneAll() const;
 
     ConnOpener(Comm::ConnectionPointer &, AsyncCall::Pointer &handler, time_t connect_timeout);
     ~ConnOpener();
 
     void setHost(const char *);    ///< set the hostname note for this connection
     const char * getHost() const;  ///< get the hostname noted for this connection
 
 private:
     // Undefined because two openers cannot share a connection
     ConnOpener(const ConnOpener &);
     ConnOpener & operator =(const ConnOpener &c);
 
     void earlyAbort(const CommCloseCbParams &);
     void timeout(const CommTimeoutCbParams &);
-    void doneConnecting(comm_err_t errFlag, int xerrno);
+    void sendAnswer(comm_err_t errFlag, int xerrno, const char *why);
     static void InProgressConnectRetry(int fd, void *data);
     static void DelayedConnectRetry(void *data);
     void connect();
     void connected();
     void lookupLocalAddress();
 
+    void sleep();
+    void restart();
+
+    bool createFd();
+    void closeFd();
+    void keepFd();
+    void cleanFd();
+
+    void cancelSleep();
+
 private:
     char *host_;                         ///< domain name we are trying to connect to.
     int temporaryFd_;                    ///< the FD being opened. Do NOT set conn_->fd until it is fully open.
     Comm::ConnectionPointer conn_;       ///< single connection currently to be opened.
     AsyncCall::Pointer callback_;        ///< handler to be called on connection completion.
 
     int totalTries_;   ///< total number of connection attempts over all destinations so far.
     int failRetries_;  ///< number of retries current destination has been tried.
 
-    /**
-     * time at which to abandon the connection.
-     * the connection-done callback will be passed COMM_TIMEOUT
-     */
-    time_t connectTimeout_;
-
-    /// time at which this series of connection attempts was started.
-    time_t connectStart_;
+    /// if we are not done by then, we will call back with COMM_TIMEOUT
+    time_t deadline_;
 
     /// handles to calls which we may need to cancel.
     struct Calls {
         AsyncCall::Pointer earlyAbort_;
         AsyncCall::Pointer timeout_;
+        /// Whether we are idling before retrying to connect; not yet a call
+        /// [that we can cancel], but it will probably become one eventually.
+        bool sleep_;
     } calls_;
 
     CBDATA_CLASS2(ConnOpener);
 };
 
 }; // namespace Comm
 
 #endif /* _SQUID_SRC_COMM_CONNOPENER_H */