helper.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 84 Helper process maintenance */
10
11#include "squid.h"
13#include "base/Packable.h"
14#include "base/Raw.h"
15#include "comm.h"
16#include "comm/Connection.h"
17#include "comm/Read.h"
18#include "comm/Write.h"
19#include "debug/Messages.h"
20#include "fd.h"
21#include "fde.h"
22#include "format/Quoting.h"
23#include "helper.h"
24#include "helper/Reply.h"
25#include "helper/Request.h"
26#include "MemBuf.h"
27#include "SquidConfig.h"
28#include "SquidIpc.h"
29#include "SquidMath.h"
30#include "Store.h"
31#include "wordlist.h"
32
33// helper_stateful_server::data uses explicit alloc()/freeOne()
34#include "mem/Pool.h"
35
/// Number of argv slots (not counting the terminating nullptr) in the args[]
/// arrays that the openSessions() methods build for ipcCreate().
36#define HELPER_MAX_ARGS 64
37
/// Retry limit compared against Xaction request.retries in helperReturnBuffer().
39#define MAX_RETRIES 2
40
/// Size requested from memAllocBuf() for each session read buffer (rbuf).
42const size_t ReadBufSize(32*1024);
43
// Forward declarations of file-local helpers defined further down in this file.
46static void Enqueue(Helper::Client *, Helper::Xaction *);
51static void helperKickQueue(const Helper::Client::Pointer &);
54static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
55
58
60
/// Resets all per-session statistics counters (uses, replies, pending,
/// releases, timedout) to zero.
// NOTE(review): the declarator line naming this function is missing from this
// listing; presumably this is the initStats() that openSessions() calls — confirm.
61void
63{
64 stats.uses=0;
65 stats.replies=0;
66 stats.pending=0;
67 stats.releases=0;
68 stats.timedout = 0;
69}
70
/// Marks this session as closing and closes both pipes to the helper process.
/// When the read and write sides share one descriptor, the read side is only
/// detached (fd set to -1) so that the descriptor is closed once, via writePipe.
/// On Windows builds the write descriptor is shut down in both directions first
/// and, afterwards, the helper process handle (if any) is waited on for up to
/// 5000 ms and then closed.
/// \param id_name helper name used only in the Windows-specific warning
// NOTE(review): the embedded numbering jumps from 87 to 89, so one statement
// inside the WaitForSingleObject() branch is missing from this listing.
71void
72Helper::SessionBase::closePipesSafely(const char * const id_name)
73{
74#if _SQUID_WINDOWS_
75 shutdown(writePipe->fd, SD_BOTH);
76#endif
77
78 flags.closing = true;
// shared descriptor: forget the read-side copy instead of closing it twice
79 if (readPipe->fd == writePipe->fd)
80 readPipe->fd = -1;
81 else
82 readPipe->close();
83 writePipe->close();
84
85#if _SQUID_WINDOWS_
86 if (hIpc) {
87 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
89 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
90 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
91 }
92 CloseHandle(hIpc);
93 }
94#else
// id_name is only needed by the Windows-specific warning above
95 (void)id_name;
96#endif
97}
98
/// Marks this session as closing and closes only the write pipe; unlike
/// closePipesSafely(), a separate read pipe is left open here.
/// On Windows builds the descriptor is shut down first (both directions when
/// shared with the read side, send-only otherwise) and the helper process
/// handle, if any, is waited on for up to 5000 ms and then closed.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the closeWritePipeSafely(id_name) called by the session destructors.
// One statement (embedded line 114) is also missing — confirm against the
// real source.
99void
101{
102#if _SQUID_WINDOWS_
103 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
104#endif
105
106 flags.closing = true;
// shared descriptor: detach the read side so that it is not closed separately
107 if (readPipe->fd == writePipe->fd)
108 readPipe->fd = -1;
109 writePipe->close();
110
111#if _SQUID_WINDOWS_
112 if (hIpc) {
113 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
115 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
116 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
117 }
118 CloseHandle(hIpc);
119 }
120#else
// id_name is only needed by the Windows-specific warning above
121 (void)id_name;
122#endif
123}
124
/// Fails every request still held by this session: each one is removed from
/// the front of `requests`, gets a Helper::Unknown reply result, is reported
/// through client.callBack(), and is then deleted.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the base dropQueued(client) invoked by HelperServerClosed() — confirm.
125void
127{
128 while (!requests.empty()) {
129 // XXX: re-schedule these on another helper?
130 const auto r = requests.front();
131 requests.pop_front();
132 r->reply.result = Helper::Unknown;
133 client.callBack(*r);
134 delete r;
135 }
136}
137
// Returns the read buffer, if one was allocated, to memFreeBuf() and clears
// the pointer.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is a session destructor body — confirm.
139{
140 if (rbuf) {
141 memFreeBuf(rbuf_sz, rbuf);
142 rbuf = nullptr;
143 }
144}
145
// Tears down a stateless helper session: frees the write queue and any
// in-progress write buffer, closes the write pipe if it is still open,
// unlinks the session from parent->servers, and decrements the parent's
// running-children counter. No requests may remain at this point.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the Helper::Session destructor — confirm.
147{
// wqueue is dereferenced unconditionally (unlike writebuf below)
148 wqueue->clean();
149 delete wqueue;
150
151 if (writebuf) {
152 writebuf->clean();
153 delete writebuf;
154 writebuf = nullptr;
155 }
156
157 if (Comm::IsConnOpen(writePipe))
158 closeWritePipeSafely(parent->id_name);
159
160 dlinkDelete(&link, &parent->servers);
161
162 assert(parent->childs.n_running > 0);
163 -- parent->childs.n_running;
164
165 assert(requests.empty());
166}
167
// Clears the request-ID index of this session.
// NOTE(review): the declarator line and one body statement (embedded line 171)
// are missing from this listing; presumably the missing statement drops the
// queued requests themselves before the index is cleared — confirm.
168void
170{
172 requestsIndex.clear();
173}
174
// Tears down a stateful helper session: closes the write pipe if it is still
// open, cancels this session's reservation with the parent, unlinks the
// session from parent->servers, and decrements the parent's running-children
// counter. No requests may remain at this point.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the helper_stateful_server destructor — confirm.
176{
177 /* TODO: walk the local queue of requests and carry them all out */
178 if (Comm::IsConnOpen(writePipe))
179 closeWritePipeSafely(parent->id_name);
180
181 parent->cancelReservation(reservationId);
182
183 dlinkDelete(&link, &parent->servers);
184
185 assert(parent->childs.n_running > 0);
186 -- parent->childs.n_running;
187
188 assert(requests.empty());
189}
190
/// Starts as many new stateless helper processes as childs.needNew() reports.
/// Builds the argv array from cmdline, spawns each child via ipcCreate(),
/// creates and registers a Helper::Session for every child that started, and
/// finally kicks the request queue. Does nothing when no command line is set.
// NOTE(review): the declarator line and several interior lines (embedded
// lines 249, 296, 299, 306, 311) are missing from this listing; the code below
// is therefore incomplete as shown and the notes are limited to visible lines.
191void
193{
194 char *s;
195 char *progname;
196 char *shortname;
197 char *procname;
198 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
199 char fd_note_buf[FD_DESC_SZ];
200 int nargs = 0;
201 int k;
202 pid_t pid;
203 int rfd;
204 int wfd;
205 void * hIpc;
206 wordlist *w;
207 // Helps reducing diff. TODO: remove
208 const auto hlp = this;
209
210 if (hlp->cmdline == nullptr)
211 return;
212
213 progname = hlp->cmdline->key;
214
// shortname is the program path with any leading directories removed
215 if ((s = strrchr(progname, '/')))
216 shortname = xstrdup(s + 1);
217 else
218 shortname = xstrdup(progname);
219
220 /* figure out how many new child are actually needed. */
221 int need_new = hlp->childs.needNew();
222
223 debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
224
// only logs; execution continues and the start-up loop below runs zero times
225 if (need_new < 1) {
226 debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
227 }
228
// +3 covers the two parentheses and the terminating NUL
229 procname = (char *)xmalloc(strlen(shortname) + 3);
230
231 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
232
233 args[nargs] = procname;
234 ++nargs;
235
236 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
237 args[nargs] = w->key;
238 ++nargs;
239 }
240
241 args[nargs] = nullptr;
242 ++nargs;
243
// NOTE(review): if the loop above stops because nargs reached HELPER_MAX_ARGS,
// nargs is HELPER_MAX_ARGS+1 here and this assertion fails — confirm intent.
244 assert(nargs <= HELPER_MAX_ARGS);
245
246 int successfullyStarted = 0;
247
248 for (k = 0; k < need_new; ++k) {
250 rfd = wfd = -1;
251 pid = ipcCreate(hlp->ipc_type,
252 progname,
253 args,
254 shortname,
255 hlp->addr,
256 &rfd,
257 &wfd,
258 &hIpc);
259
// a failed spawn is logged and skipped; remaining children are still attempted
260 if (pid < 0) {
261 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
262 continue;
263 }
264
265 ++successfullyStarted;
266 ++ hlp->childs.n_running;
267 ++ hlp->childs.n_active;
268 const auto srv = new Helper::Session;
269 srv->hIpc = hIpc;
270 srv->pid = pid;
271 srv->initStats();
272 srv->addr = hlp->addr;
273 srv->readPipe = new Comm::Connection;
274 srv->readPipe->fd = rfd;
275 srv->writePipe = new Comm::Connection;
276 srv->writePipe->fd = wfd;
277 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
278 srv->wqueue = new MemBuf;
279 srv->roffset = 0;
280 srv->nextRequestId = 0;
281 srv->replyXaction = nullptr;
282 srv->ignoreToEom = false;
283 srv->parent = hlp;
284 dlinkAddTail(srv, &srv->link, &hlp->servers);
285
// label the descriptor(s) so that they can be told apart in FD listings
286 if (rfd == wfd) {
287 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
288 fd_note(rfd, fd_note_buf);
289 } else {
290 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
291 fd_note(rfd, fd_note_buf);
292 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
293 fd_note(wfd, fd_note_buf);
294 }
295
297
// NOTE(review): the statement controlled by this `if` is missing from this listing
298 if (wfd != rfd)
300
// the close handler on the read descriptor destroys the session object
301 AsyncCall::Pointer closeCall = asyncCall(5,4, "Helper::Session::HelperServerClosed", cbdataDialer(Helper::Session::HelperServerClosed, srv));
302 comm_add_close_handler(rfd, closeCall);
303
// NOTE(review): the continuation line of this commCbCall() is missing from this listing
304 if (hlp->timeout && hlp->childs.concurrency) {
305 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
307 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
308 }
309
// NOTE(review): the continuation line of this commCbCall() is missing from this listing
310 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
// rbuf_sz - 1 leaves room for the NUL terminator added by the read handler
312 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
313 }
314
315 // Call handleFewerServers() before hlp->last_restart is updated because
316 // that method uses last_restart to measure the delay since previous start.
317 // TODO: Refactor last_restart code to measure failure frequency rather than
318 // detecting a helper #X failure that is being close to the helper #Y start.
319 if (successfullyStarted < need_new)
320 hlp->handleFewerServers(false);
321
322 hlp->last_restart = squid_curtime;
323 safe_free(shortname);
324 safe_free(procname);
325 helperKickQueue(hlp);
326}
327
/// Starts as many new stateful helper processes as childs.needNew() reports.
/// Builds the argv array from cmdline, spawns each child via ipcCreate(), and
/// registers a session object for every child that started. Does nothing when
/// no command line is set; a configured concurrency is reported as an error.
// NOTE(review): the declarator line and several interior lines (embedded
// lines 381, 402, 428, 431, 437, 451) are missing from this listing; in
// particular the statement that creates `srv` is not visible.
328void
330{
331 char *shortname;
332 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
333 char fd_note_buf[FD_DESC_SZ];
334 int nargs = 0;
335 // Helps reducing diff. TODO: remove
336 const auto hlp = this;
337
338 if (hlp->cmdline == nullptr)
339 return;
340
// only logs the misconfiguration; start-up continues below
341 if (hlp->childs.concurrency)
342 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
343
344 char *progname = hlp->cmdline->key;
345
// shortname is the program path with any leading directories removed
346 char *s;
347 if ((s = strrchr(progname, '/')))
348 shortname = xstrdup(s + 1);
349 else
350 shortname = xstrdup(progname);
351
352 /* figure out how many new helpers are needed. */
353 int need_new = hlp->childs.needNew();
354
355 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
356
// only logs; execution continues and the start-up loop below runs zero times
357 if (need_new < 1) {
358 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
359 }
360
// +3 covers the two parentheses and the terminating NUL
361 char *procname = (char *)xmalloc(strlen(shortname) + 3);
362
363 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
364
365 args[nargs] = procname;
366 ++nargs;
367
368 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
369 args[nargs] = w->key;
370 ++nargs;
371 }
372
373 args[nargs] = nullptr;
374 ++nargs;
375
// NOTE(review): if the loop above stops because nargs reached HELPER_MAX_ARGS,
// nargs is HELPER_MAX_ARGS+1 here and this assertion fails — confirm intent.
376 assert(nargs <= HELPER_MAX_ARGS);
377
378 int successfullyStarted = 0;
379
380 for (int k = 0; k < need_new; ++k) {
382 int rfd = -1;
383 int wfd = -1;
384 void * hIpc;
385 pid_t pid = ipcCreate(hlp->ipc_type,
386 progname,
387 args,
388 shortname,
389 hlp->addr,
390 &rfd,
391 &wfd,
392 &hIpc);
393
// a failed spawn is logged and skipped; remaining children are still attempted
394 if (pid < 0) {
395 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
396 continue;
397 }
398
399 ++successfullyStarted;
400 ++ hlp->childs.n_running;
401 ++ hlp->childs.n_active;
// NOTE(review): the line that allocates `srv` is missing from this listing
403 srv->hIpc = hIpc;
404 srv->pid = pid;
405 srv->initStats();
406 srv->addr = hlp->addr;
407 srv->readPipe = new Comm::Connection;
408 srv->readPipe->fd = rfd;
409 srv->writePipe = new Comm::Connection;
410 srv->writePipe->fd = wfd;
411 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
412 srv->roffset = 0;
413 srv->parent = hlp;
414 srv->reservationStart = 0;
415
416 dlinkAddTail(srv, &srv->link, &hlp->servers);
417
// label the descriptor(s) so that they can be told apart in FD listings
418 if (rfd == wfd) {
419 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
420 fd_note(rfd, fd_note_buf);
421 } else {
422 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
423 fd_note(rfd, fd_note_buf);
424 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
425 fd_note(wfd, fd_note_buf);
426 }
427
429
// NOTE(review): the statement controlled by this `if` is missing from this listing
430 if (wfd != rfd)
432
// the close handler on the read descriptor destroys the session object
433 AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
434 comm_add_close_handler(rfd, closeCall);
435
// NOTE(review): the continuation line of this commCbCall() is missing from this listing
436 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
// rbuf_sz - 1 leaves room for the NUL terminator added by the read handler
438 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
439 }
440
441 // Call handleFewerServers() before hlp->last_restart is updated because
442 // that method uses last_restart to measure the delay since previous start.
443 // TODO: Refactor last_restart code to measure failure frequency rather than
444 // detecting a helper #X failure that is being close to the helper #Y start.
445 if (successfullyStarted < need_new)
446 hlp->handleFewerServers(false);
447
448 hlp->last_restart = squid_curtime;
449 safe_free(shortname);
450 safe_free(procname);
452}
453
/// Sends request r to the first available session, or queues it when no
/// session is available, and then refreshes the overload statistics.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the submitRequest(r) called by submit() and helperReturnBuffer().
454void
456{
457 if (const auto srv = GetFirstAvailable(this))
458 helperDispatch(srv, r);
459 else
460 Enqueue(this, r);
461
462 syncQueueStats();
463}
464
466static void
467SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
468{
469 auto result = Helper::Error;
470 if (!hlp) {
471 debugs(84, 3, "no helper");
472 result = Helper::Unknown;
473 }
474 // else pretend the helper has responded with ERR
475
476 callback(data, Helper::Reply(result));
477}
478
479void
480helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
481{
482 if (!hlp || !hlp->trySubmit(buf, callback, data))
483 SubmissionFailure(hlp, callback, data);
484}
485
// Whether the number of queued requests has reached (or exceeded) the
// configured queue size limit.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the queueFull() used by willOverload() — confirm.
487bool
489 return stats.queue_size >= static_cast<int>(childs.queue_size);
490}
491
// Whether the number of queued requests strictly exceeds the configured
// queue size limit.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the overloaded() used by syncQueueStats() and prepSubmit() — confirm.
492bool
494 return stats.queue_size > static_cast<int>(childs.queue_size);
495}
496
/// Synchronizes the overload bookkeeping with the current queue state:
/// records when an overload starts, and when it ends resets overloadStart and
/// (after logging a summary) the dropped-requests counter.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the syncQueueStats() called after each submission — confirm.
498void
500{
501 if (overloaded()) {
502 if (overloadStart) {
503 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
504 } else {
// first observation of this overload: remember when it began
505 overloadStart = squid_curtime;
506 debugs(84, 3, id_name << " became overloaded");
507 }
508 } else {
509 if (overloadStart) {
510 debugs(84, 5, id_name << " is no longer overloaded");
// report at an important level only if the overload actually cost requests
511 if (droppedRequests) {
512 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
513 " is no longer overloaded after dropping " << droppedRequests <<
514 " requests in " << (squid_curtime - overloadStart) << " seconds");
515 droppedRequests = 0;
516 }
517 overloadStart = 0;
518 }
519 }
520}
521
/// Decides whether a new request may be submitted.
/// \returns true when the helper is not overloaded or has been overloaded for
/// no more than 180 seconds; false when the request must be dropped. Calls
/// fatalf() instead of returning when the overload persisted and the
/// configured on-persistent-overload action is ChildConfig::actDie.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the prepSubmit() called by the trySubmit() methods — confirm.
525bool
527{
528 // re-sync for the configuration may have changed since the last submission
529 syncQueueStats();
530
531 // Nothing special to do if the new request does not overload (i.e., the
532 // queue is not even full yet) or only _starts_ overloading this helper
533 // (i.e., the queue is currently at its limit).
534 if (!overloaded())
535 return true;
536
537 if (squid_curtime - overloadStart <= 180)
538 return true; // also OK: overload has not persisted long enough to panic
539
540 if (childs.onPersistentOverload == ChildConfig::actDie)
541 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
542
// warn once per overload period: only when the first request is dropped
543 if (!droppedRequests) {
544 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
545 id_name << " helper configured with on-persistent-overload=err");
546 }
547 ++droppedRequests;
548 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
549 return false;
550}
551
552bool
553Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
554{
555 if (!prepSubmit())
556 return false; // request was dropped
557
558 submit(buf, callback, data); // will send or queue
559 return true; // request submitted or queued
560}
561
563void
564Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
565{
566 const auto r = new Xaction(callback, data, buf);
567 submitRequest(r);
568 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
569}
570
/// Delivers r.reply to the callback stored in the request and clears that
/// stored callback so that it cannot be invoked again through this Xaction.
// NOTE(review): the declarator line and one statement (embedded line 579) are
// missing from this listing. As shown, `cbdata` is still nullptr when the
// callback runs; presumably the missing line validates and fills it — confirm.
571void
573{
574 const auto callback = r.request.callback;
575 Assure(callback);
576
577 r.request.callback = nullptr;
578 void *cbdata = nullptr;
580 callback(cbdata, r.reply);
581}
582
585void
586helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
587{
588 if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
589 SubmissionFailure(hlp, callback, data);
590}
591
593bool
594statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
595{
596 if (!prepSubmit())
597 return false; // request was dropped
598
599 submit(buf, callback, data, reservation); // will send or queue
600 return true; // request submitted or queued
601}
602
/// Gives srv a fresh reservation and records it in the `reservations` map,
/// dropping any reservation that srv still holds first.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the reserveServer(srv) called by statefulhelper::submit() — confirm.
603void
605{
606 // clear any old reservation
607 if (srv->reserved()) {
608 reservations.erase(srv->reservationId);
609 srv->clearReservation();
610 }
611
612 srv->reserve();
613 reservations.insert(Reservations::value_type(srv->reservationId, srv));
614}
615
/// Releases the session holding the given reservation, if any, and schedules
/// an asynchronous helperStatefulServerDone() call for that session.
/// Unknown reservations are silently ignored.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the cancelReservation(reservation) used elsewhere in this file.
616void
618{
619 const auto it = reservations.find(reservation);
620 if (it == reservations.end())
621 return;
622
// copy the session pointer before erase() invalidates the iterator
623 helper_stateful_server *srv = it->second;
624 reservations.erase(it);
625 srv->clearReservation();
626
627 // schedule a queue kick
628 AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
629 ScheduleCallHere(call);
630}
631
// Looks up the session recorded for the given reservation.
// Returns nullptr when the reservation is not in the `reservations` map.
// NOTE(review): the declarator lines are missing from this listing; presumably
// this is the findServer(reservation) called by statefulhelper::submit().
634{
635 const auto it = reservations.find(reservation);
636 if (it == reservations.end())
637 return nullptr;
638 return it->second;
639}
640
/// Assigns this session the next reservation ID and records the reservation
/// start time. The session must not already hold a reservation.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the reserve() called by reserveServer() — confirm.
641void
643{
644 assert(!reservationId);
645 reservationStart = squid_curtime;
646 reservationId = Helper::ReservationId::Next();
647 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
648}
649
/// Drops this session's reservation, if it has one: counts the release,
/// clears the reservation ID, and resets the reservation start time.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the clearReservation() called by the reservation code above.
650void
652{
// logged before the early return, so the ID shown may be an unset one
653 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
654 if (!reservationId)
655 return;
656
657 ++stats.releases;
658
659 reservationId.clear();
660 reservationStart = 0;
661}
662
/// Creates an Xaction for the request and routes it: a request with both a
/// buffer and a reservation goes to the session holding that reservation (or
/// is answered immediately when that reservation is unknown); any other
/// request goes to the first available session, which gets reserved, or is
/// queued when no session is available.
// NOTE(review): embedded lines 673, 681 and 684 are missing from this listing
// (the declaration of `srv` and at least two statements), so the code below is
// incomplete as shown.
663void
664statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
665{
666 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
667
668 if (buf && reservation) {
669 debugs(84, 5, reservation);
670 helper_stateful_server *lastServer = findServer(reservation);
// the reserved session is gone: answer right away instead of dispatching
671 if (!lastServer) {
672 debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
674 callBack(*r);
675 delete r;
676 return;
677 }
678 debugs(84, 5, "StatefulSubmit dispatching");
679 helperStatefulDispatch(lastServer, r);
680 } else {
682 if ((srv = StatefulGetFirstAvailable(this))) {
683 reserveServer(srv);
685 } else
686 StatefulEnqueue(this, r);
687 }
688
// buf may be nil here, hence the guarded strlen()
689 debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
690 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
691
692 syncQueueStats();
693}
694
695void
696Helper::Client::packStatsInto(Packable * const p, const char * const label) const
697{
698 if (label)
699 p->appendf("%s:\n", label);
700
701 p->appendf(" program: %s\n", cmdline->key);
702 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
703 p->appendf(" requests sent: %d\n", stats.requests);
704 p->appendf(" replies received: %d\n", stats.replies);
705 p->appendf(" requests timedout: %d\n", stats.timedout);
706 p->appendf(" queue length: %d\n", stats.queue_size);
707 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
708 p->append("\n",1);
709 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
710 "ID #",
711 "FD",
712 "PID",
713 "# Requests",
714 "# Replies",
715 "# Timed-out",
716 "Flags",
717 "Time",
718 "Offset",
719 "Request");
720
721 for (dlink_node *link = servers.head; link; link = link->next) {
722 const auto srv = static_cast<SessionBase *>(link->data);
723 assert(srv);
724 const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
725 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
726 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
727 srv->index.value,
728 srv->readPipe->fd,
729 srv->pid,
730 srv->stats.uses,
731 srv->stats.replies,
732 srv->stats.timedout,
733 srv->stats.pending ? 'B' : ' ',
734 srv->flags.writing ? 'W' : ' ',
735 srv->flags.closing ? 'C' : ' ',
736 srv->reserved() ? 'R' : ' ',
737 srv->flags.shutdown ? 'S' : ' ',
738 xaction && xaction->request.placeholder ? 'P' : ' ',
739 tt < 0.0 ? 0.0 : tt,
740 (int) srv->roffset,
741 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
742 }
743
744 p->append("\nFlags key:\n"
745 " B\tBUSY\n"
746 " W\tWRITING\n"
747 " C\tCLOSING\n"
748 " R\tRESERVED\n"
749 " S\tSHUTDOWN PENDING\n"
750 " P\tPLACEHOLDER\n", 101);
751}
752
// True when the queue is full and there is neither a need (per
// childs.needNew()) to start more helper processes nor an available session.
// NOTE(review): the declarator line is missing from this listing, so the
// function name cannot be confirmed from here.
753bool
755 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
756}
757
// Factory: creates a new Client object for the helper with the given name.
// NOTE(review): the return-type line is missing from this listing.
759Helper::Client::Make(const char * const name)
760{
761 return new Client(name);
762}
763
// Factory: creates a new statefulhelper object for the helper with the given name.
// NOTE(review): the return-type line is missing from this listing.
765statefulhelper::Make(const char *name)
766{
767 return new statefulhelper(name);
768}
769
/// Requests shutdown of every session of the stateless helper hlp.
/// Each session not yet marked is flagged for shutdown and removed from the
/// active count; sessions that are neither closing nor busy have their pipes
/// closed right away, the others are left to finish.
// NOTE(review): the declarator line is missing from this listing; the name
// helperShutdown is taken from the debug messages below — confirm.
770void
772{
773 dlink_node *link = hlp->servers.head;
774
775 while (link) {
776 const auto srv = static_cast<Helper::Session *>(link->data);
// advance before acting on srv so that iteration does not depend on it
777 link = link->next;
778
779 if (srv->flags.shutdown) {
780 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
781 continue;
782 }
783
784 assert(hlp->childs.n_active > 0);
785 -- hlp->childs.n_active;
786 srv->flags.shutdown = true; /* request it to shut itself down */
787
788 if (srv->flags.closing) {
789 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
790 continue;
791 }
792
// busy sessions keep their pipes open for now
793 if (srv->stats.pending) {
794 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
795 continue;
796 }
797
798 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
799 /* the rest of the details is dealt with in the helperServerFree
800 * close handler
801 */
802 srv->closePipesSafely(hlp->id_name);
803 }
804}
805
/// Requests shutdown of every session of the stateful helper hlp.
/// Each session not yet marked is flagged for shutdown and removed from the
/// active count; sessions that are busy, closing, or (unless Squid itself is
/// shutting down) reserved are left alone, the rest have their pipes closed.
// NOTE(review): the declarator line and the declaration of `srv` (embedded
// lines 807 and 810) are missing from this listing; the name
// helperStatefulShutdown is taken from the debug messages below — confirm.
806void
808{
809 dlink_node *link = hlp->servers.head;
811
812 while (link) {
813 srv = (helper_stateful_server *)link->data;
// advance before acting on srv so that iteration does not depend on it
814 link = link->next;
815
816 if (srv->flags.shutdown) {
817 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
818 continue;
819 }
820
821 assert(hlp->childs.n_active > 0);
822 -- hlp->childs.n_active;
823 srv->flags.shutdown = true; /* request it to shut itself down */
824
// busy sessions keep their pipes open for now
825 if (srv->stats.pending) {
826 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
827 continue;
828 }
829
830 if (srv->flags.closing) {
831 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
832 continue;
833 }
834
// a reservation protects the session unless the whole process is shutting down
835 if (srv->reserved()) {
836 if (shutting_down) {
837 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
838 } else {
839 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
840 continue;
841 }
842 }
843
844 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
845
846 /* the rest of the details is dealt with in the helperStatefulServerFree
847 * close handler
848 */
849 srv->closePipesSafely(hlp->id_name);
850 }
851}
852
// Warns when the helper object goes away while requests are still queued;
// nothing is freed here.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the Helper::Client destructor — confirm.
854{
855 /* note, don't free id_name, it probably points to static memory */
856
857 // TODO: if the queue is not empty it will leak Helper::Request's
858 if (!queue.empty())
859 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
860}
861
862void
863Helper::Client::handleKilledServer(SessionBase * const srv, bool &needsNewServers)
864{
865 needsNewServers = false;
866 if (!srv->flags.shutdown) {
867 assert(childs.n_active > 0);
868 --childs.n_active;
869 debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
870
871 handleFewerServers(srv->stats.replies >= 1);
872
873 if (childs.needNew() > 0) {
874 srv->flags.shutdown = true;
875 needsNewServers = true;
876 }
877 }
878}
879
/// Reacts to having fewer running helper processes than expected.
/// Returns quietly while childs.needNew() is zero. Otherwise logs the
/// shortage and, when the active count is below n_startup and the last
/// restart was less than 30 seconds ago, either logs an error (madeProgress)
/// or terminates via fatalf().
// NOTE(review): the declarator line is missing from this listing; presumably
// this is handleFewerServers(madeProgress) as called elsewhere in this file.
// NOTE(review): these messages use debug section 80, while the rest of this
// file logs under section 84 — confirm that this is intended.
880void
882{
883 const auto needNew = childs.needNew();
884
885 if (!needNew)
886 return; // some server(s) have died, but we still have enough
887
888 debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
889 Debug::Extra << "active processes: " << childs.n_active <<
890 Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
891
892 if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
893 if (madeProgress)
894 debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
895 else
896 fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
897 }
898}
899
/// Handles the end of a helper session: updates the bookkeeping via
/// handleKilledServer() and starts replacement helpers when that is needed.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the sessionClosed(session) called by HelperServerClosed() — confirm.
900void
902{
903 bool needsNewServers = false;
904 handleKilledServer(&session, needsNewServers);
905 if (needsNewServers) {
906 debugs(80, DBG_IMPORTANT, "Starting new helpers");
907 openSessions();
908 }
909}
910
/// Close handler for a helper session: notifies the parent helper, fails the
/// requests still held by the session, and destroys the session object.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is Helper::Session::HelperServerClosed(srv), registered in openSessions().
911void
913{
914 srv->parent->sessionClosed(*srv);
915 srv->dropQueued(*srv->parent);
916 delete srv;
917}
918
/// Close handler for a stateful helper session: notifies the parent helper,
/// fails the requests still held by the session, and destroys the session.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is helper_stateful_server::HelperServerClosed(srv) — confirm.
919// XXX: Essentially duplicates Helper::Session::HelperServerClosed() until we add SessionBase::helper().
920void
922{
923 srv->parent->sessionClosed(*srv);
924 srv->dropQueued(*srv->parent);
925 delete srv;
926}
927
/// Removes and returns the request that a helper reply belongs to.
/// With concurrency enabled the request is looked up by request_number in
/// requestsIndex; otherwise the oldest queued request is taken.
/// \returns the matching request, or nullptr when there is none
// NOTE(review): the return-type line is missing from this listing.
929Helper::Session::popRequest(const int request_number)
930{
931 Xaction *r = nullptr;
932 if (parent->childs.concurrency) {
933 // If concurrency supported retrieve request from ID
934 const auto it = requestsIndex.find(request_number);
935 if (it != requestsIndex.end()) {
// the index maps an ID to the request's position within `requests`
936 r = *(it->second);
937 requests.erase(it->second);
938 requestsIndex.erase(it);
939 }
940 } else if(!requests.empty()) {
941 // Else get the first request from queue, if any
942 r = requests.front();
943 requests.pop_front();
944 }
945
946 return r;
947}
948
/// Feeds a chunk of helper output into the reply of the session's current
/// transaction (srv->replyXaction) and, once the end of the message has been
/// seen, finalizes the reply, delivers or retries it, and updates statistics.
/// \param srv     session that produced the output
/// \param hlp     helper owning the session
/// \param msg     start of the chunk
/// \param msgSize number of bytes in the chunk
/// \param msgEnd  non-nil when the chunk ends the helper message
// NOTE(review): embedded lines 982, 987, 989, 1001 and 1006 are missing from
// this listing, so several statements below are incomplete as shown.
950static void
951helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
952{
953 if (Helper::Xaction *r = srv->replyXaction) {
954 const bool hasSpace = r->reply.accumulate(msg, msgSize);
955 if (!hasSpace) {
956 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
957 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
958 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
959 srv->closePipesSafely(hlp->id_name);
960 return;
961 }
962
963 if (!msgEnd)
964 return; // We are waiting for more data.
965
966 bool retry = false;
// the reply is delivered (or retried) only while the requestor's data is still valid
967 if (cbdataReferenceValid(r->request.data)) {
968 r->reply.finalize();
969 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
// NOTE(review): "of 2" is hard-coded although the limit is MAX_RETRIES
970 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
971 retry = true;
972 } else {
973 hlp->callBack(*r);
974 }
975 }
976
977 -- srv->stats.pending;
978 ++ srv->stats.replies;
979
980 ++ hlp->stats.replies;
981
983
984 srv->dispatch_time = r->request.dispatch_time;
985
// NOTE(review): parts of this averaging expression are missing from this listing
986 hlp->stats.avg_svc_time =
988 tvSubMsec(r->request.dispatch_time, current_time),
990
991 // release or re-submit parsedRequestXaction object
992 srv->replyXaction = nullptr;
993 if (retry) {
994 ++r->request.retries;
995 hlp->submitRequest(r);
996 } else
997 delete r;
998 }
999
// NOTE(review): the statement controlled by this `if` is missing from this listing
1000 if (hlp->timeout && hlp->childs.concurrency)
1002
1003 if (!srv->flags.shutdown) {
1004 helperKickQueue(hlp);
1005 } else if (!srv->flags.closing && !srv->stats.pending) {
// NOTE(review): the body of this branch is missing from this listing
1007 }
1008}
1009
/// Read handler for stateless helper sessions.
/// Accounts for the newly read bytes in srv->rbuf, splits the accumulated
/// data into helper messages at the hlp->eom character, matches each message
/// to its request (by channel number when concurrency is enabled), passes the
/// message bytes to helperReturnBuffer(), keeps any incomplete tail for the
/// next read, and finally schedules another read if the pipe is still usable.
/// \param conn connection the data arrived on (must be the session read pipe)
/// \param len  number of bytes just read
/// \param flag outcome of the read
/// \param data the Helper::Session this handler was registered for
// NOTE(review): embedded lines 1015 and 1123 are missing from this listing
// (the latter is the continuation of the final commCbCall()).
1010static void
1011helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1012{
1013 const auto srv = static_cast<Helper::Session *>(data);
1014 const auto hlp = srv->parent;
1016
1017 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1018
1019 if (flag == Comm::ERR_CLOSING) {
1020 return;
1021 }
1022
1023 assert(conn->fd == srv->readPipe->fd);
1024
1025 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
1026
// a read error or a zero-length read ends the session
1027 if (flag != Comm::OK || len == 0) {
1028 srv->closePipesSafely(hlp->id_name);
1029 return;
1030 }
1031
// keep rbuf NUL-terminated so that the string functions below can be used
1032 srv->roffset += len;
1033 srv->rbuf[srv->roffset] = '\0';
1034 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1035
1036 if (!srv->stats.pending && !srv->stats.timedout) {
1037 /* someone spoke without being spoken to */
1038 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1039 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1040 " bytes '" << srv->rbuf << "'");
1041
1042 srv->roffset = 0;
1043 srv->rbuf[0] = '\0';
1044 srv->closePipesSafely(hlp->id_name);
1045 return;
1046 }
1047
1048 bool needsMore = false;
1049 char *msg = srv->rbuf;
// one iteration per message (or partial message) found in the buffer
1050 while (*msg && !needsMore) {
// number of terminator bytes to step over after this message
1051 int skip = 0;
1052 char *eom = strchr(msg, hlp->eom);
1053 if (eom) {
1054 skip = 1;
1055 debugs(84, 3, "helperHandleRead: end of reply found");
1056 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1057 *eom = '\0';
1058 // rewind to the \r octet which is the real terminal now
1059 // and remember that we have to skip forward 2 places now.
1060 skip = 2;
1061 --eom;
1062 }
1063 *eom = '\0';
1064 }
1065
// start of a new reply: work out which request it answers
1066 if (!srv->ignoreToEom && !srv->replyXaction) {
1067 int i = 0;
1068 if (hlp->childs.concurrency) {
1069 char *e = nullptr;
1070 i = strtol(msg, &e, 10);
1071 // Do we need to check for e == msg? Means wrong response from helper.
1072 // Will be dropped as "unexpected reply on channel 0"
1073 needsMore = !(xisspace(*e) || (eom && e == eom));
1074 if (!needsMore) {
1075 msg = e;
1076 while (*msg && xisspace(*msg))
1077 ++msg;
1078 } // else not enough data to compute request number
1079 }
1080 if (!(srv->replyXaction = srv->popRequest(i))) {
1081 if (srv->stats.timedout) {
1082 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1083 } else {
1084 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
1085 i << " from " << hlp->id_name << " #" << srv->index <<
1086 " '" << srv->rbuf << "'");
1087 }
// no matching request: discard the rest of this message
1088 srv->ignoreToEom = true;
1089 }
1090 } // else we need to just append reply data to the current Xaction
1091
1092 if (!needsMore) {
// without a terminator, everything up to the end of the read data belongs to the message
1093 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1094 assert(msgSize <= srv->rbuf_sz);
1095 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1096 msg += msgSize + skip;
1097 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1098
1099 // The next message should not be ignored.
1100 if (eom && srv->ignoreToEom)
1101 srv->ignoreToEom = false;
1102 } else
1103 assert(skip == 0 && eom == nullptr);
1104 }
1105
1106 if (needsMore) {
// move the unparsed tail to the start of the buffer for the next read
1107 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1108 assert(msgSize <= srv->rbuf_sz);
1109 memmove(srv->rbuf, msg, msgSize);
1110 srv->roffset = msgSize;
1111 srv->rbuf[srv->roffset] = '\0';
1112 } else {
1113 // All of the responses parsed and msg points at the end of read data
1114 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1115 srv->roffset = 0;
1116 }
1117
1118 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
// reserve one byte for the NUL terminator
1119 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1120 assert(spaceSize >= 0);
1121
1122 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1124 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1125 }
1126}
1127
/// Read handler for stateful helper sessions.
/// Accounts for the newly read bytes in srv->rbuf, appends them to the reply
/// of the oldest outstanding request and, once the hlp->eom character has
/// been seen, finalizes and delivers that reply and updates statistics.
/// Finally schedules another read if the pipe is still usable.
/// \param conn connection the data arrived on (must be the session read pipe)
/// \param len  number of bytes just read
/// \param flag outcome of the read
/// \param data the session this handler was registered for
// NOTE(review): several embedded lines are missing from this listing (1132,
// 1134, 1188-1193, 1216, 1219, 1223, 1232), including the declaration of
// `srv`, so some statements below are incomplete as shown.
1128static void
1129helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1130{
1131 char *t = nullptr;
1133 const auto hlp = srv->parent;
1135
1136 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1137
1138 if (flag == Comm::ERR_CLOSING) {
1139 return;
1140 }
1141
1142 assert(conn->fd == srv->readPipe->fd);
1143
1144 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1145 hlp->id_name << " #" << srv->index);
1146
// a read error or a zero-length read ends the session
1147 if (flag != Comm::OK || len == 0) {
1148 srv->closePipesSafely(hlp->id_name);
1149 return;
1150 }
1151
// keep rbuf NUL-terminated so that strchr() below can be used
1152 srv->roffset += len;
1153 srv->rbuf[srv->roffset] = '\0';
1154 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1155
1156 if (srv->requests.empty()) {
1157 /* someone spoke without being spoken to */
1158 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1159 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1160 " bytes '" << srv->rbuf << "'");
1161
1162 srv->roffset = 0;
1163 srv->closePipesSafely(hlp->id_name);
1164 return;
1165 }
1166
// t stays nil while the end-of-message character has not arrived yet
1167 if ((t = strchr(srv->rbuf, hlp->eom))) {
1168 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1169
1170 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1171 *t = '\0';
1172 // rewind to the \r octet which is the real terminal now
1173 --t;
1174 }
1175
1176 *t = '\0';
1177 }
1178
1179 const auto r = srv->requests.front();
1180
// append either the complete message (up to t) or everything read so far
1181 if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1182 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1183 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1184 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1185 srv->closePipesSafely(hlp->id_name);
1186 return;
1187 }
// the buffered bytes now live in the reply; restart filling rbuf from its beginning
1194 srv->roffset = 0;
1195
1196 if (t) {
1197 /* end of reply found */
1198 srv->requests.pop_front(); // we already have it in 'r'
1199 int called = 1;
1200
1201 if (cbdataReferenceValid(r->request.data)) {
1202 r->reply.finalize();
1203 r->reply.reservationId = srv->reservationId;
1204 hlp->callBack(*r);
1205 } else {
1206 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1207 called = 0;
1208 }
1209
1210 delete r;
1211
1212 -- srv->stats.pending;
1213 ++ srv->stats.replies;
1214
1215 ++ hlp->stats.replies;
// NOTE(review): one argument line of this averaging call is missing from this listing
1217 hlp->stats.avg_svc_time =
1218 Math::intAverage(hlp->stats.avg_svc_time,
1220 hlp->stats.replies, REDIRECT_AV_FACTOR);
1221
// NOTE(review): the statement controlled by this `if` is missing from this listing
1222 if (called)
1224 else
1225 hlp->cancelReservation(srv->reservationId);
1226 }
1227
1228 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
// reserve one byte for the NUL terminator
1229 int spaceSize = srv->rbuf_sz - 1;
1230
1231 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1233 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1234 }
1235}
1236
// Queues request `r` on helper `hlp` and, when the queue has grown large,
// emits a rate-limited "all processes are busy" warning.
// NOTE(review): this listing omits the signature line (1239) as well as lines
// 1256 and 1259; the page index names the function at helper.cc:1239 as
// Enqueue(Helper::Client *, Helper::Xaction *) -- confirm against the real file.
1238static void
1240{
// Record the request and keep the queue-size statistic in step with the queue.
1241 hlp->queue.push(r);
1242 ++ hlp->stats.queue_size;
1243
1244 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
// If the child configuration reports that new helpers are needed, start them
// and skip the warning logic entirely.
1245 if (hlp->childs.needNew() > 0) {
1246 hlp->openSessions();
1247 return;
1248 }
1249
// No warning while the queue is still below the configured queue_size.
1250 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1251 return;
1252
// No warning if fewer than 600 seconds have passed since last_queue_warn.
1253 if (squid_curtime - hlp->last_queue_warn < 600)
1254 return;
1255
// NOTE(review): the condition guarding this return (line 1256) is missing here.
1257 return;
1258
// NOTE(review): line 1259 is missing here; presumably it refreshes
// last_queue_warn, since nothing visible updates it -- TODO confirm.
1260
1261 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1262 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1263 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1264}
1265
// Queues request `r` on helper `hlp`; the visible statements are the same as in
// the preceding enqueue routine (push, grow the child pool if needed, then a
// rate-limited overload warning).
// NOTE(review): this listing omits the signature line (1267) as well as lines
// 1284 and 1287; the page index names the function at helper.cc:1267 as
// StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r) -- confirm.
1266static void
1268{
// Record the request and keep the queue-size statistic in step with the queue.
1269 hlp->queue.push(r);
1270 ++ hlp->stats.queue_size;
1271
1272 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
// Start more helpers when the child configuration asks for them, then stop.
1273 if (hlp->childs.needNew() > 0) {
1274 hlp->openSessions();
1275 return;
1276 }
1277
// No warning while the queue is still below the configured queue_size.
1278 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1279 return;
1280
// No warning if fewer than 600 seconds have passed since last_queue_warn.
1281 if (squid_curtime - hlp->last_queue_warn < 600)
1282 return;
1283
// NOTE(review): the condition guarding this return (line 1284) is missing here.
1285 return;
1286
// NOTE(review): line 1287 is missing here -- check the real file before editing.
1288
1289 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1290 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1291 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1292}
1293
// Body of the request-dequeue routine.
// NOTE(review): the signature lines (1294-1295) are missing from this listing;
// the page index lists `Xaction * nextRequest()` at helper.cc:1295 -- confirm.
// Returns nullptr when `queue` is empty; otherwise removes the front entry,
// decrements stats.queue_size and returns the removed pointer.
1296{
1297 if (queue.empty())
1298 return nullptr;
1299
// Take the oldest queued entry and keep the size statistic in step.
1300 auto *r = queue.front();
1301 queue.pop();
1302 --stats.queue_size;
1303 return r;
1304}
1305
// Picks a helper session from hlp->servers that can take another request, or
// returns nullptr when there is none.
// NOTE(review): the signature line (1307) is missing from this listing; the
// page index names it GetFirstAvailable(const Helper::Client::Pointer &).
1306static Helper::Session *
1308{
1309 dlink_node *n;
1310 Helper::Session *selected = nullptr;
1311 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1312
// Nothing to choose from when no helper is running.
1313 if (hlp->childs.n_running == 0)
1314 return nullptr;
1315
1316 /* Find "least" loaded helper (approx) */
1317 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1318 const auto srv = static_cast<Helper::Session *>(n->data);
1319
// Skip sessions that are not strictly less loaded than the current pick.
1320 if (selected && selected->stats.pending <= srv->stats.pending)
1321 continue;
1322
// Sessions flagged for shutdown are never selected.
1323 if (srv->flags.shutdown)
1324 continue;
1325
// A session with nothing pending is returned immediately.
1326 if (!srv->stats.pending)
1327 return srv;
1328
// The scan stops at the first session that improves on an existing pick,
// which is why the result is only approximately the least loaded one.
1329 if (selected) {
1330 selected = srv;
1331 break;
1332 }
1333
1334 selected = srv;
1335 }
1336
1337 if (!selected) {
1338 debugs(84, 5, "GetFirstAvailable: None available.");
1339 return nullptr;
1340 }
1341
// Refuse the pick when it already holds `concurrency` pending requests; a
// concurrency setting of zero is treated as a limit of one.
1342 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1343 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1344 return nullptr;
1345 }
1346
1347 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1348 return selected;
1349}
1350
// Body of the stateful-helper selection routine: returns the first server with
// no pending requests that is neither reserved nor shutting down; failing
// that, a remembered reserved server; otherwise nullptr.
// NOTE(review): the signature lines (1351-1352) are missing from this listing;
// the page index names it StatefulGetFirstAvailable(const
// statefulhelper::Pointer &) at helper.cc:1352 -- confirm.
1353{
1354 dlink_node *n;
1355 helper_stateful_server *srv = nullptr;
1356 helper_stateful_server *oldestReservedServer = nullptr;
1357 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1358
// Nothing to choose from when no helper is running.
1359 if (hlp->childs.n_running == 0)
1360 return nullptr;
1361
1362 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1363 srv = (helper_stateful_server *)n->data;
1364
// Servers with a request in flight are never candidates.
1365 if (srv->stats.pending)
1366 continue;
1367
1368 if (srv->reserved()) {
// NOTE(review): line 1369 is missing here; the closing brace at 1375 and the
// "expired reservation" message below suggest it opened a reservation-age
// check -- TODO confirm against the real file.
1370 if (!oldestReservedServer)
1371 oldestReservedServer = srv;
// NOTE(review): this replaces the remembered server when `srv` has the LATER
// reservationStart, which looks at odds with the variable name -- verify.
1372 else if (oldestReservedServer->reservationStart < srv->reservationStart)
1373 oldestReservedServer = srv;
1374 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1375 }
1376 continue;
1377 }
1378
// Servers flagged for shutdown are skipped.
1379 if (srv->flags.shutdown)
1380 continue;
1381
1382 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1383 return srv;
1384 }
1385
// No free server was found: fall back to the remembered reserved one, if any.
1386 if (oldestReservedServer) {
1387 debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1388 return oldestReservedServer;
1389 }
1390
1391 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1392 return nullptr;
1393}
1394
// Write-completion callback for a helper session: releases the buffer that was
// just written and, if more data was queued meanwhile, starts the next write.
// `data` is the Helper::Session; `flag` is the result of the finished write.
1395static void
1396helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1397{
1398 const auto srv = static_cast<Helper::Session *>(data);
1399
// The in-flight buffer is finished with regardless of the outcome.
1400 srv->writebuf->clean();
1401 delete srv->writebuf;
1402 srv->writebuf = nullptr;
1403 srv->flags.writing = false;
1404
1405 if (flag != Comm::OK) {
1406 /* Helper server has crashed */
1407 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1408 return;
1409 }
1410
// Data accumulated in wqueue during the write: promote it to writebuf, give
// the session a fresh wqueue and write again.
1411 if (!srv->wqueue->isNull()) {
1412 srv->writebuf = srv->wqueue;
1413 srv->wqueue = new MemBuf;
1414 srv->flags.writing = true;
// NOTE(review): the second argument line of this commCbCall() (1416) is
// missing from this listing.
1415 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1417 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1418 }
1419}
1420
// Hands request `r` to session `srv`: assigns it a request ID, records it in
// the session's request list, appends its text to the write queue and starts
// a write if none is in progress.
// NOTE(review): this listing omits the signature line (1422) and lines 1427,
// 1435 and 1453; the page index names the function
// helperDispatch(Helper::Session *, Helper::Xaction *) -- confirm.
1421static void
1423{
1424 const auto hlp = srv->parent;
// The ID is taken (and nextRequestId advanced) before any validity check.
1425 const uint64_t reqId = ++srv->nextRequestId;
1426
// NOTE(review): the `if (...) {` opening this error branch (line 1427) is
// missing; the branch logs, deletes the request and returns.
1428 debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
1429 delete r;
1430 return;
1431 }
1432
1433 r->request.Id = reqId;
1434 const auto it = srv->requests.insert(srv->requests.end(), r);
1436
// Initialise the write queue on first use.
1437 if (srv->wqueue->isNull())
1438 srv->wqueue->init();
1439
// With concurrency enabled the request is indexed by ID and the ID is written
// in front of the request text; otherwise the text is written as-is.
1440 if (hlp->childs.concurrency) {
1441 srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
1442 assert(srv->requestsIndex.size() == srv->requests.size());
1443 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1444 } else
1445 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1446
// Only start a write when none is running; otherwise the data stays in wqueue
// until a later write picks it up.
1447 if (!srv->flags.writing) {
1448 assert(nullptr == srv->writebuf);
1449 srv->writebuf = srv->wqueue;
1450 srv->wqueue = new MemBuf;
1451 srv->flags.writing = true;
// NOTE(review): the second argument line of this commCbCall() (1453) is
// missing from this listing.
1452 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1454 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1455 }
1456
1457 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1458
// Per-session and per-helper usage counters.
1459 ++ srv->stats.uses;
1460 ++ srv->stats.pending;
1461 ++ hlp->stats.requests;
1462}
1463
// Callback with an empty body: it performs no work.
// NOTE(review): the signature line (1465) is missing from this listing; the
// page index names it helperStatefulDispatchWriteDone(...) -- confirm.
1464static void
1466{}
1467
// Sends request `r` to the stateful helper server `srv`, or, for a placeholder
// request, only invokes the callback without writing anything to the helper.
// NOTE(review): this listing omits the signature line (1469) and lines 1473,
// 1483, 1489, 1497, 1503 and 1505; the page index names the function
// helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r).
1468static void
1470{
1471 const auto hlp = srv->parent;
1472
// NOTE(review): the `if (...) {` opening this error branch (line 1473) is
// missing; the branch logs, deletes the request, cancels the server's
// reservation and returns.
1474 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
1475 delete r;
1476 hlp->cancelReservation(srv->reservationId);
1477 return;
1478 }
1479
1480 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1481
// A server must already hold a reservation when a request is dispatched to it.
1482 assert(srv->reservationId);
1484
1485 if (r->request.placeholder == 1) {
1486 /* a callback is needed before this request can _use_ a helper. */
1487 /* we don't care about releasing this helper. The request NEVER
1488 * gets to the helper. So we throw away the return code */
1490 hlp->callBack(*r);
1491 /* throw away the placeholder */
1492 delete r;
1493 /* and push the queue. Note that the callback may have submitted a new
1494 * request to the helper which is why we test for the request */
1495
// NOTE(review): the statement controlled by this `if` (line 1497) is missing.
1496 if (!srv->requests.size())
1498
1499 return;
1500 }
1501
1502 srv->requests.push_back(r);
// NOTE(review): line 1503 and the second argument line of commCbCall() (1505)
// are missing from this listing.
1504 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
// The request text is written directly from r->request.buf.
1506 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
1507 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1508 hlp->id_name << " #" << srv->index << ", " <<
1509 (int) strlen(r->request.buf) << " bytes");
1510
// Per-server and per-helper usage counters.
1511 ++ srv->stats.uses;
1512 ++ srv->stats.pending;
1513 ++ hlp->stats.requests;
1514}
1515
// Drains the helper's request queue for as long as a session is available:
// each iteration pairs an available session with the next queued request.
// NOTE(review): the signature line (1517) is missing from this listing; the
// page index names it helperKickQueue(const Helper::Client::Pointer &).
1516static void
1518{
1519 Helper::Xaction *r = nullptr;
1520 Helper::Session *srv = nullptr;
1521
// The session is looked up first, so a request is only dequeued once there is
// somewhere to send it.
1522 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1523 helperDispatch(srv, r);
1524}
1525
// Drains the stateful helper's request queue for as long as a server is
// available: each dequeued request gets a freshly reserved server.
// NOTE(review): the signature line (1527) and line 1530 (presumably the
// declaration of `srv`) are missing from this listing; the page index names
// the function helperStatefulKickQueue(const statefulhelper::Pointer &).
1526static void
1528{
1529 Helper::Xaction *r;
// The server is looked up first, so a request is only dequeued once there is
// somewhere to send it.
1531 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1532 debugs(84, 5, "found srv-" << srv->index);
// Reserve before dispatching.
1533 hlp->reserveServer(srv);
1534 helperStatefulDispatch(srv, r);
1535 }
1536}
1537
// Follow-up step after a stateful server has finished a request; which action
// is taken depends on the server's shutdown/closing/reserved/pending state.
// NOTE(review): the signature line (1539) and the statements inside both
// branches (lines 1542 and 1544) are missing from this listing, so the actual
// actions cannot be read from here; the page index names the function
// helperStatefulServerDone(helper_stateful_server *srv).
1538static void
1540{
// Branch 1: the server is not flagged for shutdown.
1541 if (!srv->flags.shutdown) {
// Branch 2: the server is flagged for shutdown and is not closing, not
// reserved, and has nothing pending.
1543 } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1545 return;
1546 }
1547}
1548
// Removes timed-out requests from the front of `requests`, and either
// resubmits each one or answers it with a timeout result.
// `retry`: when true, a timed-out request is resubmitted as long as it has
// been retried fewer than MAX_RETRIES times and its callback data is valid.
// NOTE(review): the signature line (1550) is missing from this listing; the
// page index lists `void checkForTimedOutRequests(bool const retry)`.
1549void
1551{
// This routine requires a helper configured with concurrency.
1552 assert(parent->childs.concurrency);
// Only the front request is tested; the loop stops at the first one that has
// not timed out.
1553 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1554 const auto r = requests.front();
// Drop the request from both the ID index and the ordered list.
1555 RequestIndex::iterator it;
1556 it = requestsIndex.find(r->request.Id);
1557 assert(it != requestsIndex.end());
1558 requestsIndex.erase(it);
1559 requests.pop_front();
1560 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1561 bool retried = false;
1562 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1563 debugs(84, 2, "Retry request " << r->request.Id);
1564 ++r->request.retries;
1565 parent->submitRequest(r);
1566 retried = true;
1567 } else if (cbdataReferenceValid(r->request.data)) {
// Not retried but the requester is still valid: answer with the configured
// onTimedOutResponse when there is one, otherwise with a TimedOut result.
1568 if (!parent->onTimedOutResponse.isEmpty()) {
// If the configured response cannot be accumulated, fall back to TimedOut.
1569 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1570 r->reply.finalize();
1571 else
1572 r->reply.result = Helper::TimedOut;
1573 parent->callBack(*r);
1574 } else {
1575 r->reply.result = Helper::TimedOut;
1576 parent->callBack(*r);
1577 }
1578 }
// Counters are updated for every timed-out request, retried or not.
1579 --stats.pending;
1580 ++stats.timedout;
1581 ++parent->stats.timedout;
// A retried request was handed to submitRequest() and is not deleted here.
1582 if (!retried)
1583 delete r;
1584 }
1585}
1586
// Timeout handler for a helper session connection: expires timed-out requests
// and then arms a new timeout on the same connection.
// `io.data` is the Session; `io.conn` is the connection the timeout fired on.
// NOTE(review): the signature line (1588) and line 1597 are missing from this
// listing; the page index lists
// `static void requestTimeout(const CommTimeoutCbParams &io)`.
1587void
1589{
// NOTE(review): this message uses debug section 26 while the rest of the
// block uses 84 -- confirm whether that is intended.
1590 debugs(26, 3, io.conn);
1591 const auto srv = static_cast<Session *>(io.data);
1592
1593 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1594
1595 debugs(84, 3, io.conn << " establish a new timeout");
// NOTE(review): the second argument line of this commCbCall() (1597) is
// missing from this listing.
1596 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
1598
// Time already spent by the oldest outstanding request (zero when none), in
// whole seconds.
1599 const time_t timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1600 const time_t minimumNewTimeout = 1; // second
// The new timeout is the remaining allowance, but never less than one second.
1601 const auto timeLeft = max(minimumNewTimeout, srv->parent->timeout - timeSpent);
1602
1603 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1604}
1605
#define Assure(condition)
Definition: Assure.h:35
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
Definition: CommCalls.h:312
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
Definition: CommCalls.h:34
static void * hIpc
Definition: IcmpSquid.cc:33
static pid_t pid
Definition: IcmpSquid.cc:34
time_t squid_curtime
Definition: stub_libtime.cc:20
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
Definition: Read.h:59
int conn
the current server connection FD
Definition: Transport.cc:26
#define assert(EX)
Definition: assert.h:17
char progname[]
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:265
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:320
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:239
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
Definition: cbdata.h:328
Comm::ConnectionPointer conn
Definition: CommCalls.h:80
static std::ostream & Extra(std::ostream &)
Definition: debug.cc:1313
unsigned int n_max
Definition: ChildConfig.h:48
unsigned int n_running
Definition: ChildConfig.h:80
unsigned int n_active
Definition: ChildConfig.h:86
unsigned int queue_size
Definition: ChildConfig.h:91
time_t reservationTimeout
older stateful helper server reservations may be forgotten
Definition: ChildConfig.h:109
unsigned int concurrency
Definition: ChildConfig.h:72
@ actDie
kill the caller process (i.e., Squid worker)
Definition: ChildConfig.h:95
int needNew() const
Definition: ChildConfig.cc:59
time_t timeout
Requests timeout.
Definition: helper.h:125
bool queueFull() const
whether queuing an additional request would overload the helper
Definition: helper.cc:488
dlink_list servers
Definition: helper.h:115
void sessionClosed(SessionBase &)
handles exited helper process
Definition: helper.cc:901
const char * id_name
Definition: helper.h:117
Xaction * nextRequest()
Definition: helper.cc:1295
void handleKilledServer(SessionBase *, bool &needsNewServers)
Definition: helper.cc:863
bool willOverload() const
Definition: helper.cc:754
virtual ~Client()
Definition: helper.cc:853
void packStatsInto(Packable *p, const char *label=nullptr) const
Dump some stats about the helper state to a Packable object.
Definition: helper.cc:696
void handleFewerServers(bool madeProgress)
Definition: helper.cc:881
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper requests; does not enforce queue limits
Definition: helper.cc:564
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:553
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
Definition: helper.cc:499
std::queue< Xaction * > queue
Definition: helper.h:116
bool prepSubmit()
Definition: helper.cc:526
time_t last_queue_warn
Definition: helper.h:123
void submitRequest(Xaction *)
Definition: helper.cc:455
ChildConfig childs
Configuration settings for number running.
Definition: helper.h:118
bool overloaded() const
Definition: helper.cc:493
void callBack(Xaction &)
sends transaction response to the transaction initiator
Definition: helper.cc:572
bool retryTimedOut
Whether the timed-out requests must be retried.
Definition: helper.h:126
struct Helper::Client::_stats stats
static Pointer Make(const char *name)
Definition: helper.cc:759
virtual void openSessions()
Definition: helper.cc:192
Helper::ResultCode result
The helper response 'result' field.
Definition: Reply.h:59
Helper::ReservationId reservationId
The stateful replies should include the reservation ID.
Definition: Reply.h:65
HLPCB * callback
Definition: Request.h:42
char * buf
Definition: Request.h:41
void * data
Definition: Request.h:43
uint64_t Id
Definition: Request.h:47
struct timeval dispatch_time
Definition: Request.h:46
int placeholder
Definition: Request.h:45
a (temporary) lock on a (stateful) helper channel
Definition: ReservationId.h:18
static ReservationId Next()
represents a single helper process
Definition: helper.h:192
uint64_t replies
Definition: helper.h:249
struct Helper::SessionBase::_helper_flags flags
uint64_t pending
Definition: helper.h:250
dlink_node link
Definition: helper.h:236
Requests requests
requests in order of submission/expiration
Definition: helper.h:245
Ip::Address addr
Definition: helper.h:224
const InstanceId< SessionBase > index
Definition: helper.h:221
void closePipesSafely(const char *name)
Definition: helper.cc:72
struct timeval dispatch_time
Definition: helper.h:233
virtual void dropQueued(Client &)
dequeues and sends an Unknown answer to all queued requests
Definition: helper.cc:126
struct timeval answer_time
Definition: helper.h:234
uint64_t uses
Definition: helper.h:248
void initStats()
Definition: helper.cc:62
struct Helper::SessionBase::@68 stats
~SessionBase() override
Definition: helper.cc:138
Comm::ConnectionPointer readPipe
Definition: helper.h:225
void closeWritePipeSafely(const char *name)
Definition: helper.cc:100
Comm::ConnectionPointer writePipe
Definition: helper.h:226
MemBuf * wqueue
Definition: helper.h:266
static void HelperServerClosed(Session *)
close handler to handle exited server processes
Definition: helper.cc:912
void dropQueued(Client &) override
dequeues and sends an Unknown answer to all queued requests
Definition: helper.cc:169
uint64_t nextRequestId
Definition: helper.h:264
void checkForTimedOutRequests(bool const retry)
Definition: helper.cc:1550
MemBuf * writebuf
Definition: helper.h:267
Client::Pointer parent
Definition: helper.h:269
Xaction * replyXaction
Definition: helper.h:275
~Session() override
Definition: helper.cc:146
Xaction * popRequest(int requestId)
Definition: helper.cc:929
RequestIndex requestsIndex
maps request IDs to requests
Definition: helper.h:282
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Definition: helper.cc:1588
Holds the required data to serve a helper request.
Definition: helper.h:41
Helper::Reply reply
Definition: helper.h:46
Helper::Request request
Definition: helper.h:45
Definition: MemBuf.h:24
void clean()
Definition: MemBuf.cc:110
void append(const char *c, int sz) override
Definition: MemBuf.cc:209
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:93
char * content()
start of the added data
Definition: MemBuf.h:41
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
int isNull() const
Definition: MemBuf.cc:145
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
Definition: Raw.h:21
Definition: cbdata.cc:38
bool reserved() override
whether the server is locked for exclusive use by a client
Definition: helper.h:322
~helper_stateful_server() override
Definition: helper.cc:175
statefulhelper::Pointer parent
Definition: helper.h:327
time_t reservationStart
when the last reservation was made
Definition: helper.h:334
static void HelperServerClosed(helper_stateful_server *srv)
close handler to handle exited server processes
Definition: helper.cc:921
Helper::ReservationId reservationId
"confirmation ID" of the last
Definition: helper.h:333
void submit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:664
helper_stateful_server * findServer(const Helper::ReservationId &reservation)
Definition: helper.cc:633
static Pointer Make(const char *name)
Definition: helper.cc:765
void reserveServer(helper_stateful_server *srv)
reserve the given server
Definition: helper.cc:604
void cancelReservation(const Helper::ReservationId reservation)
undo reserveServer(), clear the reservation and kick the queue
Definition: helper.cc:617
void openSessions() override
Definition: helper.cc:329
bool trySubmit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
reserved servers indexed by reservation IDs
Definition: helper.cc:594
char * key
Definition: wordlist.h:32
wordlist * next
Definition: wordlist.h:33
int commSetConnTimeout(const Comm::ConnectionPointer &conn, time_t timeout, AsyncCall::Pointer &callback)
Definition: comm.cc:595
int commSetNonBlocking(int fd)
Definition: comm.cc:1066
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:949
A const & max(A const &lhs, A const &rhs)
#define Important(id)
Definition: Messages.h:93
#define DBG_DATA
Definition: Stream.h:40
#define DBG_IMPORTANT
Definition: Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
#define DBG_CRITICAL
Definition: Stream.h:37
#define REDIRECT_AV_FACTOR
Definition: defines.h:53
#define FD_DESC_SZ
Definition: defines.h:32
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
void fd_note(int fd, const char *s)
Definition: fd.cc:216
#define fd_table
Definition: fde.h:189
int shutting_down
int reconfiguring
void HLPCB(void *, const Helper::Reply &)
Definition: forward.h:33
void helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:586
const size_t ReadBufSize(32 *1024)
Helpers input buffer size.
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
Definition: helper.cc:1465
static void helperReturnBuffer(Helper::Session *srv, const Helper::Client::Pointer &hlp, char *const msg, const size_t msgSize, const char *const msgEnd)
Calls back with a pointer to the buffer with the helper output.
Definition: helper.cc:951
static void helperKickQueue(const Helper::Client::Pointer &)
Definition: helper.cc:1517
InstanceIdDefinitions(Helper::SessionBase, "Hlpr")
static void helperStatefulServerDone(helper_stateful_server *srv)
Definition: helper.cc:1539
static void helperDispatch(Helper::Session *, Helper::Xaction *)
Definition: helper.cc:1422
static Helper::Session * GetFirstAvailable(const Helper::Client::Pointer &)
Definition: helper.cc:1307
static helper_stateful_server * StatefulGetFirstAvailable(const statefulhelper::Pointer &)
Definition: helper.cc:1352
#define HELPER_MAX_ARGS
Definition: helper.cc:36
static void helperStatefulKickQueue(const statefulhelper::Pointer &)
Definition: helper.cc:1527
static void Enqueue(Helper::Client *, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
Definition: helper.cc:1239
static void SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
Definition: helper.cc:467
void helperShutdown(const Helper::Client::Pointer &hlp)
Definition: helper.cc:771
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
Definition: helper.cc:1267
void helperStatefulShutdown(const statefulhelper::Pointer &hlp)
Definition: helper.cc:807
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
Definition: helper.cc:1469
static IOCB helperStatefulHandleRead
Definition: helper.cc:45
void helperSubmit(const Helper::Client::Pointer &hlp, const char *const buf, HLPCB *const callback, void *const data)
Definition: helper.cc:480
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
Definition: helper.cc:1396
#define MAX_RETRIES
The maximum allowed request retries.
Definition: helper.cc:39
static IOCB helperHandleRead
Definition: helper.cc:44
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:65
void memFreeBuf(size_t size, void *)
Definition: minimal.cc:90
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: minimal.cc:46
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
Flag
Definition: Flag.h:15
@ OK
Definition: Flag.h:16
@ ERR_CLOSING
Definition: Flag.h:24
char * QuoteMimeBlob(const char *header)
Definition: Quoting.cc:43
helper protocol primitives
Definition: helper.h:39
@ Unknown
Definition: ResultCode.h:17
@ BrokenHelper
Definition: ResultCode.h:20
@ Error
Definition: ResultCode.h:19
@ TimedOut
Definition: ResultCode.h:21
int intAverage(const int, const int, int, const int)
Definition: SquidMath.cc:40
class Ping::pingStats_ stats
SSL_SESSION Session
Definition: Session.h:47
SSL Connection
Definition: Session.h:45
#define xstrdup
#define xmalloc
int unsigned int
Definition: stub_fd.cc:19
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
Definition: gadgets.cc:17
int tvSubMsec(struct timeval t1, struct timeval t2)
Definition: gadgets.cc:51
#define PRIu64
Definition: types.h:114
#define safe_free(x)
Definition: xalloc.h:73
#define xisspace(x)
Definition: xis.h:15

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors