helper.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 84 Helper process maintenance */
10
11#include "squid.h"
13#include "base/Packable.h"
14#include "base/Raw.h"
15#include "comm.h"
16#include "comm/Connection.h"
17#include "comm/Read.h"
18#include "comm/Write.h"
19#include "debug/Messages.h"
20#include "fd.h"
21#include "fde.h"
22#include "format/Quoting.h"
23#include "helper.h"
24#include "helper/Reply.h"
25#include "helper/Request.h"
26#include "MemBuf.h"
27#include "SquidConfig.h"
28#include "SquidIpc.h"
29#include "SquidMath.h"
30#include "Store.h"
31#include "wordlist.h"
32
33// helper_stateful_server::data uses explicit alloc()/freeOne()
34#include "mem/Pool.h"
35
36#define HELPER_MAX_ARGS 64
37
39#define MAX_RETRIES 2
40
42const size_t ReadBufSize(32*1024);
43
46static void Enqueue(helper * hlp, Helper::Xaction *);
47static helper_server *GetFirstAvailable(const helper * hlp);
49static void helperDispatch(helper_server * srv, Helper::Xaction * r);
51static void helperKickQueue(helper * hlp);
54static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
55
60
62
63void
65{
66 stats.uses=0;
67 stats.replies=0;
68 stats.pending=0;
69 stats.releases=0;
70 stats.timedout = 0;
71}
72
73void
75{
76#if _SQUID_WINDOWS_
77 shutdown(writePipe->fd, SD_BOTH);
78#endif
79
80 flags.closing = true;
81 if (readPipe->fd == writePipe->fd)
82 readPipe->fd = -1;
83 else
84 readPipe->close();
86
87#if _SQUID_WINDOWS_
88 if (hIpc) {
89 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
91 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
92 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
93 }
94 CloseHandle(hIpc);
95 }
96#else
97 (void)id_name;
98#endif
99}
100
101void
103{
104#if _SQUID_WINDOWS_
105 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
106#endif
107
108 flags.closing = true;
109 if (readPipe->fd == writePipe->fd)
110 readPipe->fd = -1;
111 writePipe->close();
112
113#if _SQUID_WINDOWS_
114 if (hIpc) {
115 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
117 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
118 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
119 }
120 CloseHandle(hIpc);
121 }
122#else
123 (void)id_name;
124#endif
125}
126
127void
129{
130 while (!requests.empty()) {
131 // XXX: re-schedule these on another helper?
132 Helper::Xaction *r = requests.front();
133 requests.pop_front();
134 void *cbdata;
138 }
139
140 delete r;
141 }
142}
143
145{
146 if (rbuf) {
148 rbuf = nullptr;
149 }
150}
151
153{
154 wqueue->clean();
155 delete wqueue;
156
157 if (writebuf) {
158 writebuf->clean();
159 delete writebuf;
160 writebuf = nullptr;
161 }
162
165
167
170
171 assert(requests.empty());
173}
174
175void
177{
179 requestsIndex.clear();
180}
181
183{
184 /* TODO: walk the local queue of requests and carry them all out */
187
189
191
194
195 assert(requests.empty());
196
198}
199
200void
202{
203 char *s;
204 char *progname;
205 char *shortname;
206 char *procname;
207 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
208 char fd_note_buf[FD_DESC_SZ];
209 helper_server *srv;
210 int nargs = 0;
211 int k;
212 pid_t pid;
213 int rfd;
214 int wfd;
215 void * hIpc;
216 wordlist *w;
217
218 if (hlp->cmdline == nullptr)
219 return;
220
221 progname = hlp->cmdline->key;
222
223 if ((s = strrchr(progname, '/')))
224 shortname = xstrdup(s + 1);
225 else
226 shortname = xstrdup(progname);
227
228 /* figure out how many new children are actually needed. */
229 int need_new = hlp->childs.needNew();
230
231 debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
232
233 if (need_new < 1) {
234 debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
235 }
236
237 procname = (char *)xmalloc(strlen(shortname) + 3);
238
239 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
240
241 args[nargs] = procname;
242 ++nargs;
243
244 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
245 args[nargs] = w->key;
246 ++nargs;
247 }
248
249 args[nargs] = nullptr;
250 ++nargs;
251
252 assert(nargs <= HELPER_MAX_ARGS);
253
254 for (k = 0; k < need_new; ++k) {
256 rfd = wfd = -1;
257 pid = ipcCreate(hlp->ipc_type,
258 progname,
259 args,
260 shortname,
261 hlp->addr,
262 &rfd,
263 &wfd,
264 &hIpc);
265
266 if (pid < 0) {
267 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
268 continue;
269 }
270
271 ++ hlp->childs.n_running;
272 ++ hlp->childs.n_active;
273 srv = new helper_server;
274 srv->hIpc = hIpc;
275 srv->pid = pid;
276 srv->initStats();
277 srv->addr = hlp->addr;
278 srv->readPipe = new Comm::Connection;
279 srv->readPipe->fd = rfd;
280 srv->writePipe = new Comm::Connection;
281 srv->writePipe->fd = wfd;
282 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
283 srv->wqueue = new MemBuf;
284 srv->roffset = 0;
285 srv->nextRequestId = 0;
286 srv->replyXaction = nullptr;
287 srv->ignoreToEom = false;
288 srv->parent = cbdataReference(hlp);
289 dlinkAddTail(srv, &srv->link, &hlp->servers);
290
291 if (rfd == wfd) {
292 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
293 fd_note(rfd, fd_note_buf);
294 } else {
295 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
296 fd_note(rfd, fd_note_buf);
297 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
298 fd_note(wfd, fd_note_buf);
299 }
300
302
303 if (wfd != rfd)
305
306 AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
307 comm_add_close_handler(rfd, closeCall);
308
309 if (hlp->timeout && hlp->childs.concurrency) {
310 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
312 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
313 }
314
315 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
317 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
318 }
319
321 safe_free(shortname);
322 safe_free(procname);
323 helperKickQueue(hlp);
324}
325
331void
333{
334 char *shortname;
335 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
336 char fd_note_buf[FD_DESC_SZ];
337 int nargs = 0;
338
339 if (hlp->cmdline == nullptr)
340 return;
341
342 if (hlp->childs.concurrency)
343 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
344
345 char *progname = hlp->cmdline->key;
346
347 char *s;
348 if ((s = strrchr(progname, '/')))
349 shortname = xstrdup(s + 1);
350 else
351 shortname = xstrdup(progname);
352
353 /* figure out how many new helpers are needed. */
354 int need_new = hlp->childs.needNew();
355
356 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
357
358 if (need_new < 1) {
359 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
360 }
361
362 char *procname = (char *)xmalloc(strlen(shortname) + 3);
363
364 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
365
366 args[nargs] = procname;
367 ++nargs;
368
369 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
370 args[nargs] = w->key;
371 ++nargs;
372 }
373
374 args[nargs] = nullptr;
375 ++nargs;
376
377 assert(nargs <= HELPER_MAX_ARGS);
378
379 for (int k = 0; k < need_new; ++k) {
381 int rfd = -1;
382 int wfd = -1;
383 void * hIpc;
384 pid_t pid = ipcCreate(hlp->ipc_type,
385 progname,
386 args,
387 shortname,
388 hlp->addr,
389 &rfd,
390 &wfd,
391 &hIpc);
392
393 if (pid < 0) {
394 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
395 continue;
396 }
397
398 ++ hlp->childs.n_running;
399 ++ hlp->childs.n_active;
401 srv->hIpc = hIpc;
402 srv->pid = pid;
403 srv->initStats();
404 srv->addr = hlp->addr;
405 srv->readPipe = new Comm::Connection;
406 srv->readPipe->fd = rfd;
407 srv->writePipe = new Comm::Connection;
408 srv->writePipe->fd = wfd;
409 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
410 srv->roffset = 0;
411 srv->parent = cbdataReference(hlp);
412 srv->reservationStart = 0;
413
414 dlinkAddTail(srv, &srv->link, &hlp->servers);
415
416 if (rfd == wfd) {
417 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
418 fd_note(rfd, fd_note_buf);
419 } else {
420 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
421 fd_note(rfd, fd_note_buf);
422 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
423 fd_note(wfd, fd_note_buf);
424 }
425
427
428 if (wfd != rfd)
430
431 AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
432 comm_add_close_handler(rfd, closeCall);
433
434 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
436 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
437 }
438
440 safe_free(shortname);
441 safe_free(procname);
443}
444
445void
447{
448 helper_server *srv;
449
450 if ((srv = GetFirstAvailable(this)))
451 helperDispatch(srv, r);
452 else
453 Enqueue(this, r);
454
456}
457
459static void
460SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
461{
462 auto result = Helper::Error;
463 if (!hlp) {
464 debugs(84, 3, "no helper");
465 result = Helper::Unknown;
466 }
467 // else pretend the helper has responded with ERR
468
469 callback(data, Helper::Reply(result));
470}
471
472void
473helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
474{
475 if (!hlp || !hlp->trySubmit(buf, callback, data))
476 SubmissionFailure(hlp, callback, data);
477}
478
480bool
482 return stats.queue_size >= static_cast<int>(childs.queue_size);
483}
484
485bool
487 return stats.queue_size > static_cast<int>(childs.queue_size);
488}
489
491void
493{
494 if (overloaded()) {
495 if (overloadStart) {
496 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
497 } else {
499 debugs(84, 3, id_name << " became overloaded");
500 }
501 } else {
502 if (overloadStart) {
503 debugs(84, 5, id_name << " is no longer overloaded");
504 if (droppedRequests) {
505 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
506 " is no longer overloaded after dropping " << droppedRequests <<
507 " requests in " << (squid_curtime - overloadStart) << " seconds");
508 droppedRequests = 0;
509 }
510 overloadStart = 0;
511 }
512 }
513}
514
518bool
520{
521 // re-sync for the configuration may have changed since the last submission
523
524 // Nothing special to do if the new request does not overload (i.e., the
525 // queue is not even full yet) or only _starts_ overloading this helper
526 // (i.e., the queue is currently at its limit).
527 if (!overloaded())
528 return true;
529
530 if (squid_curtime - overloadStart <= 180)
531 return true; // also OK: overload has not persisted long enough to panic
532
534 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
535
536 if (!droppedRequests) {
537 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
538 id_name << " helper configured with on-persistent-overload=err");
539 }
541 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
542 return false;
543}
544
545bool
546helper::trySubmit(const char *buf, HLPCB * callback, void *data)
547{
548 if (!prepSubmit())
549 return false; // request was dropped
550
551 submit(buf, callback, data); // will send or queue
552 return true; // request submitted or queued
553}
554
556void
557helper::submit(const char *buf, HLPCB * callback, void *data)
558{
559 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
560 submitRequest(r);
561 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
562}
563
566void
567helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
568{
569 if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
570 SubmissionFailure(hlp, callback, data);
571}
572
574bool
575statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
576{
577 if (!prepSubmit())
578 return false; // request was dropped
579
580 submit(buf, callback, data, reservation); // will send or queue
581 return true; // request submitted or queued
582}
583
584void
586{
587 // clear any old reservation
588 if (srv->reserved()) {
589 reservations.erase(srv->reservationId);
590 srv->clearReservation();
591 }
592
593 srv->reserve();
594 reservations.insert(Reservations::value_type(srv->reservationId, srv));
595}
596
597void
599{
600 const auto it = reservations.find(reservation);
601 if (it == reservations.end())
602 return;
603
604 helper_stateful_server *srv = it->second;
605 reservations.erase(it);
606 srv->clearReservation();
607
608 // schedule a queue kick
609 AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
610 ScheduleCallHere(call);
611}
612
615{
616 const auto it = reservations.find(reservation);
617 if (it == reservations.end())
618 return nullptr;
619 return it->second;
620}
621
622void
624{
628 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
629}
630
631void
633{
634 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
635 if (!reservationId)
636 return;
637
638 ++stats.releases;
639
642}
643
644void
645statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
646{
647 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
648
649 if (buf && reservation) {
650 debugs(84, 5, reservation);
651 helper_stateful_server *lastServer = findServer(reservation);
652 if (!lastServer) {
653 debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
655 r->request.callback(r->request.data, r->reply);
656 delete r;
657 return;
658 }
659 debugs(84, 5, "StatefulSubmit dispatching");
660 helperStatefulDispatch(lastServer, r);
661 } else {
663 if ((srv = StatefulGetFirstAvailable(this))) {
664 reserveServer(srv);
666 } else
667 StatefulEnqueue(this, r);
668 }
669
670 debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
671 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
672
674}
675
676void
677helper::packStatsInto(Packable *p, const char *label) const
678{
679 if (label)
680 p->appendf("%s:\n", label);
681
682 p->appendf(" program: %s\n", cmdline->key);
683 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
684 p->appendf(" requests sent: %d\n", stats.requests);
685 p->appendf(" replies received: %d\n", stats.replies);
686 p->appendf(" requests timedout: %d\n", stats.timedout);
687 p->appendf(" queue length: %d\n", stats.queue_size);
688 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
689 p->append("\n",1);
690 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
691 "ID #",
692 "FD",
693 "PID",
694 "# Requests",
695 "# Replies",
696 "# Timed-out",
697 "Flags",
698 "Time",
699 "Offset",
700 "Request");
701
702 for (dlink_node *link = servers.head; link; link = link->next) {
703 HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
704 assert(srv);
705 Helper::Xaction *xaction = srv->requests.empty() ? nullptr : srv->requests.front();
706 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
707 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
708 srv->index.value,
709 srv->readPipe->fd,
710 srv->pid,
711 srv->stats.uses,
712 srv->stats.replies,
713 srv->stats.timedout,
714 srv->stats.pending ? 'B' : ' ',
715 srv->flags.writing ? 'W' : ' ',
716 srv->flags.closing ? 'C' : ' ',
717 srv->reserved() ? 'R' : ' ',
718 srv->flags.shutdown ? 'S' : ' ',
719 xaction && xaction->request.placeholder ? 'P' : ' ',
720 tt < 0.0 ? 0.0 : tt,
721 (int) srv->roffset,
722 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
723 }
724
725 p->append("\nFlags key:\n"
726 " B\tBUSY\n"
727 " W\tWRITING\n"
728 " C\tCLOSING\n"
729 " R\tRESERVED\n"
730 " S\tSHUTDOWN PENDING\n"
731 " P\tPLACEHOLDER\n", 101);
732}
733
734bool
736 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
737}
738
739void
741{
742 dlink_node *link = hlp->servers.head;
743
744 while (link) {
745 helper_server *srv;
746 srv = (helper_server *)link->data;
747 link = link->next;
748
749 if (srv->flags.shutdown) {
750 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
751 continue;
752 }
753
754 assert(hlp->childs.n_active > 0);
755 -- hlp->childs.n_active;
756 srv->flags.shutdown = true; /* request it to shut itself down */
757
758 if (srv->flags.closing) {
759 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
760 continue;
761 }
762
763 if (srv->stats.pending) {
764 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
765 continue;
766 }
767
768 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
769 /* the rest of the details is dealt with in the helperServerFree
770 * close handler
771 */
772 srv->closePipesSafely(hlp->id_name);
773 }
774}
775
776void
778{
779 dlink_node *link = hlp->servers.head;
781
782 while (link) {
783 srv = (helper_stateful_server *)link->data;
784 link = link->next;
785
786 if (srv->flags.shutdown) {
787 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
788 continue;
789 }
790
791 assert(hlp->childs.n_active > 0);
792 -- hlp->childs.n_active;
793 srv->flags.shutdown = true; /* request it to shut itself down */
794
795 if (srv->stats.pending) {
796 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
797 continue;
798 }
799
800 if (srv->flags.closing) {
801 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
802 continue;
803 }
804
805 if (srv->reserved()) {
806 if (shutting_down) {
807 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
808 } else {
809 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
810 continue;
811 }
812 }
813
814 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
815
816 /* the rest of the details is dealt with in the helperStatefulServerFree
817 * close handler
818 */
819 srv->closePipesSafely(hlp->id_name);
820 }
821}
822
824{
825 /* note, don't free id_name, it probably points to static memory */
826
827 // TODO: if the queue is not empty it will leak Helper::Request's
828 if (!queue.empty())
829 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
830}
831
832void
834{
835 needsNewServers = false;
836 if (!srv->flags.shutdown) {
839 debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
840
841 if (childs.needNew() > 0) {
842 debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")");
843
845 if (srv->stats.replies < 1)
846 fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name);
847 else
848 debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
849 }
850 srv->flags.shutdown = true;
851 needsNewServers = true;
852 }
853 }
854}
855
856void
858{
859 helper *hlp = srv->getParent();
860
861 bool needsNewServers = false;
862 hlp->handleKilledServer(srv, needsNewServers);
863 if (needsNewServers) {
864 debugs(80, DBG_IMPORTANT, "Starting new helpers");
866 }
867
868 srv->dropQueued();
869
870 delete srv;
871}
872
873// XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
874// TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
875void
877{
878 statefulhelper *hlp = static_cast<statefulhelper *>(srv->getParent());
879
880 bool needsNewServers = false;
881 hlp->handleKilledServer(srv, needsNewServers);
882 if (needsNewServers) {
883 debugs(80, DBG_IMPORTANT, "Starting new helpers");
885 }
886
887 srv->dropQueued();
888
889 delete srv;
890}
891
893helper_server::popRequest(int request_number)
894{
895 Helper::Xaction *r = nullptr;
896 helper_server::RequestIndex::iterator it;
898 // If concurrency supported retrieve request from ID
899 it = requestsIndex.find(request_number);
900 if (it != requestsIndex.end()) {
901 r = *(it->second);
902 requests.erase(it->second);
903 requestsIndex.erase(it);
904 }
905 } else if(!requests.empty()) {
906 // Else get the first request from queue, if any
907 r = requests.front();
908 requests.pop_front();
909 }
910
911 return r;
912}
913
915static void
916helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
917{
918 if (Helper::Xaction *r = srv->replyXaction) {
919 const bool hasSpace = r->reply.accumulate(msg, msgSize);
920 if (!hasSpace) {
921 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
922 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
923 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
924 srv->closePipesSafely(hlp->id_name);
925 return;
926 }
927
928 if (!msgEnd)
929 return; // We are waiting for more data.
930
931 bool retry = false;
933 r->reply.finalize();
935 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
936 retry = true;
937 } else {
938 HLPCB *callback = r->request.callback;
939 r->request.callback = nullptr;
940 void *cbdata = nullptr;
942 callback(cbdata, r->reply);
943 }
944 }
945
946 -- srv->stats.pending;
947 ++ srv->stats.replies;
948
949 ++ hlp->stats.replies;
950
952
954
955 hlp->stats.avg_svc_time =
959
960 // release or re-submit parsedRequestXaction object
961 srv->replyXaction = nullptr;
962 if (retry) {
963 ++r->request.retries;
964 hlp->submitRequest(r);
965 } else
966 delete r;
967 }
968
969 if (hlp->timeout && hlp->childs.concurrency)
971
972 if (!srv->flags.shutdown) {
973 helperKickQueue(hlp);
974 } else if (!srv->flags.closing && !srv->stats.pending) {
976 }
977}
978
979static void
980helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
981{
982 helper_server *srv = (helper_server *)data;
983 helper *hlp = srv->parent;
985
986 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
987
988 if (flag == Comm::ERR_CLOSING) {
989 return;
990 }
991
992 assert(conn->fd == srv->readPipe->fd);
993
994 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
995
996 if (flag != Comm::OK || len == 0) {
997 srv->closePipesSafely(hlp->id_name);
998 return;
999 }
1000
1001 srv->roffset += len;
1002 srv->rbuf[srv->roffset] = '\0';
1003 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1004
1005 if (!srv->stats.pending && !srv->stats.timedout) {
1006 /* someone spoke without being spoken to */
1007 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected read from " <<
1008 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1009 " bytes '" << srv->rbuf << "'");
1010
1011 srv->roffset = 0;
1012 srv->rbuf[0] = '\0';
1013 }
1014
1015 bool needsMore = false;
1016 char *msg = srv->rbuf;
1017 while (*msg && !needsMore) {
1018 int skip = 0;
1019 char *eom = strchr(msg, hlp->eom);
1020 if (eom) {
1021 skip = 1;
1022 debugs(84, 3, "helperHandleRead: end of reply found");
1023 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1024 *eom = '\0';
1025 // rewind to the \r octet which is the real terminal now
1026 // and remember that we have to skip forward 2 places now.
1027 skip = 2;
1028 --eom;
1029 }
1030 *eom = '\0';
1031 }
1032
1033 if (!srv->ignoreToEom && !srv->replyXaction) {
1034 int i = 0;
1035 if (hlp->childs.concurrency) {
1036 char *e = nullptr;
1037 i = strtol(msg, &e, 10);
1038 // Do we need to check for e == msg? Means wrong response from helper.
1039 // Will be dropped as "unexpected reply on channel 0"
1040 needsMore = !(xisspace(*e) || (eom && e == eom));
1041 if (!needsMore) {
1042 msg = e;
1043 while (*msg && xisspace(*msg))
1044 ++msg;
1045 } // else not enough data to compute request number
1046 }
1047 if (!(srv->replyXaction = srv->popRequest(i))) {
1048 if (srv->stats.timedout) {
1049 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1050 } else {
1051 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
1052 i << " from " << hlp->id_name << " #" << srv->index <<
1053 " '" << srv->rbuf << "'");
1054 }
1055 srv->ignoreToEom = true;
1056 }
1057 } // else we need to just append reply data to the current Xaction
1058
1059 if (!needsMore) {
1060 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1061 assert(msgSize <= srv->rbuf_sz);
1062 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1063 msg += msgSize + skip;
1064 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1065
1066 // The next message should not be ignored.
1067 if (eom && srv->ignoreToEom)
1068 srv->ignoreToEom = false;
1069 } else
1070 assert(skip == 0 && eom == nullptr);
1071 }
1072
1073 if (needsMore) {
1074 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1075 assert(msgSize <= srv->rbuf_sz);
1076 memmove(srv->rbuf, msg, msgSize);
1077 srv->roffset = msgSize;
1078 srv->rbuf[srv->roffset] = '\0';
1079 } else {
1080 // All of the responses parsed and msg points at the end of read data
1081 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1082 srv->roffset = 0;
1083 }
1084
1085 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1086 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1087 assert(spaceSize >= 0);
1088
1089 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1091 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1092 }
1093}
1094
1095static void
1096helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1097{
1098 char *t = nullptr;
1100 statefulhelper *hlp = srv->parent;
1102
1103 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1104
1105 if (flag == Comm::ERR_CLOSING) {
1106 return;
1107 }
1108
1109 assert(conn->fd == srv->readPipe->fd);
1110
1111 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1112 hlp->id_name << " #" << srv->index);
1113
1114 if (flag != Comm::OK || len == 0) {
1115 srv->closePipesSafely(hlp->id_name);
1116 return;
1117 }
1118
1119 srv->roffset += len;
1120 srv->rbuf[srv->roffset] = '\0';
1121 Helper::Xaction *r = srv->requests.front();
1122 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1123
1124 if (r == nullptr) {
1125 /* someone spoke without being spoken to */
1126 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulHandleRead: unexpected read from " <<
1127 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1128 " bytes '" << srv->rbuf << "'");
1129
1130 srv->roffset = 0;
1131 }
1132
1133 if ((t = strchr(srv->rbuf, hlp->eom))) {
1134 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1135
1136 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1137 *t = '\0';
1138 // rewind to the \r octet which is the real terminal now
1139 --t;
1140 }
1141
1142 *t = '\0';
1143 }
1144
1145 if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1146 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1147 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1148 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1149 srv->closePipesSafely(hlp->id_name);
1150 return;
1151 }
1158 srv->roffset = 0;
1159
1160 if (t) {
1161 /* end of reply found */
1162 srv->requests.pop_front(); // we already have it in 'r'
1163 int called = 1;
1164
1165 if (r && cbdataReferenceValid(r->request.data)) {
1166 r->reply.finalize();
1168 r->request.callback(r->request.data, r->reply);
1169 } else {
1170 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1171 called = 0;
1172 }
1173
1174 delete r;
1175
1176 -- srv->stats.pending;
1177 ++ srv->stats.replies;
1178
1179 ++ hlp->stats.replies;
1181 hlp->stats.avg_svc_time =
1185
1186 if (called)
1188 else
1190 }
1191
1192 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1193 int spaceSize = srv->rbuf_sz - 1;
1194
1195 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1197 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1198 }
1199}
1200
1202static void
1204{
1205 hlp->queue.push(r);
1206 ++ hlp->stats.queue_size;
1207
1208 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1209 if (hlp->childs.needNew() > 0) {
1210 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1211 helperOpenServers(hlp);
1212 return;
1213 }
1214
1215 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1216 return;
1217
1218 if (squid_curtime - hlp->last_queue_warn < 600)
1219 return;
1220
1222 return;
1223
1225
1226 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1227 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1228 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1229}
1230
1231static void
1233{
1234 hlp->queue.push(r);
1235 ++ hlp->stats.queue_size;
1236
1237 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1238 if (hlp->childs.needNew() > 0) {
1239 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1241 return;
1242 }
1243
1244 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1245 return;
1246
1247 if (squid_curtime - hlp->last_queue_warn < 600)
1248 return;
1249
1251 return;
1252
1254
1255 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1256 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1257 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1258}
1259
1262{
1263 if (queue.empty())
1264 return nullptr;
1265
1266 auto *r = queue.front();
1267 queue.pop();
1268 --stats.queue_size;
1269 return r;
1270}
1271
1272static helper_server *
1274{
1275 dlink_node *n;
1276 helper_server *srv;
1277 helper_server *selected = nullptr;
1278 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1279
1280 if (hlp->childs.n_running == 0)
1281 return nullptr;
1282
1283 /* Find "least" loaded helper (approx) */
1284 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1285 srv = (helper_server *)n->data;
1286
1287 if (selected && selected->stats.pending <= srv->stats.pending)
1288 continue;
1289
1290 if (srv->flags.shutdown)
1291 continue;
1292
1293 if (!srv->stats.pending)
1294 return srv;
1295
1296 if (selected) {
1297 selected = srv;
1298 break;
1299 }
1300
1301 selected = srv;
1302 }
1303
1304 if (!selected) {
1305 debugs(84, 5, "GetFirstAvailable: None available.");
1306 return nullptr;
1307 }
1308
1309 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1310 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1311 return nullptr;
1312 }
1313
1314 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1315 return selected;
1316}
1317
1320{
1321 dlink_node *n;
1322 helper_stateful_server *srv = nullptr;
1323 helper_stateful_server *oldestReservedServer = nullptr;
1324 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1325
1326 if (hlp->childs.n_running == 0)
1327 return nullptr;
1328
1329 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1330 srv = (helper_stateful_server *)n->data;
1331
1332 if (srv->stats.pending)
1333 continue;
1334
1335 if (srv->reserved()) {
1337 if (!oldestReservedServer)
1338 oldestReservedServer = srv;
1339 else if (oldestReservedServer->reservationStart < srv->reservationStart)
1340 oldestReservedServer = srv;
1341 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1342 }
1343 continue;
1344 }
1345
1346 if (srv->flags.shutdown)
1347 continue;
1348
1349 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1350 return srv;
1351 }
1352
1353 if (oldestReservedServer) {
1354 debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1355 return oldestReservedServer;
1356 }
1357
1358 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1359 return nullptr;
1360}
1361
/// Write-completion callback for data sent to a stateless helper.
/// Releases the buffer that was just written and, if more request data has
/// accumulated in srv->wqueue meanwhile, starts writing that next.
/// \param flag write result; anything other than Comm::OK is reported as a
///        helper crash and no further write is started
/// \param data the helper_server whose write finished
1362 static void
1363 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1364 {
1365 helper_server *srv = (helper_server *)data;
1366
// the completed write buffer is no longer needed
1367 srv->writebuf->clean();
1368 delete srv->writebuf;
1369 srv->writebuf = nullptr;
1370 srv->flags.writing = false;
1371
1372 if (flag != Comm::OK) {
1373 /* Helper server has crashed */
1374 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1375 return;
1376 }
1377
// wqueue becomes the buffer being written and a fresh MemBuf takes its place
// for later requests
1378 if (!srv->wqueue->isNull()) {
1379 srv->writebuf = srv->wqueue;
1380 srv->wqueue = new MemBuf;
1381 srv->flags.writing = true;
1382 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
// NOTE(review): listing line 1383 (the dialer argument that completes the
// commCbCall() above) is missing from this listing.
1384 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1385 }
1386 }
1387
/// helperDispatch(): assigns request r the next request ID of server srv,
/// records it in srv->requests, appends its text to the server write queue
/// and starts a write unless one is already in progress.
/// NOTE(review): this listing omits the signature line (1389) and listing
/// lines 1394, 1402 and 1420.
1388 static void
1390 {
1391 helper *hlp = srv->parent;
1392 const uint64_t reqId = ++srv->nextRequestId;
1393
// NOTE(review): the condition opening this block (listing line 1394) is
// missing; judging by the message it checks the request callback data.
1395 debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
1396 delete r;
1397 return;
1398 }
1399
1400 r->request.Id = reqId;
1401 helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
// NOTE(review): listing line 1402 is missing here.
1403
1404 if (srv->wqueue->isNull())
1405 srv->wqueue->init();
1406
// With concurrency enabled the request is indexed by its ID and the ID is
// written in front of the request text; otherwise the text is sent as is.
1407 if (hlp->childs.concurrency) {
1408 srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1409 assert(srv->requestsIndex.size() == srv->requests.size());
1410 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1411 } else
1412 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1413
// If a write is already in progress the data stays in wqueue until
// helperDispatchWriteDone() picks it up.
1414 if (!srv->flags.writing) {
1415 assert(nullptr == srv->writebuf);
1416 srv->writebuf = srv->wqueue;
1417 srv->wqueue = new MemBuf;
1418 srv->flags.writing = true;
1419 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
// NOTE(review): listing line 1420 (the dialer argument that completes the
// commCbCall() above) is missing from this listing.
1421 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1422 }
1423
1424 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1425
1426 ++ srv->stats.uses;
1427 ++ srv->stats.pending;
1428 ++ hlp->stats.requests;
1429 }
1430
/// helperStatefulDispatchWriteDone(): write-completion handler used for
/// stateful helper writes. The body is empty, so nothing happens when such a
/// write finishes.
/// NOTE(review): the signature line (listing 1432) is missing from this listing.
1431 static void
1433 {}
1434
/// helperStatefulDispatch(): hands request r to the reserved stateful helper
/// server srv. A placeholder request is answered through its callback and
/// deleted without being written to the helper; any other request is queued
/// in srv->requests and written to the helper pipe.
/// NOTE(review): this listing omits the signature line (1436) and listing
/// lines 1440, 1443, 1450, 1456, 1464, 1470 and 1472, so several steps are
/// not visible here.
1435 static void
1437 {
1438 statefulhelper *hlp = srv->parent;
1439
// NOTE(review): the condition opening this block (listing line 1440) and one
// statement before the return (listing line 1443) are missing; judging by the
// message the block handles invalid request callback data.
1441 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
1442 delete r;
1444 return;
1445 }
1446
1447 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1448
// the server must already hold a reservation at this point
1449 assert(srv->reservationId);
// NOTE(review): listing line 1450 is missing here.
1451
1452 if (r->request.placeholder == 1) {
1453 /* a callback is needed before this request can _use_ a helper. */
1454 /* we don't care about releasing this helper. The request NEVER
1455 * gets to the helper. So we throw away the return code */
// NOTE(review): listing line 1456 is missing here.
1457 r->request.callback(r->request.data, r->reply);
1458 /* throw away the placeholder */
1459 delete r;
1460 /* and push the queue. Note that the callback may have submitted a new
1461 * request to the helper which is why we test for the request */
1462
// NOTE(review): the statement controlled by this test (listing line 1464) is
// missing from this listing.
1463 if (!srv->requests.size())
1465
1466 return;
1467 }
1468
1469 srv->requests.push_back(r);
// NOTE(review): listing lines 1470 and 1472 are missing; 1472 completes the
// commCbCall() started below.
1471 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1473 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
1474 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1475 hlp->id_name << " #" << srv->index << ", " <<
1476 (int) strlen(r->request.buf) << " bytes");
1477
1478 ++ srv->stats.uses;
1479 ++ srv->stats.pending;
1480 ++ hlp->stats.requests;
1481 }
1482
1483static void
1485{
1486 Helper::Xaction *r;
1487 helper_server *srv;
1488
1489 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1490 helperDispatch(srv, r);
1491}
1492
1493static void
1495{
1496 Helper::Xaction *r;
1498 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1499 debugs(84, 5, "found srv-" << srv->index);
1500 hlp->reserveServer(srv);
1501 helperStatefulDispatch(srv, r);
1502 }
1503}
1504
/// helperStatefulServerDone(): follow-up step for stateful helper server srv.
/// It takes one action when the server is not marked for shutdown and another
/// when it is marked for shutdown, is not already closing, holds no
/// reservation and has no pending request.
/// NOTE(review): this listing omits the signature line (1506) and the
/// statements of both branches (listing lines 1509 and 1511), so the actions
/// themselves are not visible here.
1505 static void
1507 {
1508 if (!srv->flags.shutdown) {
1510 } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1512 return;
1513 }
1514 }
1515
/// helper_server::checkForTimedOutRequests(bool const retry): removes from the
/// front of the requests list every request that has exceeded parent->timeout,
/// updates the pending/timedout counters, and either retries the request or
/// calls its callback and deletes it.
/// NOTE(review): this listing omits the signature line (1517) and listing
/// lines 1519, 1533, 1536-1537, 1540, 1543 and 1549, so the brace structure
/// and several statements below are incomplete as shown.
1516 void
1518 {
1520 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1521 Helper::Xaction *r = requests.front();
// drop the request from both the ID index and the ordered list
1522 RequestIndex::iterator it;
1523 it = requestsIndex.find(r->request.Id);
1524 assert(it != requestsIndex.end());
1525 requestsIndex.erase(it);
1526 requests.pop_front();
1527 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1528 void *cbdata;
1529 bool retried = false;
// retry only when asked to, when the request has not used up MAX_RETRIES and
// when its callback data is still valid
1530 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1531 debugs(84, 2, "Retry request " << r->request.Id);
1532 ++r->request.retries;
// NOTE(review): listing line 1533 is missing; presumably it resubmits the
// request -- confirm against the full file.
1534 retried = true;
1535 } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
// NOTE(review): listing lines 1536-1537, 1540 and 1543 are missing; the
// visible remainder finalizes the reply in one case and calls the request
// callback with it.
1538 r->reply.finalize();
1539 else
1541 r->request.callback(cbdata, r->reply);
1542 } else {
1544 r->request.callback(cbdata, r->reply);
1545 }
1546 }
1547 --stats.pending;
1548 ++stats.timedout;
// NOTE(review): listing line 1549 is missing here.
// a retried request is still in use and must not be deleted
1550 if (!retried)
1551 delete r;
1552 }
1553 }
1554
/// helper_server::requestTimeout(const CommTimeoutCbParams &io): timeout
/// handler that schedules itself again on io.conn. The next timeout is the
/// helper request timeout minus the time already spent on the oldest request
/// still in the list, and is never shorter than one second.
/// NOTE(review): this listing omits the signature line (1556) and listing
/// lines 1561 and 1565.
1555 void
1557 {
1558 debugs(26, 3, io.conn);
1559 helper_server *srv = static_cast<helper_server *>(io.data);
1560
// NOTE(review): listing line 1561 is missing; presumably it expires the
// timed-out requests before the timer is set again -- confirm.
1562
1563 debugs(84, 3, io.conn << " establish new helper_server::requestTimeout");
1564 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
// NOTE(review): listing line 1565 (the dialer argument that completes the
// commCbCall() above) is missing from this listing.
1566
// with no request left, the full timeout applies
1567 const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1568 const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1569
1570 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1571 }
1572
#define ScheduleCallHere(call)
Definition: AsyncCall.h:164
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:154
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
Definition: CommCalls.h:342
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
Definition: CommCalls.h:36
static void * hIpc
Definition: IcmpSquid.cc:33
static pid_t pid
Definition: IcmpSquid.cc:34
time_t squid_curtime
Definition: stub_libtime.cc:20
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
Definition: Read.h:59
int conn
the current server connection FD
Definition: Transport.cc:26
#define assert(EX)
Definition: assert.h:19
char progname[]
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:398
#define cbdataReferenceDone(var)
Definition: cbdata.h:350
#define cbdataReference(var)
Definition: cbdata.h:341
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:318
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:256
Comm::ConnectionPointer conn
Definition: CommCalls.h:85
represents a single helper process abstraction
Definition: helper.h:173
struct HelperServerBase::_helper_flags flags
Ip::Address addr
Definition: helper.h:206
Comm::ConnectionPointer writePipe
Definition: helper.h:208
const InstanceId< HelperServerBase > index
Definition: helper.h:204
dlink_node link
Definition: helper.h:218
size_t rbuf_sz
Definition: helper.h:212
uint64_t uses
Definition: helper.h:230
void closeWritePipeSafely(const char *name)
Definition: helper.cc:102
size_t roffset
Definition: helper.h:213
uint64_t pending
Definition: helper.h:232
virtual bool reserved()=0
whether the server is locked for exclusive use by a client
void closePipesSafely(const char *name)
Definition: helper.cc:74
void initStats()
Definition: helper.cc:64
virtual ~HelperServerBase()
Definition: helper.cc:144
struct timeval dispatch_time
Definition: helper.h:215
void * hIpc
Definition: helper.h:209
char * rbuf
Definition: helper.h:211
Comm::ConnectionPointer readPipe
Definition: helper.h:207
struct timeval answer_time
Definition: helper.h:216
virtual void dropQueued()
dequeues and sends a Helper::Unknown answer to all queued requests
Definition: helper.cc:128
uint64_t replies
Definition: helper.h:231
uint64_t timedout
Definition: helper.h:234
Requests requests
requests in order of submission/expiration
Definition: helper.h:227
struct HelperServerBase::@74 stats
unsigned int n_max
Definition: ChildConfig.h:48
unsigned int n_running
Definition: ChildConfig.h:80
unsigned int n_startup
Definition: ChildConfig.h:57
unsigned int n_active
Definition: ChildConfig.h:86
unsigned int queue_size
Definition: ChildConfig.h:91
time_t reservationTimeout
older stateful helper server reservations may be forgotten
Definition: ChildConfig.h:109
unsigned int concurrency
Definition: ChildConfig.h:72
SubmissionErrorHandlingAction onPersistentOverload
how to handle a new request for helper that was overloaded for too long
Definition: ChildConfig.h:99
@ actDie
kill the caller process (i.e., Squid worker)
Definition: ChildConfig.h:95
int needNew() const
Definition: ChildConfig.cc:59
Helper::ResultCode result
The helper response 'result' field.
Definition: Reply.h:59
Helper::ReservationId reservationId
The stateful replies should include the reservation ID.
Definition: Reply.h:65
void finalize()
Definition: Reply.cc:38
bool accumulate(const char *buf, size_t len)
Definition: Reply.cc:25
HLPCB * callback
Definition: Request.h:40
char * buf
Definition: Request.h:39
void * data
Definition: Request.h:41
uint64_t Id
Definition: Request.h:45
struct timeval dispatch_time
Definition: Request.h:44
int placeholder
Definition: Request.h:43
a (temporary) lock on a (stateful) helper channel
Definition: ReservationId.h:18
static ReservationId Next()
Holds the required data to serve a helper request.
Definition: helper.h:38
Helper::Reply reply
Definition: helper.h:43
Helper::Request request
Definition: helper.h:42
Value value
instance identifier
Definition: InstanceId.h:69
Definition: MemBuf.h:24
void clean()
Definition: MemBuf.cc:110
virtual void append(const char *c, int sz)
Definition: MemBuf.cc:209
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:93
char * content()
start of the added data
Definition: MemBuf.h:41
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
int isNull() const
Definition: MemBuf.cc:145
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
Definition: Raw.h:21
const char * rawContent() const
Definition: SBuf.cc:509
size_type length() const
Returns the number of bytes stored in SBuf.
Definition: SBuf.h:415
bool isEmpty() const
Definition: SBuf.h:431
Definition: cbdata.cc:60
represents a single "stateless helper" process
Definition: helper.h:245
uint64_t nextRequestId
Definition: helper.h:249
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Definition: helper.cc:1556
virtual ~helper_server()
Definition: helper.cc:152
void checkForTimedOutRequests(bool const retry)
Definition: helper.cc:1517
helper * parent
Definition: helper.h:254
bool ignoreToEom
Whether to ignore current message, because it is timed-out or other reason.
Definition: helper.h:263
RequestIndex requestsIndex
maps request IDs to requests
Definition: helper.h:267
MemBuf * wqueue
Definition: helper.h:251
static void HelperServerClosed(helper_server *srv)
close handler to handle exited server processes
Definition: helper.cc:857
Helper::Xaction * popRequest(int requestId)
Definition: helper.cc:893
Helper::Xaction * replyXaction
Definition: helper.h:260
MemBuf * writebuf
Definition: helper.h:252
virtual helper * getParent() const override
the helper object that created this server
Definition: helper.h:283
virtual void dropQueued() override
dequeues and sends a Helper::Unknown answer to all queued requests
Definition: helper.cc:176
represents a single "stateful helper" process
Definition: helper.h:295
time_t reservationStart
when the last reservation was made
Definition: helper.h:317
statefulhelper * parent
Definition: helper.h:310
static void HelperServerClosed(helper_stateful_server *srv)
close handler to handle exited server processes
Definition: helper.cc:876
virtual ~helper_stateful_server()
Definition: helper.cc:182
Helper::ReservationId reservationId
"confirmation ID" of the last
Definition: helper.h:316
virtual bool reserved() override
whether the server is locked for exclusive use by a client
Definition: helper.h:304
virtual helper * getParent() const override
the helper object that created this server
Definition: helper.h:305
Definition: helper.h:64
void packStatsInto(Packable *p, const char *label=nullptr) const
Dump some stats about the helper state to a Packable object.
Definition: helper.cc:677
time_t timeout
Requests timeout.
Definition: helper.h:118
char eom
The char which marks the end of (response) message, normally ' '.
Definition: helper.h:122
Ip::Address addr
Definition: helper.h:113
void submitRequest(Helper::Xaction *r)
Definition: helper.cc:446
wordlist * cmdline
Definition: helper.h:107
unsigned int droppedRequests
requests not sent during helper overload
Definition: helper.h:114
~helper()
Definition: helper.cc:823
bool prepSubmit()
Definition: helper.cc:519
bool willOverload() const
Definition: helper.cc:735
dlink_list servers
Definition: helper.h:108
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper requests; does not enforce queue limits
Definition: helper.cc:557
const char * id_name
Definition: helper.h:110
struct helper::_stats stats
time_t last_queue_warn
Definition: helper.h:116
Helper::ChildConfig childs
Configuration settings for number running.
Definition: helper.h:111
SBuf onTimedOutResponse
The response to use when helper response timedout.
Definition: helper.h:121
bool queueFull() const
whether queuing an additional request would overload the helper
Definition: helper.cc:481
bool overloaded() const
Definition: helper.cc:486
int ipc_type
Definition: helper.h:112
void handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
Definition: helper.cc:833
time_t overloadStart
when the helper became overloaded (zero if it is not)
Definition: helper.h:115
std::queue< Helper::Xaction * > queue
Definition: helper.h:109
bool retryTimedOut
Whether the timed-out requests must retried.
Definition: helper.h:119
Helper::Xaction * nextRequest()
Definition: helper.cc:1261
time_t last_restart
Definition: helper.h:117
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
Definition: helper.cc:492
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:546
void submit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:645
helper_stateful_server * findServer(const Helper::ReservationId &reservation)
Definition: helper.cc:614
void reserveServer(helper_stateful_server *srv)
reserve the given server
Definition: helper.cc:585
void cancelReservation(const Helper::ReservationId reservation)
undo reserveServer(), clear the reservation and kick the queue
Definition: helper.cc:598
Reservations reservations
Definition: helper.h:168
bool trySubmit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
reserved servers indexed by reservation IDs
Definition: helper.cc:575
char * key
Definition: wordlist.h:32
wordlist * next
Definition: wordlist.h:33
int commSetNonBlocking(int fd)
Definition: comm.cc:1064
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:947
int commSetConnTimeout(const Comm::ConnectionPointer &conn, int timeout, AsyncCall::Pointer &callback)
Definition: comm.cc:589
A const & max(A const &lhs, A const &rhs)
#define Important(id)
Definition: Messages.h:91
#define DBG_DATA
Definition: Stream.h:43
#define DBG_IMPORTANT
Definition: Stream.h:41
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:196
#define DBG_CRITICAL
Definition: Stream.h:40
#define REDIRECT_AV_FACTOR
Definition: defines.h:53
#define FD_DESC_SZ
Definition: defines.h:32
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
void fd_note(int fd, const char *s)
Definition: fd.cc:217
#define fd_table
Definition: fde.h:189
int shutting_down
int reconfiguring
void HLPCB(void *, const Helper::Reply &)
Definition: forward.h:27
static void helperReturnBuffer(helper_server *srv, helper *hlp, char *msg, size_t msgSize, char *msgEnd)
Calls back with a pointer to the buffer with the helper output.
Definition: helper.cc:916
static helper_server * GetFirstAvailable(const helper *hlp)
Definition: helper.cc:1273
const size_t ReadBufSize(32 *1024)
Helpers input buffer size.
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
Definition: helper.cc:1432
static void SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
Definition: helper.cc:460
void helperShutdown(helper *hlp)
Definition: helper.cc:740
static void helperStatefulServerDone(helper_stateful_server *srv)
Definition: helper.cc:1506
static void helperKickQueue(helper *hlp)
Definition: helper.cc:1484
#define HELPER_MAX_ARGS
Definition: helper.cc:36
void helperStatefulOpenServers(statefulhelper *hlp)
Definition: helper.cc:332
void helperStatefulShutdown(statefulhelper *hlp)
Definition: helper.cc:777
void helperStatefulSubmit(statefulhelper *hlp, const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:567
static void Enqueue(helper *hlp, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
Definition: helper.cc:1203
static void helperStatefulKickQueue(statefulhelper *hlp)
Definition: helper.cc:1494
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
Definition: helper.cc:1232
static void helperDispatch(helper_server *srv, Helper::Xaction *r)
Definition: helper.cc:1389
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
Definition: helper.cc:1436
static IOCB helperStatefulHandleRead
Definition: helper.cc:45
static helper_stateful_server * StatefulGetFirstAvailable(statefulhelper *hlp)
Definition: helper.cc:1319
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
Definition: helper.cc:1363
void helperOpenServers(helper *hlp)
Definition: helper.cc:201
#define MAX_RETRIES
The maximum allowed request retries.
Definition: helper.cc:39
static IOCB helperHandleRead
Definition: helper.cc:44
void helperSubmit(helper *hlp, const char *buf, HLPCB *callback, void *data)
Definition: helper.cc:473
InstanceIdDefinitions(HelperServerBase, "Hlpr")
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:62
void memFreeBuf(size_t size, void *)
Definition: minimal.cc:84
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: minimal.cc:46
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
Flag
Definition: Flag.h:15
@ OK
Definition: Flag.h:16
@ ERR_CLOSING
Definition: Flag.h:25
char * QuoteMimeBlob(const char *header)
Definition: Quoting.cc:43
@ Unknown
Definition: ResultCode.h:17
@ BrokenHelper
Definition: ResultCode.h:20
@ Error
Definition: ResultCode.h:19
@ TimedOut
Definition: ResultCode.h:21
int intAverage(const int, const int, int, const int)
Definition: SquidMath.cc:40
SSL Connection
Definition: Session.h:45
#define xstrdup
#define xmalloc
int timedout
Definition: helper.h:127
int requests
Definition: helper.h:125
int queue_size
Definition: helper.h:128
int avg_svc_time
Definition: helper.h:129
int unsigned int
Definition: stub_fd.cc:19
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
Definition: gadgets.cc:17
int tvSubMsec(struct timeval t1, struct timeval t2)
Definition: gadgets.cc:51
#define PRIu64
Definition: types.h:120
#define safe_free(x)
Definition: xalloc.h:73
#define xisspace(x)
Definition: xis.h:17

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors