Inquirer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Write.h"
15 #include "ipc/Inquirer.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemBuf.h"
19 #include <algorithm>
20 
22 
24 unsigned int Ipc::Inquirer::LastRequestId = 0;
25 
27 static bool
29 {
30  return c1.kidId < c2.kidId;
31 }
32 
34  double aTimeout):
35  AsyncJob("Ipc::Inquirer"),
36  codeContext(CodeContext::Current()),
37  request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
38 {
39  debugs(54, 5, HERE);
40 
41  // order by ascending kid IDs; useful for non-aggregatable stats
42  std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
43 }
44 
46 {
47  debugs(54, 5, HERE);
48  cleanup();
49 }
50 
51 void
53 {
54 }
55 
56 void
58 {
59  request->requestId = 0;
60 }
61 
62 void
64 {
65  if (pos == strands.end()) {
66  Must(done());
67  return;
68  }
69 
70  Must(request->requestId == 0);
71  AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck",
73  if (++LastRequestId == 0) // don't use zero value as request->requestId
74  ++LastRequestId;
75  request->requestId = LastRequestId;
76  const int kidId = pos->kidId;
77  debugs(54, 4, HERE << "inquire kid: " << kidId << status());
78  TheRequestsMap[request->requestId] = callback;
79  TypedMsgHdr message;
80  request->pack(message);
81  SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
82  eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
83  this, timeout, 0, false);
84 }
85 
87 void
89 {
90  debugs(54, 4, HERE << status());
91  request->requestId = 0;
92  removeTimeoutEvent();
93  if (aggregate(response)) {
94  Must(!done()); // or we should not be called
95  ++pos; // advance after a successful inquiry
96  inquire();
97  } else {
98  mustStop("error");
99  }
100 }
101 
102 void
104 {
105  debugs(54, 5, HERE);
106  removeTimeoutEvent();
107  if (request->requestId > 0) {
108  DequeueRequest(request->requestId);
109  request->requestId = 0;
110  }
111  sendResponse();
112  cleanup();
113 }
114 
115 bool
117 {
118  return pos == strands.end();
119 }
120 
121 void
122 Ipc::Inquirer::handleException(const std::exception& e)
123 {
124  debugs(54, 3, HERE << e.what());
125  mustStop("exception");
126 }
127 
128 void
129 Ipc::Inquirer::callException(const std::exception& e)
130 {
131  debugs(54, 3, HERE);
132  try {
133  handleException(e);
134  } catch (const std::exception& ex) {
135  debugs(54, DBG_CRITICAL, HERE << ex.what());
136  }
138 }
139 
142 Ipc::Inquirer::DequeueRequest(unsigned int requestId)
143 {
144  debugs(54, 3, HERE << " requestId " << requestId);
145  Must(requestId != 0);
146  AsyncCall::Pointer call;
147  RequestsMap::iterator request = TheRequestsMap.find(requestId);
148  if (request != TheRequestsMap.end()) {
149  call = request->second;
150  Must(call != NULL);
151  TheRequestsMap.erase(request);
152  }
153  return call;
154 }
155 
156 void
158 {
159  Must(response.requestId != 0);
160  AsyncCall::Pointer call = DequeueRequest(response.requestId);
161  if (call != NULL) {
162  HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
163  Must(dialer);
164  dialer->arg1 = response.clone();
165  ScheduleCallHere(call);
166  }
167 }
168 
170 void
172 {
175 }
176 
178 void
180 {
181  debugs(54, 3, HERE);
182  Must(param != NULL);
183  Inquirer* cmi = static_cast<Inquirer*>(param);
184  // use async call to enable job call protection that time events lack
185  CallBack(cmi->codeContext, [&cmi] {
186  CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
187  });
188 }
189 
191 void
193 {
194  debugs(54, 3, HERE);
195  if (request->requestId != 0) {
196  DequeueRequest(request->requestId);
197  request->requestId = 0;
198  Must(!done()); // or we should not be called
199  ++pos; // advance after a failed inquiry
200  inquire();
201  }
202 }
203 
204 const char*
206 {
207  static MemBuf buf;
208  buf.reset();
209  buf.appendf(" [request->requestId %u]", request->requestId);
210  buf.terminate();
211  return buf.content();
212 }
213 
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:51
int eventFind(EVH *func, void *arg)
Definition: event.cc:155
CodeContextPointer codeContext
Definition: Inquirer.h:42
void eventDelete(EVH *func, void *arg)
Definition: event.cc:131
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer)
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
int kidId
internal Squid process number
Definition: StrandCoord.h:29
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:22
virtual ~Inquirer()
Definition: Inquirer.cc:45
virtual void swanSong()
Definition: Inquirer.cc:103
virtual CallDialer * getDialer()=0
unsigned int requestId
ID of request we are responding to.
Definition: Response.h:38
#define DBG_CRITICAL
Definition: Debug.h:45
void CallBack(const CodeContext::Pointer &callbackContext, Fun &&callback)
Definition: CodeContext.h:81
virtual void start()
called by AsyncStart; do not call directly
Definition: Inquirer.cc:57
virtual const char * status() const
internal cleanup; do not call directly
Definition: Inquirer.cc:205
A response to Ipc::Request.
Definition: Response.h:21
Inquirer(Request::Pointer aRequest, const Ipc::StrandCoords &coords, double aTimeout)
Definition: Inquirer.cc:33
static AsyncCall::Pointer DequeueRequest(unsigned int requestId)
returns and forgets the right Inquirer callback for strand request
Definition: Inquirer.cc:142
#define NULL
Definition: types.h:166
void requestTimedOut()
called when the strand failed to respond (or finish responding) in time
Definition: Inquirer.cc:192
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:128
Definition: MemBuf.h:23
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:157
static void RequestTimedOut(void *param)
Ipc::Inquirer::requestTimedOut wrapper.
Definition: Inquirer.cc:179
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
virtual void cleanup()
perform cleanup actions on completion of job
Definition: Inquirer.cc:52
static unsigned int LastRequestId
last requestId used
Definition: Inquirer.h:85
std::map< unsigned int, AsyncCall::Pointer > RequestsMap
maps request->id to Inquirer::handleRemoteAck callback
Definition: Inquirer.h:82
virtual void handleException(const std::exception &e)
do specific exception handling
Definition: Inquirer.cc:122
Strand location details.
Definition: StrandCoord.h:19
virtual bool doneAll() const
whether positive goal has been reached
Definition: Inquirer.cc:116
static RequestsMap TheRequestsMap
pending strand requests
Definition: Inquirer.h:83
static bool LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
compare Ipc::StrandCoord using kidId, for std::sort() below
Definition: Inquirer.cc:28
virtual void inquire()
inquire the next strand
Definition: Inquirer.cc:63
void removeTimeoutEvent()
called when we are no longer waiting for the strand to respond
Definition: Inquirer.cc:171
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:69
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Definition: Inquirer.cc:157
void const char HLPCB * callback
Definition: stub_helper.cc:16
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:31
void handleRemoteAck(Response::Pointer response)
called when a strand is done writing its output
Definition: Inquirer.cc:88
Ipc::StrandCoords strands
all strands we want to query, in order
Definition: Inquirer.h:76
virtual void callException(const std::exception &e)
called when the job throws during an async call
Definition: AsyncJob.cc:127
virtual Pointer clone() const =0
returns a copy of this
struct _request * request(char *urlin)
Definition: tcp-banger2.c:291
virtual void callException(const std::exception &e)
called when the job throws during an async call
Definition: Inquirer.cc:129
void const char * buf
Definition: stub_helper.cc:16
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:109
Definition: IpcIoFile.h:23

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors