Replace the blocking sleep(3) call with an asynchronous timed event, and close the UDS socket on write failures. The two XXX issues addressed here were not causing any serious bugs on their own, but the blocking sleep was ugly and possibly in the way of further kid registration fixes/improvements. === modified file 'src/ipc/UdsOp.cc' --- src/ipc/UdsOp.cc 2013-06-03 14:05:16 +0000 +++ src/ipc/UdsOp.cc 2013-11-06 23:52:01 +0000 @@ -64,75 +64,131 @@ void Ipc::UdsOp::noteTimeout(const CommT timedout(); // our kid handles communication timeout } struct sockaddr_un Ipc::PathToAddress(const String& pathAddr) { assert(pathAddr.size() != 0); struct sockaddr_un unixAddr; memset(&unixAddr, 0, sizeof(unixAddr)); unixAddr.sun_family = AF_LOCAL; xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path)); return unixAddr; } CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender); Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage): UdsOp(pathAddr), message(aMessage), retries(10), // TODO: make configurable? timeout(10), // TODO: make configurable? + sleeping(false), writing(false) { message.address(address); } +void Ipc::UdsSender::swanSong() +{ + // did we abort while waiting between retries? 
+ if (sleeping) + cancelSleep(); + + UdsOp::swanSong(); +} + + void Ipc::UdsSender::start() { UdsOp::start(); write(); if (timeout > 0) setTimeout(timeout, "Ipc::UdsSender::noteTimeout"); } bool Ipc::UdsSender::doneAll() const { - return !writing && UdsOp::doneAll(); + return !writing && !sleeping && UdsOp::doneAll(); } void Ipc::UdsSender::write() { debugs(54, 5, HERE); typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer; AsyncCall::Pointer writeHandler = JobCallback(54, 5, Dialer, this, UdsSender::wrote); Comm::Write(conn(), message.raw(), message.size(), writeHandler, NULL); writing = true; } void Ipc::UdsSender::wrote(const CommIoCbParams& params) { debugs(54, 5, HERE << params.conn << " flag " << params.flag << " retries " << retries << " [" << this << ']'); writing = false; if (params.flag != COMM_OK && retries-- > 0) { - sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed? - write(); // XXX: should we close on error so that conn() reopens? + // perhaps a fresh connection and more time will help? + conn()->close(); + sleep(); + } +} + +/// pause for a while before resending the message +void Ipc::UdsSender::sleep() +{ + Must(!sleeping); + sleeping = true; + eventAdd("Ipc::UdsSender::DelayedRetry", + Ipc::UdsSender::DelayedRetry, + new Pointer(this), 1, 0, false); // TODO: Use Fibonacci increments +} + +/// stop sleeping (or do nothing if we were not) +void Ipc::UdsSender::cancelSleep() +{ + if (sleeping) { + // Why not delete the event? See Comm::ConnOpener::cancelSleep(). 
+ sleeping = false; + debugs(54, 9, "stops sleeping"); + } +} + +/// legacy wrapper for Ipc::UdsSender::delayedRetry() +void Ipc::UdsSender::DelayedRetry(void *data) +{ + Pointer *ptr = static_cast<Pointer*>(data); + assert(ptr); + if (UdsSender *us = dynamic_cast<UdsSender*>(ptr->valid())) { + // get back inside AsyncJob protection by scheduling an async job call + typedef NullaryMemFunT<Ipc::UdsSender> Dialer; + AsyncCall::Pointer call = JobCallback(54, 4, Dialer, us, Ipc::UdsSender::delayedRetry); + ScheduleCallHere(call); + } + delete ptr; +} + +/// make another sending attempt after a pause +void Ipc::UdsSender::delayedRetry() +{ + debugs(54, 5, HERE << sleeping); + if (sleeping) { + sleeping = false; + write(); // reopens the connection if needed } } void Ipc::UdsSender::timedout() { debugs(54, 5, HERE); mustStop("timedout"); } void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message) { AsyncJob::Start(new UdsSender(toAddress, message)); } const Comm::ConnectionPointer & Ipc::ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, Ipc::FdNoteId noteId) { struct sockaddr_in addr; socklen_t len = sizeof(addr); if (getsockname(conn->fd, reinterpret_cast<struct sockaddr*>(&addr), &len) == 0) { === modified file 'src/ipc/UdsOp.h' --- src/ipc/UdsOp.h 2012-10-04 09:14:06 +0000 +++ src/ipc/UdsOp.h 2013-11-06 22:29:36 +0000 @@ -48,48 +48,55 @@ private: private: int options; ///< UDS options Comm::ConnectionPointer conn_; ///< UDS descriptor private: UdsOp(const UdsOp &); // not implemented UdsOp &operator= (const UdsOp &); // not implemented }; /// converts human-readable filename path into UDS address struct sockaddr_un PathToAddress(const String &pathAddr); // XXX: move UdsSender code to UdsSender.{cc,h} /// attempts to send an IPC message a few times, with a timeout class UdsSender: public UdsOp { public: UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage); protected: + virtual void swanSong(); // UdsOp (AsyncJob) API virtual void start(); // UdsOp (AsyncJob) API 
virtual bool doneAll() const; // UdsOp (AsyncJob) API virtual void timedout(); // UdsOp API private: + void sleep(); + void cancelSleep(); + static void DelayedRetry(void *data); + void delayedRetry(); + void write(); ///< schedule writing void wrote(const CommIoCbParams& params); ///< done writing or error private: TypedMsgHdr message; ///< what to send int retries; ///< how many times to try after a write error int timeout; ///< total time to send the message + bool sleeping; ///< whether we are waiting to retry a failed write bool writing; ///< whether Comm started and did not finish writing private: UdsSender(const UdsSender&); // not implemented UdsSender& operator= (const UdsSender&); // not implemented CBDATA_CLASS2(UdsSender); }; void SendMessage(const String& toAddress, const TypedMsgHdr& message); /// import socket fd from another strand into our Comm state const Comm::ConnectionPointer & ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, FdNoteId noteId); } #endif /* SQUID_IPC_ASYNCUDSOP_H */