=== modified file 'src/ipc/Coordinator.cc' --- src/ipc/Coordinator.cc 2010-07-06 23:09:44 +0000 +++ src/ipc/Coordinator.cc 2010-09-24 09:29:17 +0000 @@ -7,7 +7,9 @@ #include "config.h" +#include "base/Subscription.h" #include "comm.h" +#include "comm/Connection.h" #include "ipc/Coordinator.h" #include "ipc/FdNotes.h" #include "ipc/SharedListen.h" @@ -81,20 +83,20 @@ " needs shared listen FD for " << request.params.addr); Listeners::const_iterator i = listeners.find(request.params); int errNo = 0; - const int sock = (i != listeners.end()) ? + const Comm::ConnectionPointer c = (i != listeners.end()) ? i->second : openListenSocket(request, errNo); - debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " << + debugs(54, 3, HERE << "sending shared listen " << c << " for " << request.params.addr << " to kid" << request.requestorId << " mapId=" << request.mapId); - SharedListenResponse response(sock, errNo, request.mapId); + SharedListenResponse response(c, errNo, request.mapId); TypedMsgHdr message; response.pack(message); SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); } -int +Comm::ConnectionPointer Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, int &errNo) { @@ -103,19 +105,23 @@ debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" << request.requestorId); - Ip::Address addr = p.addr; // comm_open_listener may modify it + Comm::ConnectionPointer conn = new Comm::Connection; + conn->local = p.addr; // comm_open_listener may modify it + conn->flags = p.flags; enter_suid(); - const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags, - FdNote(p.fdNote)); - errNo = (sock >= 0) ? 0 : errno; + comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote)); + errNo = Comm::IsConnOpen(conn) ? 0 : errno; leave_suid(); + debugs(54, 6, HERE << "tried listening on " << conn << " for kid" << + request.requestorId); + // cache positive results - if (sock >= 0) - listeners[request.params] = sock; + if (Comm::IsConnOpen(conn)) + listeners[request.params] = conn; - return sock; + return conn; } void Ipc::Coordinator::broadcastSignal(int sig) const === modified file 'src/ipc/Coordinator.h' --- src/ipc/Coordinator.h 2010-07-05 21:28:23 +0000 +++ src/ipc/Coordinator.h 2010-09-24 07:09:11 +0000 @@ -41,13 +41,13 @@ void handleSharedListenRequest(const SharedListenRequest& request); /// calls comm_open_listener() - int openListenSocket(const SharedListenRequest& request, int &errNo); + Comm::ConnectionPointer openListenSocket(const SharedListenRequest& request, int &errNo); private: typedef Vector Strands; ///< unsorted strands Strands strands; ///< registered processes and threads - typedef std::map Listeners; ///< params:fd map + typedef std::map Listeners; ///< params:fd map Listeners listeners; ///< cached comm_open_listener() results static Coordinator* TheInstance; ///< the only class instance in existence === modified file 'src/ipc/Port.cc' --- src/ipc/Port.cc 2010-08-24 00:12:54 +0000 +++ src/ipc/Port.cc 2010-10-02 10:16:26 +0000 @@ -5,9 +5,10 @@ * */ - #include "config.h" +#include "comm.h" #include "CommCalls.h" +#include "comm/Connection.h" #include "ipc/Port.h" const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc"; @@ -33,7 +34,7 @@ typedef CommCbMemFunT Dialer; AsyncCall::Pointer readHandler = JobCallback(54, 6, Dialer, this, Port::noteRead); - comm_read(fd(), buf.raw(), buf.size(), readHandler); + comm_read(conn(), buf.raw(), buf.size(), readHandler); } bool Ipc::Port::doneAll() const @@ -53,7 +54,7 @@ void Ipc::Port::noteRead(const CommIoCbParams& params) { - debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag << + debugs(54, 6, HERE << params.conn << " flag " << params.flag << " [" << this << ']'); if (params.flag == COMM_OK) { assert(params.buf == buf.raw()); === modified file 'src/ipc/SharedListen.cc' --- src/ipc/SharedListen.cc 2010-07-06 23:09:44 +0000 +++ src/ipc/SharedListen.cc 2010-09-24 09:28:25 +0000 @@ -6,9 +6,9 @@ */ #include "config.h" -#include +#include "base/TextException.h" #include "comm.h" -#include "base/TextException.h" +#include "comm/Connection.h" #include "ipc/Port.h" #include "ipc/Messages.h" #include "ipc/Kids.h" @@ -16,6 +16,7 @@ #include "ipc/StartListening.h" #include "ipc/SharedListen.h" +#include /// holds information necessary to handle JoinListen response class PendingOpenRequest @@ -80,22 +81,24 @@ } -Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId): - fd(aFd), errNo(anErrNo), mapId(aMapId) +Ipc::SharedListenResponse::SharedListenResponse(const Comm::ConnectionPointer &c, int anErrNo, int aMapId): + conn(c), errNo(anErrNo), mapId(aMapId) { } Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg): - fd(-1), errNo(0), mapId(-1) + conn(NULL), errNo(0), mapId(-1) { hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this)); - fd = hdrMsg.getFd(); + conn = new Comm::Connection; + conn->fd = hdrMsg.getFd(); + // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin() } void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const { hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this)); - hdrMsg.putFd(fd); + hdrMsg.putFd(conn->fd); } @@ -121,9 +124,10 @@ void Ipc::SharedListenJoined(const SharedListenResponse &response) { - const int fd = response.fd; + Comm::ConnectionPointer c = response.conn; - debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" << + // Dont debugs c fully since only FD is filled right now. + debugs(54, 3, HERE << "got listening FD " << c->fd << " errNo=" << response.errNo << " mapId=" << response.mapId); Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end()); @@ -131,20 +135,23 @@ Must(por.callback != NULL); TheSharedListenRequestMap.erase(response.mapId); - if (fd >= 0) { + if (Comm::IsConnOpen(c)) { OpenListenerParams &p = por.params; + c->local = p.addr; + c->flags = p.flags; + // XXX: leave the comm AI stuff to comm_import_opened()? struct addrinfo *AI = NULL; p.addr.GetAddrInfo(AI); AI->ai_socktype = p.sock_type; AI->ai_protocol = p.proto; - comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI); + comm_import_opened(c, FdNote(p.fdNote), AI); p.addr.FreeAddrInfo(AI); } - StartListeningCb *cbd = - dynamic_cast(por.callback->getDialer()); + StartListeningCb *cbd = dynamic_cast(por.callback->getDialer()); Must(cbd); - cbd->fd = fd; + cbd->conn = c; cbd->errNo = response.errNo; + cbd->handlerSubscription = por.params.handlerSubscription; ScheduleCallHere(por.callback); } === modified file 'src/ipc/SharedListen.h' --- src/ipc/SharedListen.h 2010-07-06 23:09:44 +0000 +++ src/ipc/SharedListen.h 2010-09-24 09:23:04 +0000 @@ -15,7 +15,8 @@ /// "shared listen" is when concurrent processes are listening on the same fd -/// comm_open_listener() parameters holder +/// Comm::ConnAcceptor parameters holder +/// all the details necessary to recreate a Comm::Connection and fde entry for the kid listener FD class OpenListenerParams { public: @@ -23,11 +24,17 @@ bool operator <(const OpenListenerParams &p) const; ///< useful for map<> + // bits to re-create the fde entry int sock_type; int proto; + int fdNote; ///< index into fd_note() comment strings + + // bits to re-create the listener Comm::Connection descriptor Ip::Address addr; ///< will be memset and memcopied int flags; - int fdNote; ///< index into fd_note() comment strings + + /// handler to subscribe to Comm::ConnAcceptor when we get the response + Subscription::Pointer handlerSubscription; }; class TypedMsgHdr; @@ -52,12 +59,12 @@ class SharedListenResponse { public: - SharedListenResponse(int fd, int errNo, int mapId); + SharedListenResponse(const Comm::ConnectionPointer &c, int errNo, int mapId); explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg() void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() public: - int fd; ///< opened listening socket or -1 + Comm::ConnectionPointer conn; ///< opened listening socket or -1 int errNo; ///< errno value from comm_open_sharedListen() call int mapId; ///< to map future response to the requestor's callback }; === modified file 'src/ipc/StartListening.cc' --- src/ipc/StartListening.cc 2010-07-06 23:09:44 +0000 +++ src/ipc/StartListening.cc 2010-10-02 02:23:20 +0000 @@ -6,13 +6,16 @@ */ #include "config.h" +#include "base/Subscription.h" +#include "base/TextException.h" #include "comm.h" -#include "base/TextException.h" +#include "comm/ConnAcceptor.h" +#include "comm/Connection.h" #include "ipc/SharedListen.h" #include "ipc/StartListening.h" -Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0) +Ipc::StartListeningCb::StartListeningCb(): conn(NULL), errNo(0) { } @@ -22,37 +25,44 @@ std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const { - return os << "(FD " << fd << ", err=" << errNo; + return os << "(" << conn << ", err=" << errNo; } - -void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, - int flags, FdNoteId fdNote, AsyncCall::Pointer &callback) +void +Ipc::StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn, + FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub) { - OpenListenerParams p; - p.sock_type = sock_type; - p.proto = proto; - p.addr = addr; - p.flags = flags; - p.fdNote = fdNote; - if (UsingSmp()) { // if SMP is on, share + OpenListenerParams p; + p.sock_type = sock_type; + p.proto = proto; + p.addr = listenConn->local; + p.flags = listenConn->flags; + p.fdNote = fdNote; + p.handlerSubscription = sub; + Ipc::JoinSharedListen(p, callback); return; // wait for the call back } + StartListeningCb *cbd = dynamic_cast(callback->getDialer()); + Must(cbd); + cbd->conn = listenConn; + enter_suid(); - const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags, - FdNote(p.fdNote)); - const int errNo = (sock >= 0) ? 0 : errno; + if (sock_type == SOCK_STREAM) { + // TCP: setup the subscriptions such that new connections accepted by listenConn are handled by HTTP + AsyncJob::Start(new Comm::ConnAcceptor(cbd->conn, FdNote(fdNote), sub)); + } else if (sock_type == SOCK_DGRAM) { + // UDP: setup the listener socket, but do not set a subscriber + Comm::ConnectionPointer udpConn = listenConn; + comm_open_listener(sock_type, proto, udpConn, FdNote(fdNote)); + } else { + fatalf("Invalid Socket Type (%d)",sock_type); + } + cbd->errNo = cbd->conn->isOpen() ? 0 : errno; leave_suid(); - debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr); - - StartListeningCb *cbd = - dynamic_cast(callback->getDialer()); - Must(cbd); - cbd->fd = sock; - cbd->errNo = errNo; + debugs(54, 3, HERE << "opened listen " << cbd->conn); ScheduleCallHere(callback); } === modified file 'src/ipc/StartListening.h' --- src/ipc/StartListening.h 2010-07-25 08:10:12 +0000 +++ src/ipc/StartListening.h 2010-09-24 09:06:49 +0000 @@ -9,9 +9,11 @@ #define SQUID_IPC_START_LISTENING_H #include "config.h" +#include "base/AsyncCall.h" +#include "base/Subscription.h" +#include "comm/forward.h" #include "ip/forward.h" #include "ipc/FdNotes.h" -#include "base/AsyncCall.h" #if HAVE_IOSFWD #include @@ -31,14 +33,15 @@ std::ostream &startPrint(std::ostream &os) const; public: - int fd; ///< opened listening socket or -1 + Comm::ConnectionPointer conn; ///< opened listening socket int errNo; ///< errno value from the comm_open_listener() call + Subscription::Pointer handlerSubscription; ///< The subscription we will pass on to the ConnAcceptor }; /// Depending on whether SMP is on, either ask Coordinator to send us -/// the listening FD or call comm_open_listener() directly. -extern void StartListening(int sock_type, int proto, Ip::Address &addr, - int flags, FdNoteId fdNote, AsyncCall::Pointer &callback); +/// the listening FD or start a connection acceptor directly. +extern void StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn, + FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &handlerSub); } // namespace Ipc; === modified file 'src/ipc/Strand.cc' --- src/ipc/Strand.cc 2010-07-06 23:09:44 +0000 +++ src/ipc/Strand.cc 2010-09-24 09:29:47 +0000 @@ -6,13 +6,14 @@ */ #include "config.h" +#include "base/Subscription.h" #include "base/TextException.h" +#include "comm/Connection.h" #include "ipc/Strand.h" #include "ipc/Messages.h" #include "ipc/SharedListen.h" #include "ipc/Kids.h" - CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand); === modified file 'src/ipc/UdsOp.cc' --- src/ipc/UdsOp.cc 2010-08-24 00:12:54 +0000 +++ src/ipc/UdsOp.cc 2010-09-24 07:10:17 +0000 @@ -4,20 +4,18 @@ * DEBUG: section 54 Interprocess Communication * */ - - #include "config.h" +#include "base/TextException.h" #include "comm.h" #include "CommCalls.h" -#include "base/TextException.h" +#include "comm/Connection.h" #include "ipc/UdsOp.h" Ipc::UdsOp::UdsOp(const String& pathAddr): AsyncJob("Ipc::UdsOp"), address(PathToAddress(pathAddr)), - options(COMM_NONBLOCKING), - fd_(-1) + options(COMM_NONBLOCKING) { debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr); } @@ -25,8 +23,9 @@ Ipc::UdsOp::~UdsOp() { debugs(54, 5, HERE << '[' << this << ']'); - if (fd_ >= 0) - comm_close(fd_); + if (Comm::IsConnOpen(conn_)) + conn_->close(); + conn_ = NULL; } void Ipc::UdsOp::setOptions(int newOptions) @@ -34,15 +33,18 @@ options = newOptions; } -int Ipc::UdsOp::fd() +Comm::ConnectionPointer & +Ipc::UdsOp::conn() { - if (fd_ < 0) { + if (!Comm::IsConnOpen(conn_)) { if (options & COMM_DOBIND) unlink(address.sun_path); - fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options); - Must(fd_ >= 0); + if (conn_ == NULL) + conn_ = new Comm::Connection; + conn_->fd = comm_open_uds(SOCK_DGRAM, 0, &address, options); + Must(Comm::IsConnOpen(conn_)); } - return fd_; + return conn_; } void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName) @@ -50,12 +52,12 @@ typedef CommCbMemFunT Dialer; AsyncCall::Pointer handler = asyncCall(54,5, handlerName, Dialer(CbcPointer(this), &UdsOp::noteTimeout)); - commSetTimeout(fd(), seconds, handler); + commSetTimeout(conn()->fd, seconds, handler); } void Ipc::UdsOp::clearTimeout() { - commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd) + commSetTimeout(conn()->fd, -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd) } void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &) @@ -106,17 +108,17 @@ typedef CommCbMemFunT Dialer; AsyncCall::Pointer writeHandler = JobCallback(54, 5, Dialer, this, UdsSender::wrote); - comm_write(fd(), message.raw(), message.size(), writeHandler); + comm_write(conn(), message.raw(), message.size(), writeHandler); writing = true; } void Ipc::UdsSender::wrote(const CommIoCbParams& params) { - debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']'); + debugs(54, 5, HERE << params.conn << " flag " << params.flag << " [" << this << ']'); writing = false; if (params.flag != COMM_OK && retries-- > 0) { sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed? - write(); // XXX: should we close on error so that fd() reopens? + write(); // XXX: should we close on error so that conn() reopens? } } === modified file 'src/ipc/UdsOp.h' --- src/ipc/UdsOp.h 2010-07-06 18:58:38 +0000 +++ src/ipc/UdsOp.h 2010-09-20 10:07:01 +0000 @@ -11,6 +11,7 @@ #include "SquidString.h" #include "base/AsyncJob.h" +#include "comm/forward.h" #include "ipc/TypedMsgHdr.h" class CommTimeoutCbParams; @@ -33,7 +34,7 @@ protected: virtual void timedout() {} ///< called after setTimeout() if timed out - int fd(); ///< creates if needed and returns raw UDS socket descriptor + Comm::ConnectionPointer &conn(); ///< creates if needed and returns raw UDS socket descriptor /// call timedout() if no UDS messages in a given number of seconds void setTimeout(int seconds, const char *handlerName); @@ -47,7 +48,7 @@ private: int options; ///< UDS options - int fd_; ///< UDS descriptor + Comm::ConnectionPointer conn_; ///< UDS descriptor private: UdsOp(const UdsOp &); // not implemented