Inquirer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 16 Cache Manager API */
10 
11 #include "squid.h"
12 #include "AccessLogEntry.h"
13 #include "base/TextException.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Write.h"
17 #include "CommCalls.h"
18 #include "errorpage.h"
19 #include "HttpReply.h"
20 #include "HttpRequest.h"
21 #include "ipc/UdsOp.h"
22 #include "mgr/ActionWriter.h"
23 #include "mgr/Command.h"
24 #include "mgr/Inquirer.h"
25 #include "mgr/IntParam.h"
26 #include "mgr/Request.h"
27 #include "mgr/Response.h"
28 #include "SquidTime.h"
29 #include <memory>
30 #include <algorithm>
31 
33 
// Mgr::Inquirer constructor. Per this page's cross-reference list the signature is
// Inquirer(Action::Pointer anAction, const Request &aCause, const Ipc::StrandCoords &coords).
// NOTE(review): listing line 34 (the start of the signature) is absent from this extract.
35  const Request &aCause, const Ipc::StrandCoords &coords):
// The base class receives a clone of the request, the strands selected by
// applyQueryParams(), and 10 for atomic actions or 100 otherwise
// (presumably a timeout -- confirm against Ipc::Inquirer).
36  Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
37  aggrAction(anAction)
38 {
39  conn = aCause.conn;
// Import the socket fd from another strand into our Comm state.
40  Ipc::ImportFdIntoComm(conn, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
41 
42  debugs(16, 5, HERE << conn << " action: " << aggrAction);
43 
// closer is the comm_close handler; the async call is named after noteCommClosed.
// NOTE(review): listing lines 45-46 (the remaining asyncCall argument and
// whatever statement follows it) are absent from this extract.
44  closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
47 }
48 
// Mgr::Inquirer::cleanup() -- closes our copy of the client HTTP connection socket.
// NOTE(review): listing line 51 (the function name line) is absent from this extract.
50 void
52 {
// Only act while the connection is still open.
53  if (Comm::IsConnOpen(conn)) {
// The close handler is removed before closing, presumably so that our own
// close does not trigger noteCommClosed() -- confirm.
54  removeCloseHandler();
55  conn->close();
56  }
57 }
58 
// Mgr::Inquirer::removeCloseHandler() -- unregisters the close handler from
// conn's descriptor, if one is set, and clears the stored handler.
// NOTE(review): listing line 60 (the function name line) is absent from this extract.
59 void
61 {
62  if (closer != NULL) {
63  comm_remove_close_handler(conn->fd, closer);
// Reset so that a repeated call does nothing.
64  closer = NULL;
65  }
66 }
67 
// Mgr::Inquirer::start() -- called by AsyncStart; do not call directly.
// Packs an HTTP reply header into replyBuf and passes it to Comm::Write()
// together with the `writer` async call (named after noteWroteHeader).
// NOTE(review): listing lines 69 (function name), 72-73, 79 and 91 are absent
// from this extract, so some statements of this function are not visible here.
68 void
70 {
71  debugs(16, 5, HERE);
74  Must(aggrAction != NULL);
75 
76  std::unique_ptr<MemBuf> replyBuf;
// strands is empty: answer with an ERR_INVALID_URL error reply (status scNotFound)
// built for the URI stored in the action's command parameters.
77  if (strands.empty()) {
78  const char *url = aggrAction->command().params.httpUri.termedBuf();
// NOTE(review): `mx` is not declared in the visible lines; presumably it is
// declared on the missing listing line 79 -- confirm.
80  auto *req = HttpRequest::FromUrlXXX(url, mx);
81  ErrorState err(ERR_INVALID_URL, Http::scNotFound, req, nullptr);
82  std::unique_ptr<HttpReply> reply(err.BuildHttpReply());
83  replyBuf.reset(reply->pack());
84  } else {
// Otherwise send scOkay text/plain headers: reason is NULL, clen is -1, and
// both lmt and expires are squid_curtime.
85  std::unique_ptr<HttpReply> reply(new HttpReply);
86  reply->setHeaders(Http::scOkay, NULL, "text/plain", -1, squid_curtime, squid_curtime);
87  reply->header.putStr(Http::HdrType::CONNECTION, "close"); // until we chunk response
88  replyBuf.reset(reply->pack());
89  }
90  writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
92  Comm::Write(conn, replyBuf.get(), writer);
93 }
94 
// Mgr::Inquirer::noteWroteHeader(const CommIoCbParams &params) -- called when
// we wrote the response header.
// NOTE(review): listing line 97 (the function name line) is absent from this extract.
96 void
98 {
99  debugs(16, 5, HERE);
// The write has completed, so drop the pending write call.
100  writer = NULL;
// Require a successful, non-empty write on our own connection.
101  Must(params.flag == Comm::OK);
102  Must(params.conn.getRaw() == conn.getRaw());
103  Must(params.size != 0);
104  // start inquiries at the initial pos
105  inquire();
106 }
107 
// Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &params) -- called when
// the HTTP client or some external force closed our socket.
// NOTE(review): listing line 110 (the function name line) is absent from this extract.
109 void
111 {
112  debugs(16, 5, HERE);
// The connection must already be closed and the callback must be for our connection.
113  Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
// Drop our connection pointer and stop, giving "commClosed" as the reason.
114  conn = NULL;
115  mustStop("commClosed");
116 }
117 
// Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse) -- adds the action
// carried by the response, if any, to aggrAction.
// Always returns true in this implementation.
// NOTE(review): listing line 119 (the function name line) is absent from this extract.
118 bool
120 {
// Unchecked downcast: the code treats the response as a Mgr::Response.
121  Mgr::Response& response = static_cast<Response&>(*aResponse);
122  if (response.hasAction())
123  aggrAction->add(response.getAction());
124  return true;
125 }
126 
// Mgr::Inquirer::sendResponse() -- send response to client.
// When strands is non-empty and the action is aggregatable, starts an
// ActionWriter job with aggrAction and conn; otherwise does nothing here.
// NOTE(review): listing line 128 (the function name line) is absent from this extract.
127 void
129 {
130  if (!strands.empty() && aggrAction->aggregatable()) {
// Remove our close handler before passing the connection to ActionWriter.
131  removeCloseHandler();
132  AsyncJob::Start(new ActionWriter(aggrAction, conn));
133  conn = NULL; // should not close because we passed it to ActionWriter
134  }
135 }
136 
// Mgr::Inquirer::doneAll() const -- whether positive goal has been reached:
// true only when no write call is pending (writer is unset) and
// Ipc::Inquirer::doneAll() is also true.
// NOTE(review): listing line 138 (the function name line) is absent from this extract.
137 bool
139 {
140  return !writer && Ipc::Inquirer::doneAll();
141 }
142 
// Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords &aStrands, const QueryParams &aParams)
// Returns the strands selected by the "processes" / "workers" query parameters.
// NOTE(review): listing lines 143-144 (return type and signature) and 146 are
// absent from this extract. `sc`, the returned collection, is not declared in
// the visible lines; presumably it is declared on the missing line 146 -- confirm.
145 {
147 
148  QueryParam::Pointer processesParam = aParams.get("processes");
149  QueryParam::Pointer workersParam = aParams.get("workers");
150 
// Filtering happens only when at most one of the two parameters is present.
// When both are present, this block is skipped and sc is not modified here.
151  if (processesParam == NULL || workersParam == NULL) {
152  if (processesParam != NULL) {
// "processes": keep the strands whose kidId is in the parameter's integer list.
153  IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
154  if (param != NULL && param->type == QueryParam::ptInt) {
155  const std::vector<int>& processes = param->value();
156  for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
157  iter != aStrands.end(); ++iter) {
158  if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
159  sc.push_back(*iter);
160  }
161  }
162  } else if (workersParam != NULL) {
// "workers": keep the strands whose 1-based position in aStrands is in the list.
163  IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
164  if (param != NULL && param->type == QueryParam::ptInt) {
165  const std::vector<int>& workers = param->value();
166  for (int i = 0; i < (int)aStrands.size(); ++i) {
167  if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
168  sc.push_back(aStrands[i]);
169  }
170  }
171  } else {
// Neither parameter was given: select every strand.
172  sc = aStrands;
173  }
174  }
175 
// Log the kid ID of each selected strand at debug level 4.
176  debugs(16, 4, HERE << "strands kid IDs = ");
177  for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
178  debugs(16, 4, HERE << iter->kidId);
179  }
180 
181  return sc;
182 }
183 
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:983
void noteWroteHeader(const CommIoCbParams &params)
called when we wrote the response header
Definition: Inquirer.cc:97
HttpHeader header
Definition: Message.h:75
Inquirer(Action::Pointer anAction, const Request &aCause, const Ipc::StrandCoords &coords)
Definition: Inquirer.cc:34
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer)
HttpReply * BuildHttpReply(void)
Definition: errorpage.cc:1281
int const char int
Definition: stub_libmem.cc:75
QueryParam::Pointer get(const String &name) const
returns query parameter by name
Definition: QueryParams.cc:19
const std::vector< int > & value() const
Definition: IntParam.cc:48
C * getRaw() const
Definition: RefCount.h:80
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:26
@ OK
Definition: Flag.h:16
virtual void start()
called by AsyncStart; do not call directly
Definition: Inquirer.cc:57
@ initIpc
the IPC subsystem
void noteCommClosed(const CommCloseCbParams &params)
called when the HTTP client or some external force closed our socket
Definition: Inquirer.cc:110
Action::Pointer aggrAction
Definition: Inquirer.h:51
#define NULL
Definition: types.h:166
Ipc::StrandCoords applyQueryParams(const Ipc::StrandCoords &aStrands, const QueryParams &aParams)
Definition: Inquirer.cc:144
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:128
MemBuf * pack() const
Definition: HttpReply.cc:111
static Pointer Start(AsyncJob *job)
starts a freshly created job (i.e., makes the job asynchronous)
Definition: AsyncJob.cc:23
virtual bool aggregate(Ipc::Response::Pointer aResponse)
performs aggregation of responses and returns true if we need to continue
Definition: Inquirer.cc:119
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:157
const Action & getAction() const
returns action object
Definition: Response.cc:72
cache manager request
Definition: Request.h:22
Comm::ConnectionPointer conn
Definition: CommCalls.h:85
int conn
the current server connection FD
Definition: Transport.cc:26
bool hasAction() const
whether the response contains an action object
Definition: Response.cc:66
Comm::Flag flag
comm layer result status.
Definition: CommCalls.h:87
virtual bool doneAll() const
whether positive goal has been reached
Definition: Inquirer.cc:138
@ fdnHttpSocket
Definition: FdNotes.h:20
static int sc[16]
Definition: smbdes.c:121
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:35
time_t squid_curtime
Definition: stub_time.cc:17
virtual bool doneAll() const
whether positive goal has been reached
Definition: Inquirer.cc:116
@ scNotFound
Definition: StatusCode.h:48
void removeCloseHandler()
Definition: Inquirer.cc:60
void putStr(Http::HdrType id, const char *str)
Definition: HttpHeader.cc:1068
static HttpRequest * FromUrlXXX(const char *url, const MasterXaction::Pointer &, const HttpRequestMethod &method=Http::METHOD_GET)
Definition: HttpRequest.cc:546
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:69
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
Comm::ConnectionPointer conn
HTTP client socket descriptor.
Definition: Inquirer.h:53
virtual void start()
called by AsyncStart; do not call directly
Definition: Inquirer.cc:69
@ ERR_INVALID_URL
Definition: err_type.h:43
AsyncCall::Pointer closer
comm_close handler
Definition: Inquirer.h:56
const Comm::ConnectionPointer & ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, FdNoteId noteId)
import socket fd from another strand into our Comm state
Definition: UdsOp.cc:194
void setHeaders(Http::StatusCode status, const char *reason, const char *ctype, int64_t clen, time_t lmt, time_t expires)
Definition: HttpReply.cc:168
@ scOkay
Definition: StatusCode.h:26
virtual void cleanup()
closes our copy of the client HTTP connection socket
Definition: Inquirer.cc:51
virtual void sendResponse()
send response to client
Definition: Inquirer.cc:128
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:1010
Definition: IpcIoFile.h:23
Comm::ConnectionPointer conn
HTTP client connection descriptor.
Definition: Request.h:37
Cache Manager API.
Definition: Action.h:19

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors