=== modified file 'src/CacheManager.h' --- src/CacheManager.h 2010-11-04 10:09:05 +0000 +++ src/CacheManager.h 2011-08-13 03:41:21 +0000 @@ -34,6 +34,7 @@ #ifndef SQUID_CACHEMANAGER_H #define SQUID_CACHEMANAGER_H +#include "base/AsyncJob.h" #include "comm/forward.h" #include "mgr/Action.h" #include "mgr/ActionProfile.h" @@ -41,6 +42,11 @@ #include "mgr/forward.h" #include +namespace Ipc +{ + class TypedMsgHdr; +} + /** \defgroup CacheManagerAPI Cache Manager API \ingroup Components @@ -57,7 +63,7 @@ * an instance of this class will represent a single independent manager. * TODO: update documentation to reflect the new singleton model. */ -class CacheManager +class CacheManager : public AsyncJob { public: typedef std::vector Menu; @@ -78,8 +84,12 @@ static CacheManager* GetInstance(); const char *ActionProtection(const Mgr::ActionProfilePointer &profile); + // AsyncJob API + virtual void swanSong(); + virtual bool doneAll() const; + protected: - CacheManager() {} ///< use Instance() instead + CacheManager() : AsyncJob("CacheManager") {} ///< use Instance() instead Mgr::CommandPointer ParseUrl(const char *url); void ParseHeaders(const HttpRequest * request, Mgr::ActionParams ¶ms); @@ -88,8 +98,13 @@ void registerProfile(const Mgr::ActionProfilePointer &profile); + // IPC subscriptions + void handleIpcRequest(const Ipc::TypedMsgHdr &msg); + void handleIpcResponse(const Ipc::TypedMsgHdr &msg); + Menu menu_; + CBDATA_CLASS2(CacheManager); private: static CacheManager* instance; }; === modified file 'src/cache_manager.cc' --- src/cache_manager.cc 2011-07-23 08:37:52 +0000 +++ src/cache_manager.cc 2011-08-13 11:47:45 +0000 @@ -42,6 +42,11 @@ #include "fde.h" #include "HttpReply.h" #include "HttpRequest.h" +#include "ipc/Coordinator.h" +#include "ipc/Messages.h" +#include "ipc/Port.h" +#include "ipc/TypedMsgHdr.h" +#include "ipc/UdsOp.h" #include "mgr/ActionCreator.h" #include "mgr/Action.h" #include "mgr/ActionProfile.h" @@ -49,7 +54,10 @@ #include "mgr/Command.h" #include "mgr/Forwarder.h" #include "mgr/FunAction.h" +#include "mgr/Inquirer.h" #include "mgr/QueryParams.h" +#include "mgr/Request.h" +#include "mgr/Response.h" #include "protos.h" /* rotate_logs() */ #include "SquidTime.h" #include "Store.h" @@ -60,6 +68,8 @@ /// \ingroup CacheManagerInternal #define MGR_PASSWD_SZ 128 +CBDATA_CLASS_INIT(CacheManager); + /// creates Action using supplied Action::Create method and command class ClassActionCreator: public Mgr::ActionCreator { @@ -456,6 +466,20 @@ CacheManager* CacheManager::instance=0; +void +CacheManager::swanSong() +{ + Ipc::ClearListenFor(Ipc::mtCacheMgrRequest); + Ipc::ClearListenFor(Ipc::mtCacheMgrResponse); +} + +bool +CacheManager::doneAll() const +{ + // Once started, only exit on shutdown. + return shutting_down != 0; +} + /** \ingroup CacheManagerAPI * Singleton accessor method. @@ -467,6 +491,54 @@ debugs(16, 6, "CacheManager::GetInstance: starting cachemanager up"); instance = new CacheManager; Mgr::RegisterBasics(); + + // NP: no need to hold onto these. Since we un-register using the message type. + typedef UnaryMemFunT MgrDialer; + typedef AsyncCallT IpcMsgCall; + Subscription::Pointer sub; + RefCount call; + + // Start accepting Cache Manager Responses + call = static_cast(asyncCall(16, 3, "CacheManager::handleIpcResponse", + MgrDialer(instance, &CacheManager::handleIpcResponse, Ipc::TypedMsgHdr()))); + sub = new CallSubscription(call); + Ipc::RegisterListenFor(Ipc::mtCacheMgrResponse, sub); + debugs(16, 1, "Startup: CacheManager: Accepting manager IPC responses"); + + // Start accepting Cache Manager Requests + call = static_cast(asyncCall(16, 3, "CacheManager::handleIpcRequest", + MgrDialer(instance, &CacheManager::handleIpcRequest, Ipc::TypedMsgHdr()))); + sub = new CallSubscription(call); + Ipc::RegisterListenFor(Ipc::mtCacheMgrRequest, sub); + + debugs(16, 1, "Startup: CacheManager: Accepting manager IPC requests"); } return instance; } + +void +CacheManager::handleIpcRequest(const Ipc::TypedMsgHdr &msg) +{ + debugs(16, 4, HERE); + const Mgr::Request request(msg); + + // Let the strand know that we are now responsible for handling the request + Mgr::Response response(request.requestId); + Ipc::TypedMsgHdr message; + response.pack(message); + Ipc::SendMessage(Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId), message); + + // XXX: Just run the action? we are inside an async manager call already. + Mgr::Action::Pointer action = createRequestedAction(request.params); + if (Ipc::Coordinator::Instance()) + AsyncJob::Start(new Mgr::Inquirer(action, request, Ipc::Coordinator::Instance()->strands())); + // else throw? coordinator scheduled a call and abandoned us. +} + +void +CacheManager::handleIpcResponse(const Ipc::TypedMsgHdr &msg) +{ + debugs(16, 4, HERE); + const Mgr::Response response(msg); + Mgr::Inquirer::HandleRemoteAck(response); +} === modified file 'src/ipc/Coordinator.cc' --- src/ipc/Coordinator.cc 2011-07-20 12:38:39 +0000 +++ src/ipc/Coordinator.cc 2011-08-13 07:21:31 +0000 @@ -9,14 +9,10 @@ #include "config.h" #include "base/Subscription.h" #include "base/TextException.h" -#include "CacheManager.h" #include "comm.h" #include "comm/Connection.h" #include "ipc/Coordinator.h" #include "ipc/SharedListen.h" -#include "mgr/Inquirer.h" -#include "mgr/Request.h" -#include "mgr/Response.h" #if SQUID_SNMP #include "snmp/Inquirer.h" #include "snmp/Request.h" @@ -68,20 +74,6 @@ handleSharedListenRequest(SharedListenRequest(message)); break; - case mtCacheMgrRequest: { - debugs(54, 6, HERE << "Cache manager request"); - const Mgr::Request req(message); - handleCacheMgrRequest(req); - } - break; - - case mtCacheMgrResponse: { - debugs(54, 6, HERE << "Cache manager response"); - const Mgr::Response resp(message); - handleCacheMgrResponse(resp); - } - break; - #if SQUID_SNMP case mtSnmpRequest: { debugs(54, 6, HERE << "SNMP request"); @@ -134,28 +126,6 @@ SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); } -void -Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request) -{ - debugs(54, 4, HERE); - - // Let the strand know that we are now responsible for handling the request - Mgr::Response response(request.requestId); - TypedMsgHdr message; - response.pack(message); - SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); - - Mgr::Action::Pointer action = - CacheManager::GetInstance()->createRequestedAction(request.params); - AsyncJob::Start(new Mgr::Inquirer(action, request, strands_)); -} - -void -Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response) -{ - Mgr::Inquirer::HandleRemoteAck(response); -} - #if SQUID_SNMP void Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request) === modified file 'src/mgr/Request.cc' --- src/mgr/Request.cc 2011-05-13 08:13:01 +0000 +++ src/mgr/Request.cc 2011-08-11 17:27:32 +0000 @@ -32,6 +32,12 @@ Mgr::Request::Request(const Ipc::TypedMsgHdr& msg): Ipc::Request(0, 0) { + *this = msg; +} + +Mgr::Request& +Mgr::Request::operator =(const Ipc::TypedMsgHdr& msg) +{ msg.checkType(Ipc::mtCacheMgrRequest); msg.getPod(requestorId); msg.getPod(requestId); @@ -41,6 +47,7 @@ conn->fd = msg.getFd(); // For now we just have the FD. // Address and connectio details wil be pulled/imported by the component later + return *this; } void === modified file 'src/mgr/Request.h' --- src/mgr/Request.h 2011-05-13 08:13:01 +0000 +++ src/mgr/Request.h 2011-08-11 17:10:11 +0000 @@ -24,6 +24,8 @@ const ActionParams &aParams); explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() + Request & operator =(const Ipc::TypedMsgHdr& msg); + /* Ipc::Request API */ virtual void pack(Ipc::TypedMsgHdr& msg) const; virtual Pointer clone() const;