Inquirer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Write.h"
15 #include "ipc/Inquirer.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemBuf.h"
19 #include <algorithm>
20 
21 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
22 
// Most recently issued request ID (a static member, so shared by every
// Inquirer). Starts at 0; inquire() pre-increments it and skips 0 so that a
// requestId of 0 can mean "no request pending".
24 unsigned int Ipc::Inquirer::LastRequestId = 0;
25 
// Ordering predicate for the std::sort() call in the constructor: a strand
// coordinate sorts earlier when its kidId is smaller.
// NOTE(review): listing line 28 (function name and parameter list) is missing
// from this extract; the cross-reference index at the end of the page gives it
// as LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2).
27 static bool
29 {
30  return c1.kidId < c2.kidId;
31 }
32 
// Inquirer constructor (tail of the parameter list, initializer list, body).
// NOTE(review): listing line 33 is missing from this extract; per the
// cross-reference index the full signature is
// Inquirer(Request::Pointer aRequest, const Ipc::StrandCoords &coords, double aTimeout).
// Stores the request, copies the strand list into `strands`, points `pos` at
// the first entry of that copy, and records the timeout.
34  double aTimeout):
35  AsyncJob("Ipc::Inquirer"),
36  request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
37 {
38  debugs(54, 5, HERE);
39 
40  // order by ascending kid IDs; useful for non-aggregatable stats
// The sort rearranges the elements of `strands` in place.
41  std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
42 }
43 
// Destructor body: emits a section-54, level-5 debug line and runs cleanup().
// NOTE(review): listing line 44 (the function header) is missing from this
// extract; the cross-reference index lists `virtual ~Inquirer()` at that line.
45 {
46  debugs(54, 5, HERE);
47  cleanup();
48 }
49 
// Intentionally empty in this class.
// NOTE(review): listing line 51 (the function header) is missing from this
// extract; the cross-reference index lists `virtual void cleanup()` there,
// described as "perform cleanup actions on completion of job".
50 void
52 {
53 }
54 
// Marks the request as having no inquiry in flight by zeroing its requestId.
// NOTE(review): listing line 56 (the function header) is missing from this
// extract; the cross-reference index lists `virtual void start()` there,
// described as "called by AsyncStart; do not call directly".
55 void
57 {
58  request->requestId = 0;
59 }
60 
// Sends the request to the strand at `pos` and schedules a timeout event for
// the reply. When `pos` is already at the end of `strands`, it only checks
// Must(done()) and returns.
// NOTE(review): two listing lines are missing from this extract: line 62 (the
// function header; the cross-reference index lists `virtual void inquire()`
// there) and line 71 (the remaining asyncCall() arguments).
// NOTE(review): the call label below says "Mgr::Inquirer" although this class
// is Ipc::Inquirer -- confirm whether that is intentional.
61 void
63 {
64  if (pos == strands.end()) {
65  Must(done());
66  return;
67  }
68 
// there must be no inquiry already in flight
69  Must(request->requestId == 0);
70  AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck",
72  if (++LastRequestId == 0) // don't use zero value as request->requestId
73  ++LastRequestId;
74  request->requestId = LastRequestId;
75  const int kidId = pos->kidId;
76  debugs(54, 4, HERE << "inquire kid: " << kidId << status());
// remember the callback under this ID; HandleRemoteAck() looks it up by the
// response's requestId
77  TheRequestsMap[request->requestId] = callback;
78  TypedMsgHdr message;
79  request->pack(message);
80  SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
// register RequestTimedOut(this) as a timed event, passing `timeout` as the
// `when` argument
81  eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
82  this, timeout, 0, false);
83 }
84 
// Handles a strand's response: clears the pending request ID, removes the
// timeout event, and passes the response to aggregate(). If aggregate() returns
// true, advances to the next strand and inquires again; otherwise stops the job
// with reason "error".
// NOTE(review): listing line 87 (the function header) is missing from this
// extract; the cross-reference index lists
// `void handleRemoteAck(Response::Pointer response)` there.
86 void
88 {
89  debugs(54, 4, HERE << status());
90  request->requestId = 0;
91  removeTimeoutEvent();
92  if (aggregate(response)) {
93  Must(!done()); // or we should not be called
94  ++pos; // advance after a successful inquiry
95  inquire();
96  } else {
97  mustStop("error");
98  }
99 }
100 
// Final wrap-up: removes the timeout event, forgets the callback of any request
// still in flight (requestId > 0) and zeroes the ID, then calls sendResponse()
// and cleanup().
// NOTE(review): listing line 102 (the function header) is missing from this
// extract; the cross-reference index lists `virtual void swanSong()` there.
101 void
103 {
104  debugs(54, 5, HERE);
105  removeTimeoutEvent();
106  if (request->requestId > 0) {
107  DequeueRequest(request->requestId);
108  request->requestId = 0;
109  }
110  sendResponse();
111  cleanup();
112 }
113 
// Returns true once `pos` has moved past the last entry of `strands`.
// NOTE(review): listing line 115 (the function header) is missing from this
// extract; the cross-reference index lists `virtual bool doneAll() const` there.
114 bool
116 {
117  return pos == strands.end();
118 }
119 
// Class-specific exception handling: logs the exception text at section 54,
// level 3, and asks the job to stop with reason "exception".
// callException() invokes this inside its own try/catch.
120 void
121 Ipc::Inquirer::handleException(const std::exception& e)
122 {
123  debugs(54, 3, HERE << e.what());
124  mustStop("exception");
125 }
126 
// Routes an exception to handleException(). If handleException() itself throws
// a std::exception, that second exception is logged at DBG_CRITICAL rather than
// propagated.
// NOTE(review): listing line 136 is missing from this extract. The
// cross-reference index also lists a callException() defined in AsyncJob.cc, so
// the missing line presumably forwards `e` to that version -- confirm against
// the real source.
127 void
128 Ipc::Inquirer::callException(const std::exception& e)
129 {
130  debugs(54, 3, HERE);
131  try {
132  handleException(e);
133  } catch (const std::exception& ex) {
134  debugs(54, DBG_CRITICAL, HERE << ex.what());
135  }
137 }
138 
// Looks up `requestId` in TheRequestsMap. If an entry exists, removes it and
// returns its stored callback; otherwise returns a default-constructed
// AsyncCall::Pointer. A requestId of 0 fails the Must() check.
// NOTE(review): listing line 140 (the return-type line) is missing from this
// extract; the cross-reference index gives the declaration as
// `static AsyncCall::Pointer DequeueRequest(unsigned int requestId)`.
141 Ipc::Inquirer::DequeueRequest(unsigned int requestId)
142 {
143  debugs(54, 3, HERE << " requestId " << requestId);
144  Must(requestId != 0);
145  AsyncCall::Pointer call;
146  RequestsMap::iterator request = TheRequestsMap.find(requestId);
147  if (request != TheRequestsMap.end()) {
148  call = request->second;
// a stored entry must always carry a callback
149  Must(call != NULL);
150  TheRequestsMap.erase(request);
151  }
152  return call;
153 }
154 
// Matches a strand response to its pending callback by requestId. When a
// callback is found, a clone of the response is stored as the dialer's first
// argument and the call is scheduled. A response whose ID has no pending entry
// is ignored.
// NOTE(review): listing line 156 (the function header) is missing from this
// extract; the cross-reference index lists
// `static void HandleRemoteAck(const Response &response)` there.
155 void
157 {
158  Must(response.requestId != 0);
159  AsyncCall::Pointer call = DequeueRequest(response.requestId);
160  if (call != NULL) {
// the stored call is expected to use a HandleAckDialer; Must() rejects others
161  HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
162  Must(dialer);
163  dialer->arg1 = response.clone();
164  ScheduleCallHere(call);
165  }
166 }
167 
// NOTE(review): this block is incomplete in this extract. Listing line 170 (the
// function header) and lines 172-173 (the body) are missing. The
// cross-reference index lists `void removeTimeoutEvent()` at line 170,
// described as "called when we are no longer waiting for the strand to
// respond". It also lists eventFind() and eventDelete(), which presumably make
// up the missing body -- confirm against the real source.
169 void
171 {
174 }
175 
// Timed-event handler registered by inquire() through eventAdd(). `param` is
// the Inquirer passed as the event argument; it must be non-null. The handler
// forwards to the member function requestTimedOut() via CallJobHere.
// NOTE(review): listing line 178 (the function header) is missing from this
// extract; the cross-reference index lists
// `static void RequestTimedOut(void *param)` there.
177 void
179 {
180  debugs(54, 3, HERE);
181  Must(param != NULL);
182  Inquirer* cmi = static_cast<Inquirer*>(param);
183  // use async call to enable job call protection that time events lack
184  CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
185 }
186 
// Handles a strand that did not answer in time. If a request is still in
// flight, its callback is dropped, the ID is zeroed, and the job moves on to
// the next strand. If no request is in flight (requestId == 0), nothing happens.
// NOTE(review): listing line 189 (the function header) is missing from this
// extract; the cross-reference index lists `void requestTimedOut()` there.
188 void
190 {
191  debugs(54, 3, HERE);
192  if (request->requestId != 0) {
193  DequeueRequest(request->requestId);
194  request->requestId = 0;
195  Must(!done()); // or we should not be called
196  ++pos; // advance after a failed inquiry
197  inquire();
198  }
199 }
200 
// Builds a short debugging string of the form " [request->requestId N]".
// The text lives in a function-local static MemBuf that is reset on every call,
// so the returned pointer's content is replaced by the next call.
// NOTE(review): listing line 202 (the function header) is missing from this
// extract; the cross-reference index lists
// `virtual const char * status() const` there.
201 const char*
203 {
204  static MemBuf buf;
205  buf.reset();
206  buf.appendf(" [request->requestId %u]", request->requestId);
207  buf.terminate();
208  return buf.content();
209 }
210 
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer)
static void RequestTimedOut(void *param)
Ipc::Inquirer::requestTimedOut wrapper.
Definition: Inquirer.cc:178
void const char HLPCB * callback
Definition: stub_helper.cc:16
#define CallJobHere(debugSection, debugLevel, job, Class, method)
Definition: AsyncJobCalls.h:57
Strand location details.
Definition: StrandCoord.h:19
struct _request * request(char *urlin)
Definition: tcp-banger2.c:291
void removeTimeoutEvent()
called when we are no longer waiting for the strand to respond
Definition: Inquirer.cc:170
unsigned int requestId
ID of request we are responding to.
Definition: Response.h:38
virtual void callException(const std::exception &e)
called when the job throws during an async call
Definition: AsyncJob.cc:127
int kidId
internal Squid process number
Definition: StrandCoord.h:29
virtual Pointer clone() const =0
returns a copy of this
#define DBG_CRITICAL
Definition: Debug.h:44
std::map< unsigned int, AsyncCall::Pointer > RequestsMap
maps request->id to Inquirer::handleRemoteAck callback
Definition: Inquirer.h:79
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Definition: Inquirer.cc:156
Ipc::StrandCoords strands
all strands we want to query, in order
Definition: Inquirer.h:73
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
virtual CallDialer * getDialer()=0
virtual void handleException(const std::exception &e)
do specific exception handling
Definition: Inquirer.cc:121
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
int eventFind(EVH *func, void *arg)
Definition: event.cc:155
virtual void callException(const std::exception &e)
called when the job throws during an async call
Definition: Inquirer.cc:128
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:51
virtual void cleanup()
perform cleanup actions on completion of job
Definition: Inquirer.cc:51
void handleRemoteAck(Response::Pointer response)
called when a strand is done writing its output
Definition: Inquirer.cc:87
void reset()
Definition: MemBuf.cc:141
void requestTimedOut()
called when the strand failed to respond (or finish responding) in time
Definition: Inquirer.cc:189
virtual const char * status() const
for debugging; returns a short status string that starts with a space
Definition: Inquirer.cc:202
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
virtual void inquire()
inquire the next strand
Definition: Inquirer.cc:62
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
char * content()
start of the added data
Definition: MemBuf.h:41
void const char * buf
Definition: stub_helper.cc:16
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:147
void eventDelete(EVH *func, void *arg)
Definition: event.cc:131
#define Must(cond)
Definition: TextException.h:89
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:22
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:109
Inquirer(Request::Pointer aRequest, const Ipc::StrandCoords &coords, double aTimeout)
Definition: Inquirer.cc:33
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
virtual ~Inquirer()
Definition: Inquirer.cc:44
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:31
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
Definition: MemBuf.h:23
static unsigned int LastRequestId
last requestId used
Definition: Inquirer.h:82
static AsyncCall::Pointer DequeueRequest(unsigned int requestId)
returns and forgets the right Inquirer callback for strand request
Definition: Inquirer.cc:141
virtual void start()
called by AsyncStart; do not call directly
Definition: Inquirer.cc:56
static bool LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
compare Ipc::StrandCoord using kidId, for std::sort() below
Definition: Inquirer.cc:28
A response to Ipc::Request.
Definition: Response.h:21
virtual void swanSong()
Definition: Inquirer.cc:102
virtual bool doneAll() const
whether positive goal has been reached
Definition: Inquirer.cc:115
#define NULL
Definition: types.h:166
static RequestsMap TheRequestsMap
pending strand requests
Definition: Inquirer.h:80
void terminate()
Definition: MemBuf.cc:259

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors