Inquirer.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 16 Cache Manager API */
10
11#include "squid.h"
12#include "AccessLogEntry.h"
13#include "base/TextException.h"
14#include "CacheManager.h"
15#include "comm.h"
16#include "comm/Connection.h"
17#include "comm/Write.h"
18#include "CommCalls.h"
19#include "errorpage.h"
20#include "HttpReply.h"
21#include "HttpRequest.h"
22#include "ipc/UdsOp.h"
23#include "mgr/ActionWriter.h"
24#include "mgr/Command.h"
25#include "mgr/Inquirer.h"
26#include "mgr/IntParam.h"
27#include "mgr/Request.h"
28#include "mgr/Response.h"
29
30#include <memory>
31#include <algorithm>
32
34
36 const Request &aCause, const Ipc::StrandCoords &coords):
37 Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
38 aggrAction(anAction)
39{
40 conn = aCause.conn;
41 Ipc::ImportFdIntoComm(conn, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
42
43 debugs(16, 5, conn << " action: " << aggrAction);
44
45 closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
48}
49
51void
53{
55 removeCloseHandler();
56 conn->close();
57 }
58}
59
60void
62{
63 if (closer != nullptr) {
65 closer = nullptr;
66 }
67}
68
69void
71{
72 debugs(16, 5, MYNAME);
75 Must(aggrAction != nullptr);
76
77 const auto &origin = aggrAction->command().params.httpOrigin;
78 const auto originOrNil = origin.size() ? origin.termedBuf() : nullptr;
79
80 std::unique_ptr<MemBuf> replyBuf;
81 if (strands.empty()) {
82 const char *url = aggrAction->command().params.httpUri.termedBuf();
83 const auto mx = MasterXaction::MakePortless<XactionInitiator::initIpc>();
84 auto *req = HttpRequest::FromUrlXXX(url, mx);
86 std::unique_ptr<HttpReply> reply(err.BuildHttpReply());
87 CacheManager::PutCommonResponseHeaders(*reply, originOrNil);
88 replyBuf.reset(reply->pack());
89 } else {
90 std::unique_ptr<HttpReply> reply(new HttpReply);
91 reply->setHeaders(Http::scOkay, nullptr, "text/plain", -1, squid_curtime, squid_curtime);
92 CacheManager::PutCommonResponseHeaders(*reply, originOrNil);
93 reply->header.putStr(Http::HdrType::CONNECTION, "close"); // until we chunk response
94 replyBuf.reset(reply->pack());
95 }
96 writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
98 Comm::Write(conn, replyBuf.get(), writer);
99}
100
102void
104{
105 debugs(16, 5, MYNAME);
106 writer = nullptr;
107 Must(params.flag == Comm::OK);
108 Must(params.conn.getRaw() == conn.getRaw());
109 Must(params.size != 0);
110 // start inquiries at the initial pos
111 inquire();
112}
113
115void
117{
118 debugs(16, 5, MYNAME);
119 closer = nullptr;
120 if (conn) {
121 conn->noteClosure();
122 conn = nullptr;
123 }
124 mustStop("commClosed");
125}
126
127bool
129{
130 Mgr::Response& response = static_cast<Response&>(*aResponse);
131 if (response.hasAction())
132 aggrAction->add(response.getAction());
133 return true;
134}
135
136void
138{
139 if (!strands.empty() && aggrAction->aggregatable()) {
140 removeCloseHandler();
141 AsyncJob::Start(new ActionWriter(aggrAction, conn));
142 conn = nullptr; // should not close because we passed it to ActionWriter
143 }
144}
145
146bool
148{
149 return !writer && Ipc::Inquirer::doneAll();
150}
151
154{
156
157 QueryParam::Pointer processesParam = aParams.get("processes");
158 QueryParam::Pointer workersParam = aParams.get("workers");
159
160 if (processesParam == nullptr || workersParam == nullptr) {
161 if (processesParam != nullptr) {
162 IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
163 if (param != nullptr && param->type == QueryParam::ptInt) {
164 const std::vector<int>& processes = param->value();
165 for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
166 iter != aStrands.end(); ++iter) {
167 if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
168 sc.push_back(*iter);
169 }
170 }
171 } else if (workersParam != nullptr) {
172 IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
173 if (param != nullptr && param->type == QueryParam::ptInt) {
174 const std::vector<int>& workers = param->value();
175 for (int i = 0; i < (int)aStrands.size(); ++i) {
176 if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
177 sc.push_back(aStrands[i]);
178 }
179 }
180 } else {
181 sc = aStrands;
182 }
183 }
184
185 debugs(16, 4, "strands kid IDs = ");
186 for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
187 debugs(16, 4, iter->kidId);
188 }
189
190 return sc;
191}
192
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
time_t squid_curtime
Definition: stub_libtime.cc:20
#define Must(condition)
Definition: TextException.h:75
int conn
the current server connection FD
Definition: Transport.cc:26
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37
static void PutCommonResponseHeaders(HttpReply &, const char *httpOrigin)
Comm::Flag flag
comm layer result status.
Definition: CommCalls.h:82
Comm::ConnectionPointer conn
Definition: CommCalls.h:80
HttpReply * BuildHttpReply(void)
Definition: errorpage.cc:1277
static HttpRequest * FromUrlXXX(const char *url, const MasterXaction::Pointer &, const HttpRequestMethod &method=Http::METHOD_GET)
Definition: HttpRequest.cc:528
bool doneAll() const override
whether positive goal has been reached
Definition: Inquirer.cc:146
void start() override
called by AsyncStart; do not call directly
Definition: Inquirer.cc:89
void sendResponse() override
send response to client
Definition: Inquirer.cc:137
Comm::ConnectionPointer conn
HTTP client socket descriptor.
Definition: Inquirer.h:53
void noteCommClosed(const CommCloseCbParams &params)
called when the HTTP client or some external force closed our socket
Definition: Inquirer.cc:116
Ipc::StrandCoords applyQueryParams(const Ipc::StrandCoords &aStrands, const QueryParams &aParams)
Definition: Inquirer.cc:153
void noteWroteHeader(const CommIoCbParams &params)
called when we wrote the response header
Definition: Inquirer.cc:103
Inquirer(Action::Pointer anAction, const Request &aCause, const Ipc::StrandCoords &coords)
Definition: Inquirer.cc:35
void removeCloseHandler()
Definition: Inquirer.cc:61
void cleanup() override
closes our copy of the client HTTP connection socket
Definition: Inquirer.cc:52
void start() override
called by AsyncStart; do not call directly
Definition: Inquirer.cc:70
AsyncCall::Pointer closer
comm_close handler
Definition: Inquirer.h:56
bool doneAll() const override
whether positive goal has been reached
Definition: Inquirer.cc:147
bool aggregate(Ipc::Response::Pointer aResponse) override
perform aggregating of responses and returns true if need to continue
Definition: Inquirer.cc:128
Action::Pointer aggrAction
Definition: Inquirer.h:51
const std::vector< int > & value() const
Definition: IntParam.cc:48
QueryParam::Pointer get(const String &name) const
returns query parameter by name
Definition: QueryParams.cc:23
cache manager request
Definition: Request.h:24
Comm::ConnectionPointer conn
HTTP client connection descriptor.
Definition: Request.h:35
bool hasAction() const
whether response contain action object
Definition: Response.cc:61
const Action & getAction() const
returns action object
Definition: Response.cc:67
C * getRaw() const
Definition: RefCount.h:89
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:949
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:978
#define MYNAME
Definition: Stream.h:236
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
@ ERR_INVALID_URL
Definition: forward.h:45
CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer)
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
@ OK
Definition: Flag.h:16
@ scNotFound
Definition: StatusCode.h:48
@ scOkay
Definition: StatusCode.h:26
Definition: IpcIoFile.h:24
@ fdnHttpSocket
Definition: FdNotes.h:20
const Comm::ConnectionPointer & ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, FdNoteId noteId)
import socket fd from another strand into our Comm state
Definition: UdsOp.cc:194
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
Cache Manager API.
Definition: Action.h:20
static int sc[16]
Definition: smbdes.c:121
int unsigned int
Definition: stub_fd.cc:19

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors