Coordinator.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/Subscription.h"
13 #include "base/TextException.h"
14 #include "CacheManager.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "ipc/Coordinator.h"
18 #include "ipc/SharedListen.h"
19 #include "mgr/Inquirer.h"
20 #include "mgr/Request.h"
21 #include "mgr/Response.h"
22 #include "tools.h"
23 #if SQUID_SNMP
24 #include "snmp/Inquirer.h"
25 #include "snmp/Request.h"
26 #include "snmp/Response.h"
27 #endif
28 
29 #include <cerrno>
30 
31 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
32 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
33 
35  Port(Ipc::Port::CoordinatorAddr())
36 {
37 }
38 
40 {
41  Port::start();
42 }
43 
45 {
46  typedef StrandCoords::iterator SI;
47  for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
48  if (iter->kidId == kidId)
49  return &(*iter);
50  }
51  return NULL;
52 }
53 
55 {
56  debugs(54, 3, HERE << "registering kid" << strand.kidId <<
57  ' ' << strand.tag);
58  if (StrandCoord* found = findStrand(strand.kidId)) {
59  const String oldTag = found->tag;
60  *found = strand;
61  if (oldTag.size() && !strand.tag.size())
62  found->tag = oldTag; // keep more detailed info (XXX?)
63  } else {
64  strands_.push_back(strand);
65  }
66 
67  // notify searchers waiting for this new strand, if any
68  typedef Searchers::iterator SRI;
69  for (SRI i = searchers.begin(); i != searchers.end();) {
70  if (i->tag == strand.tag) {
71  notifySearcher(*i, strand);
72  i = searchers.erase(i);
73  } else {
74  ++i;
75  }
76  }
77 }
78 
80 {
81  switch (message.type()) {
82  case mtRegistration:
83  debugs(54, 6, HERE << "Registration request");
84  handleRegistrationRequest(HereIamMessage(message));
85  break;
86 
87  case mtStrandSearchRequest: {
88  const StrandSearchRequest sr(message);
89  debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
90  " tag: " << sr.tag);
91  handleSearchRequest(sr);
92  break;
93  }
94 
96  debugs(54, 6, HERE << "Shared listen request");
97  handleSharedListenRequest(SharedListenRequest(message));
98  break;
99 
100  case mtCacheMgrRequest: {
101  debugs(54, 6, HERE << "Cache manager request");
102  const Mgr::Request req(message);
103  handleCacheMgrRequest(req);
104  }
105  break;
106 
107  case mtCacheMgrResponse: {
108  debugs(54, 6, HERE << "Cache manager response");
109  const Mgr::Response resp(message);
110  handleCacheMgrResponse(resp);
111  }
112  break;
113 
114 #if SQUID_SNMP
115  case mtSnmpRequest: {
116  debugs(54, 6, HERE << "SNMP request");
117  const Snmp::Request req(message);
118  handleSnmpRequest(req);
119  }
120  break;
121 
122  case mtSnmpResponse: {
123  debugs(54, 6, HERE << "SNMP response");
124  const Snmp::Response resp(message);
125  handleSnmpResponse(resp);
126  }
127  break;
128 #endif
129 
130  default:
131  debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
132  break;
133  }
134 }
135 
137 {
138  registerStrand(msg.strand);
139 
140  // send back an acknowledgement; TODO: remove as not needed?
141  TypedMsgHdr message;
142  msg.pack(message);
143  SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
144 }
145 
146 void
148 {
149  debugs(54, 4, HERE << "kid" << request.requestorId <<
150  " needs shared listen FD for " << request.params.addr);
151  Listeners::const_iterator i = listeners.find(request.params);
152  int errNo = 0;
153  const Comm::ConnectionPointer c = (i != listeners.end()) ?
154  i->second : openListenSocket(request, errNo);
155 
156  debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
157  request.params.addr << " to kid" << request.requestorId <<
158  " mapId=" << request.mapId);
159 
160  SharedListenResponse response(c->fd, errNo, request.mapId);
161  TypedMsgHdr message;
162  response.pack(message);
163  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
164 }
165 
166 void
168 {
169  debugs(54, 4, HERE);
170 
171  try {
174  AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
175  } catch (const std::exception &ex) {
176  debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
177  request.params.actionName << ": " << ex.what());
178  // TODO: Avoid half-baked Connections or teach them how to close.
179  ::close(request.conn->fd);
180  request.conn->fd = -1;
181  return; // the worker will timeout and close
182  }
183 
184  // Let the strand know that we are now responsible for handling the request
185  Mgr::Response response(request.requestId);
186  TypedMsgHdr message;
187  response.pack(message);
188  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
189 
190 }
191 
192 void
194 {
196 }
197 
198 void
200 {
201  // do we know of a strand with the given search tag?
202  const StrandCoord *strand = NULL;
203  typedef StrandCoords::const_iterator SCCI;
204  for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
205  if (i->tag == request.tag)
206  strand = &(*i);
207  }
208 
209  if (strand) {
210  notifySearcher(request, *strand);
211  return;
212  }
213 
214  searchers.push_back(request);
215  debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
216  " who " << request.tag << " is");
217 }
218 
219 void
221  const StrandCoord& strand)
222 {
223  debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
224  request.tag << " is kid" << strand.kidId);
225  const StrandSearchResponse response(strand);
226  TypedMsgHdr message;
227  response.pack(message);
228  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
229 }
230 
231 #if SQUID_SNMP
232 void
234 {
235  debugs(54, 4, HERE);
236 
237  Snmp::Response response(request.requestId);
238  TypedMsgHdr message;
239  response.pack(message);
240  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
241 
242  AsyncJob::Start(new Snmp::Inquirer(request, strands_));
243 }
244 
245 void
247 {
248  debugs(54, 4, HERE);
250 }
251 #endif
252 
255  int &errNo)
256 {
257  const OpenListenerParams &p = request.params;
258 
259  debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
260  request.requestorId);
261 
263  newConn->local = p.addr; // comm_open_listener may modify it
264  newConn->flags = p.flags;
265 
266  enter_suid();
267  comm_open_listener(p.sock_type, p.proto, newConn, FdNote(p.fdNote));
268  errNo = Comm::IsConnOpen(newConn) ? 0 : errno;
269  leave_suid();
270 
271  debugs(54, 6, HERE << "tried listening on " << newConn << " for kid" <<
272  request.requestorId);
273 
274  // cache positive results
275  if (Comm::IsConnOpen(newConn))
276  listeners[request.params] = newConn;
277 
278  return newConn;
279 }
280 
282 {
283  typedef StrandCoords::const_iterator SCI;
284  for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
285  debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
286  ", PID=" << iter->pid);
287  kill(iter->pid, sig);
288  }
289 }
290 
292 {
293  if (!TheInstance)
294  TheInstance = new Coordinator;
295  // XXX: if the Coordinator job quits, this pointer will become invalid
296  // we could make Coordinator death fatal, except during exit, but since
297  // Strands do not re-register, even process death would be pointless.
298  return TheInstance;
299 }
300 
301 const Ipc::StrandCoords&
303 {
304  return strands_;
305 }
306 
virtual void pack(Ipc::TypedMsgHdr &msg) const
prepare for sendmsg()
Definition: Response.cc:42
virtual void start()=0
called by AsyncStart; do not call directly
Definition: Port.cc:30
strand registration with Coordinator (also used as an ACK)
Definition: StrandCoord.h:36
cache manager request
Definition: Request.h:22
const char * FdNote(int fdNodeId)
converts FdNoteId into a string
Definition: FdNotes.cc:16
StrandCoord strand
registrant coordinates and related details
Definition: StrandCoord.h:44
int requestorId
sender-provided return address
Definition: StrandSearch.h:28
Coordinates shared activities of Strands (Squid processes or threads)
Definition: Coordinator.h:30
String tag
set when looking for a matching StrandCoord::tag
Definition: StrandSearch.h:29
String actionName
action name (and credentials realm)
Definition: ActionParams.h:39
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandSearch.cc:48
void handleSnmpRequest(const Snmp::Request &request)
Definition: Coordinator.cc:233
Strand location details.
Definition: StrandCoord.h:19
void comm_open_listener(int sock_type, int proto, Comm::ConnectionPointer &conn, const char *note)
Definition: comm.cc:232
struct _request * request(char *urlin)
Definition: tcp-banger2.c:291
int i
Definition: membanger.c:49
int requestorId
kidId of the requestor
Definition: SharedListen.h:55
int kidId
internal Squid process number
Definition: StrandCoord.h:29
char * p
Definition: membanger.c:43
void handleRegistrationRequest(const HereIamMessage &)
register,ACK
Definition: Coordinator.cc:136
Ip::Address addr
will be memset and memcopied
Definition: SharedListen.h:37
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Definition: Inquirer.cc:156
size_type size() const
Definition: SquidString.h:71
asynchronous strand search request
Definition: StrandSearch.h:20
OpenListenerParams params
actual comm_open_sharedListen() parameters
Definition: SharedListen.h:57
static CacheManager * GetInstance()
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandCoord.cc:51
static Pointer Start(AsyncJob *job)
starts a freshly created job (i.e., makes the job asynchronous)
Definition: AsyncJob.cc:23
a response to SharedListenRequest
Definition: SharedListen.h:63
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:24
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator)
void notifySearcher(const StrandSearchRequest &request, const StrandCoord &)
answer the waiting search request
Definition: Coordinator.cc:220
ActionParams params
action name and parameters
Definition: Request.h:39
void handleCacheMgrRequest(const Mgr::Request &request)
Definition: Coordinator.cc:167
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
#define DBG_IMPORTANT
Definition: Debug.h:45
unsigned int requestId
unique for sender; matches request w/ response
Definition: Request.h:39
void leave_suid(void)
Definition: tools.cc:504
Mgr::Action::Pointer createRequestedAction(const Mgr::ActionParams &)
a request for a listen socket with given parameters
Definition: SharedListen.h:47
int requestorId
kidId of the requestor; used for response destination
Definition: Request.h:38
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
SNMP request.
Definition: Request.h:24
void handleCacheMgrResponse(const Mgr::Response &response)
Definition: Coordinator.cc:193
void handleSnmpResponse(const Snmp::Response &response)
Definition: Coordinator.cc:246
virtual void receive(const TypedMsgHdr &message)
handle IPC message just read
Definition: Coordinator.cc:79
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
asynchronous strand search response
Definition: StrandSearch.h:33
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:147
bool action(int fd, size_t metasize, const char *fn, const char *url, const SquidMetaList &meta)
Definition: purge.cc:311
StrandCoord * findStrand(int kidId)
registered strand or NULL
Definition: Coordinator.cc:44
Ip::Address local
Definition: Connection.h:135
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:22
virtual void start()
called by AsyncStart; do not call directly
Definition: Coordinator.cc:39
int fdNote
index into fd_note() comment strings
Definition: SharedListen.h:34
Waits for and receives incoming IPC messages; kids handle the messages.
Definition: Port.h:21
Comm::ConnectionPointer conn
HTTP client connection descriptor.
Definition: Request.h:37
void enter_suid(void)
Definition: tools.cc:575
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:31
const StrandCoords & strands() const
currently registered strands
Definition: Coordinator.cc:302
int type() const
returns stored type or zero if none
Definition: TypedMsgHdr.cc:68
Comm::ConnectionPointer openListenSocket(const SharedListenRequest &request, int &errNo)
calls comm_open_listener()
Definition: Coordinator.cc:254
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
String tag
optional unique well-known key (e.g., cache_dir path)
Definition: StrandCoord.h:32
void registerStrand(const StrandCoord &)
adds or updates existing
Definition: Coordinator.cc:54
static Coordinator * Instance()
Definition: Coordinator.cc:291
"shared listen" is when concurrent processes are listening on the same fd
Definition: SharedListen.h:24
void handleSharedListenRequest(const SharedListenRequest &request)
returns cached socket or calls openListenSocket()
Definition: Coordinator.cc:147
void broadcastSignal(int sig) const
send sig to registered strands
Definition: Coordinator.cc:281
#define NULL
Definition: types.h:166
void handleSearchRequest(const StrandSearchRequest &request)
answers or queues the request if the answer is not yet known
Definition: Coordinator.cc:199
int mapId
to map future response to the requestor's callback
Definition: SharedListen.h:59
virtual void pack(Ipc::TypedMsgHdr &msg) const
prepare for sendmsg()
Definition: Response.cc:48

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors