Coordinator.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 54 Interprocess Communication */
10
11#include "squid.h"
12#include "base/Subscription.h"
13#include "base/TextException.h"
14#include "CacheManager.h"
15#include "comm.h"
16#include "comm/Connection.h"
17#include "ipc/Coordinator.h"
18#include "ipc/SharedListen.h"
19#include "mgr/Inquirer.h"
20#include "mgr/Request.h"
21#include "mgr/Response.h"
22#include "tools.h"
23#if SQUID_SNMP
24#include "snmp/Inquirer.h"
25#include "snmp/Request.h"
26#include "snmp/Response.h"
27#endif
28
29#include <cerrno>
30
33
// Constructor initializer and (empty) body: Port is initialized with the
// address returned by Ipc::Port::CoordinatorAddr().
// NOTE(review): the line naming the constructor is missing from this listing.
35 Port(Ipc::Port::CoordinatorAddr())
36{
37}
38
// Body of Coordinator::start(), per this page's cross-reference index.
// NOTE(review): both the signature line and the one line between the braces
// are missing from this listing; presumably the body delegates to
// Port::start() -- confirm against the real file.
40{
42}
43
// findStrand(int kidId): linear scan of strands_ for the entry whose kidId
// equals the given one. Returns a pointer to that entry inside strands_, or
// nullptr when no registered strand matches.
// NOTE(review): the signature line is missing from this listing.
45{
46 typedef StrandCoords::iterator SI;
47 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
48 if (iter->kidId == kidId)
49 return &(*iter);
50 }
51 return nullptr;
52}
53
// registerStrand(): adds `strand` to strands_, or updates the entry that
// already has the same kidId, and then answers every queued search request
// whose tag equals strand.tag.
// NOTE(review): the signature line is missing from this listing.
55{
56 debugs(54, 3, "registering kid" << strand.kidId <<
57 ' ' << strand.tag);
58 if (StrandCoord* found = findStrand(strand.kidId)) {
// re-registration: overwrite the stored entry, but do not let an empty new
// tag erase a previously stored non-empty one
59 const String oldTag = found->tag;
60 *found = strand;
61 if (oldTag.size() && !strand.tag.size())
62 found->tag = oldTag; // keep more detailed info (XXX?)
63 } else {
64 strands_.push_back(strand);
65 }
66
67 // notify searchers waiting for this new strand, if any
// (matching uses the tag of the incoming `strand`, not the stored entry)
68 typedef Searchers::iterator SRI;
69 for (SRI i = searchers.begin(); i != searchers.end();) {
70 if (i->tag == strand.tag) {
71 notifySearcher(*i, strand);
// erase() yields the next iterator, so no increment on this path
72 i = searchers.erase(i);
73 } else {
74 ++i;
75 }
76 }
77}
78
// receive(const TypedMsgHdr &message): dispatches an incoming IPC message on
// message.rawType(). Each handled type is unpacked into its typed wrapper and
// passed to the matching handle*() method; any other type is forwarded to
// Port::receive().
// NOTE(review): the signature line is missing from this listing.
80{
81 switch (message.rawType()) {
// NOTE(review): a `case` label line is missing here in this listing
// (presumably mtRegisterStrand -- confirm against the real file).
83 debugs(54, 6, "Registration request");
84 handleRegistrationRequest(StrandMessage(message));
85 break;
86
87 case mtFindStrand: {
88 const StrandSearchRequest sr(message);
89 debugs(54, 6, "Strand search request: " << sr.requestorId <<
90 " tag: " << sr.tag);
91 handleSearchRequest(sr);
92 break;
93 }
94
// NOTE(review): a `case` label line is missing here in this listing
// (presumably mtSharedListenRequest -- confirm against the real file).
96 debugs(54, 6, "Shared listen request");
97 handleSharedListenRequest(SharedListenRequest(message));
98 break;
99
100 case mtCacheMgrRequest: {
101 debugs(54, 6, "Cache manager request");
102 const Mgr::Request req(message);
103 handleCacheMgrRequest(req);
104 }
105 break;
106
107 case mtCacheMgrResponse: {
108 debugs(54, 6, "Cache manager response");
109 const Mgr::Response resp(message);
// the response is passed through Mine() before being handled
110 handleCacheMgrResponse(Mine(resp));
111 }
112 break;
113
// SNMP message types are handled only when SQUID_SNMP is enabled
114#if SQUID_SNMP
115 case mtSnmpRequest: {
116 debugs(54, 6, "SNMP request");
117 const Snmp::Request req(message);
118 handleSnmpRequest(req);
119 }
120 break;
121
122 case mtSnmpResponse: {
123 debugs(54, 6, "SNMP response");
124 const Snmp::Response resp(message);
125 handleSnmpResponse(Mine(resp));
126 }
127 break;
128#endif
129
// anything not handled above is passed to the Port implementation
130 default:
131 Port::receive(message);
132 break;
133 }
134}
135
// handleRegistrationRequest(): registers msg.strand and then sends the same
// message back, packed as mtStrandRegistered, to the address built from
// strandAddrLabel and the registering strand's kidId.
// NOTE(review): the signature line is missing from this listing.
137{
138 registerStrand(msg.strand);
139
140 // send back an acknowledgement; TODO: remove as not needed?
141 TypedMsgHdr message;
142 msg.pack(mtStrandRegistered, message);
143 SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
144}
145
// handleSharedListenRequest(): answers a request for a shared listening
// socket. A connection already stored in `listeners` under request.params is
// reused; otherwise openListenSocket() is called. The resulting fd, errNo and
// the request's mapId are sent back to the requesting kid.
// NOTE(review): the line with the qualified function name and parameter list
// is missing from this listing.
146void
148{
149 debugs(54, 4, "kid" << request.requestorId <<
150 " needs shared listen FD for " << request.params.addr);
151 Listeners::const_iterator i = listeners.find(request.params);
// errNo stays 0 on a cache hit; openListenSocket() sets it otherwise
152 int errNo = 0;
153 const Comm::ConnectionPointer c = (i != listeners.end()) ?
154 i->second : openListenSocket(request, errNo);
155
156 debugs(54, 3, "sending shared listen " << c << " for " <<
157 request.params.addr << " to kid" << request.requestorId <<
158 " mapId=" << request.mapId);
159
// the response is sent whether or not the connection is open; errNo tells
// the requestor about an open failure
160 SharedListenResponse response(c->fd, errNo, request.mapId);
161 TypedMsgHdr message;
162 response.pack(message);
163 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
164}
165
// handleCacheMgrRequest(): starts a Mgr::Inquirer job for the request and, if
// that succeeds, sends a Mgr::Response carrying request.requestId back to the
// requesting kid. If a std::exception is thrown while starting the job, the
// error is logged, request.conn->fd is closed and reset to -1, and no
// response is sent.
// NOTE(review): the line with the qualified function name and parameter list
// is missing from this listing.
166void
168{
169 debugs(54, 4, MYNAME);
170
171 try {
// NOTE(review): the lines declaring `action` are missing from this listing
// (presumably obtained via CacheManager::GetInstance()->
// createRequestedAction(request.params) -- confirm against the real file).
174 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
175 } catch (const std::exception &ex) {
176 debugs(54, DBG_IMPORTANT, "ERROR: Squid BUG: cannot aggregate mgr:" <<
177 request.params.actionName << ": " << ex.what());
178 // TODO: Avoid half-baked Connections or teach them how to close.
// raw descriptor close; fd is then marked invalid on the connection object
179 ::close(request.conn->fd);
180 request.conn->fd = -1;
181 return; // the worker will timeout and close
182 }
183
184 // Let the strand know that we are now responsible for handling the request
185 Mgr::Response response(request.requestId);
186 TypedMsgHdr message;
187 response.pack(message);
188 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
189
190}
191
// handleCacheMgrResponse(const Mgr::Response &response), per this page's
// cross-reference index.
// NOTE(review): the line with the qualified function name and the single
// statement line inside the braces are missing from this listing; presumably
// the body hands `response` to an Inquirer's HandleRemoteAck() -- confirm
// against the real file.
192void
194{
196}
197
// handleSearchRequest(): answers a strand search immediately when a registered
// strand already has the requested tag; otherwise queues the request in
// `searchers` so that registerStrand() can answer it later.
// NOTE(review): the line with the qualified function name and parameter list
// is missing from this listing.
198void
200{
201 // do we know of a strand with the given search tag?
202 const StrandCoord *strand = nullptr;
203 typedef StrandCoords::const_iterator SCCI;
// the `!strand` condition stops the scan at the first match
204 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
205 if (i->tag == request.tag)
206 strand = &(*i);
207 }
208
209 if (strand) {
210 notifySearcher(request, *strand);
211 return;
212 }
213
// no match yet: remember the request until a strand with this tag registers
214 searchers.push_back(request);
215 debugs(54, 3, "cannot yet tell kid" << request.requestorId <<
216 " who " << request.tag << " is");
217}
218
// notifySearcher(): tells the kid that issued `request` which strand matches
// its tag. A StrandMessage built from `strand` and request.qid is packed as
// mtStrandReady and sent to the requestor's address.
// NOTE(review): the first line of the parameter list (with the qualified
// function name) is missing from this listing.
219void
221 const StrandCoord& strand)
222{
223 debugs(54, 3, "tell kid" << request.requestorId << " that " <<
224 request.tag << " is kid" << strand.kidId);
225 const StrandMessage response(strand, request.qid);
226 TypedMsgHdr message;
227 response.pack(mtStrandReady, message);
228 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
229}
230
231#if SQUID_SNMP
// handleSnmpRequest(): sends a Snmp::Response carrying request.requestId back
// to the requesting kid and then starts a Snmp::Inquirer job over the
// registered strands. Unlike the cache manager handler in this file, the
// response is sent before the job is started and there is no try/catch here.
// NOTE(review): the line with the qualified function name and parameter list
// is missing from this listing.
232void
234{
235 debugs(54, 4, MYNAME);
236
237 Snmp::Response response(request.requestId);
238 TypedMsgHdr message;
239 response.pack(message);
240 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
241
242 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
243}
244
// handleSnmpResponse(const Snmp::Response &response), per this page's
// cross-reference index.
// NOTE(review): the line with the qualified function name and the statement
// line after the debugs() call are missing from this listing; presumably the
// body hands `response` to an Inquirer's HandleRemoteAck() -- confirm against
// the real file.
245void
247{
248 debugs(54, 4, MYNAME);
250}
251#endif
252
// openListenSocket(request, errNo): tries to open a listening connection with
// the parameters in request.params. Sets errNo to 0 when the resulting
// connection is open and to errno otherwise, stores open connections in
// `listeners` under request.params, and returns the connection in both cases.
// NOTE(review): the return type and first signature line are missing from
// this listing.
255 int &errNo)
256{
257 const OpenListenerParams &p = request.params;
258
259 debugs(54, 6, "opening listen FD at " << p.addr << " for kid" <<
260 request.requestorId);
261
// NOTE(review): the line declaring `newConn` is missing from this listing.
263 newConn->local = p.addr; // comm_open_listener may modify it
264 newConn->flags = p.flags;
265
// the open attempt is bracketed by enter_suid()/leave_suid()
// (presumably to hold elevated privileges while binding -- confirm)
266 enter_suid();
// NOTE(review): the line performing the open is missing from this listing
// (presumably a comm_open_listener() call -- confirm against the real file).
268 errNo = Comm::IsConnOpen(newConn) ? 0 : errno;
269 leave_suid();
270
271 debugs(54, 6, "tried listening on " << newConn << " for kid" <<
272 request.requestorId);
273
274 // cache positive results
275 if (Comm::IsConnOpen(newConn))
276 listeners[request.params] = newConn;
277
278 return newConn;
279}
280
// broadcastSignal(int sig): logs and sends signal `sig` with kill() to the pid
// of every strand in strands_. The return value of kill() is not checked.
// NOTE(review): the signature line is missing from this listing.
282{
283 typedef StrandCoords::const_iterator SCI;
284 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
285 debugs(54, 5, "signal " << sig << " to kid" << iter->kidId <<
286 ", PID=" << iter->pid);
287 kill(iter->pid, sig);
288 }
289}
290
// Instance(): returns TheInstance, creating the Coordinator with `new` on the
// first call when TheInstance is still null.
// NOTE(review): the signature line is missing from this listing.
292{
293 if (!TheInstance)
294 TheInstance = new Coordinator;
295 // XXX: if the Coordinator job quits, this pointer will become invalid
296 // we could make Coordinator death fatal, except during exit, but since
297 // Strands do not re-register, even process death would be pointless.
298 return TheInstance;
299}
300
// strands(): accessor that returns the strands_ member, i.e. the currently
// registered strands.
// NOTE(review): the signature line is missing from this listing.
303{
304 return strands_;
305}
306
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator)
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37
Mgr::Action::Pointer createRequestedAction(const Mgr::ActionParams &)
static CacheManager * GetInstance()
Ip::Address local
Definition: Connection.h:146
Coordinates shared activities of Strands (Squid processes or threads)
Definition: Coordinator.h:31
StrandCoord * findStrand(int kidId)
registered strand or NULL
Definition: Coordinator.cc:44
void handleSharedListenRequest(const SharedListenRequest &request)
returns cached socket or calls openListenSocket()
Definition: Coordinator.cc:147
Comm::ConnectionPointer openListenSocket(const SharedListenRequest &request, int &errNo)
calls comm_open_listener()
Definition: Coordinator.cc:254
void receive(const TypedMsgHdr &message) override
Definition: Coordinator.cc:79
void handleSnmpRequest(const Snmp::Request &request)
Definition: Coordinator.cc:233
static Coordinator * TheInstance
the only class instance in existence
Definition: Coordinator.h:77
const StrandCoords & strands() const
currently registered strands
Definition: Coordinator.cc:302
void handleCacheMgrRequest(const Mgr::Request &request)
Definition: Coordinator.cc:167
void notifySearcher(const StrandSearchRequest &request, const StrandCoord &)
answer the waiting search request
Definition: Coordinator.cc:220
void handleSnmpResponse(const Snmp::Response &response)
Definition: Coordinator.cc:246
void start() override
called by AsyncStart; do not call directly
Definition: Coordinator.cc:39
static Coordinator * Instance()
Definition: Coordinator.cc:291
void registerStrand(const StrandCoord &)
adds or updates existing
Definition: Coordinator.cc:54
void handleSearchRequest(const StrandSearchRequest &request)
answers or queues the request if the answer is not yet known
Definition: Coordinator.cc:199
void handleCacheMgrResponse(const Mgr::Response &response)
Definition: Coordinator.cc:193
void handleRegistrationRequest(const StrandMessage &)
register,ACK
Definition: Coordinator.cc:136
void broadcastSignal(int sig) const
send sig to registered strands
Definition: Coordinator.cc:281
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Definition: Inquirer.cc:171
"shared listen" is when concurrent processes are listening on the same fd
Definition: SharedListen.h:29
int fdNote
index into fd_note() comment strings
Definition: SharedListen.h:36
Ip::Address addr
will be memset and memcopied
Definition: SharedListen.h:39
Waits for and receives incoming IPC messages; kids handle the messages.
Definition: Port.h:22
void start() override=0
called by AsyncStart; do not call directly
Definition: Port.cc:31
virtual void receive(const TypedMsgHdr &)=0
Definition: Port.cc:78
int requestorId
kidId of the requestor; used for response destination
Definition: Request.h:37
RequestId requestId
matches the request[or] with the response
Definition: Request.h:38
a request for a listen socket with given parameters
Definition: SharedListen.h:47
OpenListenerParams params
actual comm_open_sharedListen() parameters
Definition: SharedListen.h:56
int requestorId
kidId of the requestor
Definition: SharedListen.h:54
RequestId mapId
to map future response to the requestor's callback
Definition: SharedListen.h:58
a response to SharedListenRequest
Definition: SharedListen.h:63
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Strand location details.
Definition: StrandCoord.h:22
String tag
optional unique well-known key (e.g., cache_dir path)
Definition: StrandCoord.h:34
int kidId
internal Squid process number
Definition: StrandCoord.h:31
an IPC message carrying StrandCoord
Definition: StrandCoord.h:39
void pack(MessageType, TypedMsgHdr &) const
Definition: StrandCoord.cc:54
StrandCoord strand
messageType-specific coordinates (e.g., sender)
Definition: StrandCoord.h:52
asynchronous strand search request
Definition: StrandSearch.h:22
int requestorId
sender-provided return address
Definition: StrandSearch.h:29
String tag
set when looking for a matching StrandCoord::tag
Definition: StrandSearch.h:30
QuestionerId qid
the sender of the request
Definition: StrandSearch.h:31
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:35
int rawType() const
Definition: TypedMsgHdr.h:51
String actionName
action name (and credentials realm)
Definition: ActionParams.h:39
cache manager request
Definition: Request.h:24
Comm::ConnectionPointer conn
HTTP client connection descriptor.
Definition: Request.h:35
ActionParams params
action name and parameters
Definition: Request.h:37
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
Definition: Response.cc:43
SNMP request.
Definition: Request.h:25
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
Definition: Response.cc:31
size_type size() const
Definition: SquidString.h:73
void comm_open_listener(int sock_type, int proto, Comm::ConnectionPointer &conn, const char *note)
Definition: comm.cc:256
#define MYNAME
Definition: Stream.h:236
#define DBG_IMPORTANT
Definition: Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
Definition: IpcIoFile.h:24
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:23
const Answer & Mine(const Answer &answer)
Definition: QuestionerId.h:56
const char * FdNote(int fdNodeId)
converts FdNoteId into a string
Definition: FdNotes.cc:16
@ mtCacheMgrRequest
Definition: Messages.h:35
@ mtRegisterStrand
notifies about our strand existence
Definition: Messages.h:22
@ mtStrandReady
an mtFindStrand answer: the strand exists and should be usable
Definition: Messages.h:26
@ mtSnmpResponse
Definition: Messages.h:40
@ mtSnmpRequest
Definition: Messages.h:39
@ mtFindStrand
a worker requests a strand from Coordinator
Definition: Messages.h:25
@ mtSharedListenRequest
Definition: Messages.h:28
@ mtStrandRegistered
acknowledges mtRegisterStrand acceptance
Definition: Messages.h:23
@ mtCacheMgrResponse
Definition: Messages.h:36
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
SSL Connection
Definition: Session.h:45
static bool action(int fd, size_t metasize, const char *fn, const char *url, const SquidMetaList &meta)
Definition: purge.cc:315
void leave_suid(void)
Definition: tools.cc:559
void enter_suid(void)
Definition: tools.cc:623

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors