=== modified file 'src/ipc/Coordinator.cc' --- src/ipc/Coordinator.cc 2011-07-20 12:38:39 +0000 +++ src/ipc/Coordinator.cc 2011-08-13 07:21:31 +0000 @@ -57,6 +53,16 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message) { + if (MessageSubscriptions[message.type()] != NULL) { + typedef UnaryMemFunT Dialer; + AsyncCall::Pointer call = MessageSubscriptions[message.type()]->callback(); + Dialer *d = dynamic_cast(call->getDialer()); + assert(d); + d->arg1 = message; + ScheduleCallHere(call); + return; + } + switch (message.type()) { case mtRegistration: debugs(54, 6, HERE << "Registration request"); === modified file 'src/ipc/Makefile.am' --- src/ipc/Makefile.am 2011-07-20 09:24:15 +0000 +++ src/ipc/Makefile.am 2011-08-13 03:58:29 +0000 @@ -10,6 +10,7 @@ Kid.h \ Kids.cc \ Kids.h \ + Messages.cc \ Messages.h \ StartListening.cc \ StartListening.h \ === added file 'src/ipc/Messages.cc' --- src/ipc/Messages.cc 1970-01-01 00:00:00 +0000 +++ src/ipc/Messages.cc 2011-08-13 07:33:51 +0000 @@ -0,0 +1,4 @@ +#include "config.h" +#include "ipc/Messages.h" + +Subscription::Pointer Ipc::MessageSubscriptions[Ipc::mtLastEntry]; === modified file 'src/ipc/Messages.h' --- src/ipc/Messages.h 2011-02-06 19:50:52 +0000 +++ src/ipc/Messages.h 2011-08-13 10:13:19 +0000 @@ -8,22 +8,52 @@ #ifndef SQUID_IPC_MESSAGES_H #define SQUID_IPC_MESSAGES_H -/** Declarations used by various IPC messages */ +//#include "base/AsyncJobCalls.h" +#include "base/Subscription.h" + +/* + * Messaging and Message Subscriptions. + * + * Subscribers must create a Dialer and a AsyncCall to receieve TypedMsgHdr parameters + * then generate a new CallSubscription(AsyncCall). + * then use RegisterListenFor(message type, subscription) to start receiving IPC messages + */ namespace Ipc { /// message class identifier -typedef enum { mtNone = 0, mtRegistration, +typedef enum { mtNone = 0, + + // IPC worker registration messages + mtRegistration, mtSharedListenRequest, mtSharedListenResponse, - mtCacheMgrRequest, mtCacheMgrResponse -#if SQUID_SNMP - , - mtSnmpRequest, mtSnmpResponse -#endif + + // Cache Manager aand SNMP agent messages + mtCacheMgrRequest, mtCacheMgrResponse, + mtSnmpRequest, mtSnmpResponse, + + mtLastEntry // dummy (end of list) } MessageType; +/// Currently subscribed listeners +extern Subscription::Pointer MessageSubscriptions[mtLastEntry]; + +/// subscribe a callback to happen whenever a certain message class arrives +inline void RegisterListenFor(MessageType m, const Subscription::Pointer &callSub) +{ + assert(MessageSubscriptions[m] == NULL); + MessageSubscriptions[m] = callSub; +} + +/// stop listening for message of the given type. +/// already scheduled events may exist and will happen +/// but future messages will be dropped. +inline void ClearListenFor(MessageType m) +{ + MessageSubscriptions[m] = NULL; +} + } // namespace Ipc; - #endif /* SQUID_IPC_MESSAGES_H */ === modified file 'src/ipc/TypedMsgHdr.h' --- src/ipc/TypedMsgHdr.h 2010-11-21 04:40:05 +0000 +++ src/ipc/TypedMsgHdr.h 2011-08-13 03:19:17 +0000 @@ -8,6 +8,9 @@ #ifndef SQUID_IPC_TYPED_MSG_HDR_H #define SQUID_IPC_TYPED_MSG_HDR_H +#if HAVE_OSTREAM +#include +#endif #if HAVE_SYS_SOCKET_H #include #endif @@ -99,4 +102,10 @@ } // namespace Ipc +inline std::ostream & +operator <<(std::ostream &o, const Ipc::TypedMsgHdr &m) +{ + return o << "IPC message [type=" << m.type() << "]"; +} + #endif /* SQUID_IPC_TYPED_MSG_HDR_H */