helper.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 84 Helper process maintenance */
10 
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "base/Raw.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "comm/Read.h"
18 #include "comm/Write.h"
19 #include "debug/Messages.h"
20 #include "fd.h"
21 #include "fde.h"
22 #include "format/Quoting.h"
23 #include "helper.h"
24 #include "helper/Reply.h"
25 #include "helper/Request.h"
26 #include "MemBuf.h"
27 #include "SquidConfig.h"
28 #include "SquidIpc.h"
29 #include "SquidMath.h"
30 #include "Store.h"
31 #include "wordlist.h"
32 
33 // helper_stateful_server::data uses explicit alloc()/freeOne() */
34 #include "mem/Pool.h"
35 
36 #define HELPER_MAX_ARGS 64
37 
39 #define MAX_RETRIES 2
40 
42 const size_t ReadBufSize(32*1024);
43 
46 static void Enqueue(helper * hlp, Helper::Xaction *);
47 static helper_server *GetFirstAvailable(const helper * hlp);
49 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
51 static void helperKickQueue(helper * hlp);
52 static void helperStatefulKickQueue(statefulhelper * hlp);
54 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
55 
60 
62 
63 void
65 {
66  stats.uses=0;
67  stats.replies=0;
68  stats.pending=0;
69  stats.releases=0;
70  stats.timedout = 0;
71 }
72 
73 void
75 {
76 #if _SQUID_WINDOWS_
77  shutdown(writePipe->fd, SD_BOTH);
78 #endif
79 
80  flags.closing = true;
81  if (readPipe->fd == writePipe->fd)
82  readPipe->fd = -1;
83  else
84  readPipe->close();
85  writePipe->close();
86 
87 #if _SQUID_WINDOWS_
88  if (hIpc) {
89  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
91  debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
92  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
93  }
94  CloseHandle(hIpc);
95  }
96 #else
97  (void)id_name;
98 #endif
99 }
100 
101 void
103 {
104 #if _SQUID_WINDOWS_
105  shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
106 #endif
107 
108  flags.closing = true;
109  if (readPipe->fd == writePipe->fd)
110  readPipe->fd = -1;
111  writePipe->close();
112 
113 #if _SQUID_WINDOWS_
114  if (hIpc) {
115  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
116  getCurrentTime();
117  debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
118  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
119  }
120  CloseHandle(hIpc);
121  }
122 #else
123  (void)id_name;
124 #endif
125 }
126 
127 void
129 {
130  while (!requests.empty()) {
131  // XXX: re-schedule these on another helper?
132  Helper::Xaction *r = requests.front();
133  requests.pop_front();
134  void *cbdata;
137  r->request.callback(cbdata, r->reply);
138  }
139 
140  delete r;
141  }
142 }
143 
145 {
146  if (rbuf) {
148  rbuf = NULL;
149  }
150 }
151 
153 {
154  wqueue->clean();
155  delete wqueue;
156 
157  if (writebuf) {
158  writebuf->clean();
159  delete writebuf;
160  writebuf = NULL;
161  }
162 
165 
167 
169  -- parent->childs.n_running;
170 
171  assert(requests.empty());
173 }
174 
175 void
177 {
179  requestsIndex.clear();
180 }
181 
183 {
184  /* TODO: walk the local queue of requests and carry them all out */
187 
189 
191 
193  -- parent->childs.n_running;
194 
195  assert(requests.empty());
196 
198 }
199 
200 void
202 {
203  char *s;
204  char *progname;
205  char *shortname;
206  char *procname;
207  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
208  char fd_note_buf[FD_DESC_SZ];
209  helper_server *srv;
210  int nargs = 0;
211  int k;
212  pid_t pid;
213  int rfd;
214  int wfd;
215  void * hIpc;
216  wordlist *w;
217 
218  if (hlp->cmdline == NULL)
219  return;
220 
221  progname = hlp->cmdline->key;
222 
223  if ((s = strrchr(progname, '/')))
224  shortname = xstrdup(s + 1);
225  else
226  shortname = xstrdup(progname);
227 
228  /* figure out how many new child are actually needed. */
229  int need_new = hlp->childs.needNew();
230 
231  debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
232 
233  if (need_new < 1) {
234  debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
235  }
236 
237  procname = (char *)xmalloc(strlen(shortname) + 3);
238 
239  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
240 
241  args[nargs] = procname;
242  ++nargs;
243 
244  for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
245  args[nargs] = w->key;
246  ++nargs;
247  }
248 
249  args[nargs] = NULL;
250  ++nargs;
251 
252  assert(nargs <= HELPER_MAX_ARGS);
253 
254  for (k = 0; k < need_new; ++k) {
255  getCurrentTime();
256  rfd = wfd = -1;
257  pid = ipcCreate(hlp->ipc_type,
258  progname,
259  args,
260  shortname,
261  hlp->addr,
262  &rfd,
263  &wfd,
264  &hIpc);
265 
266  if (pid < 0) {
267  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
268  continue;
269  }
270 
271  ++ hlp->childs.n_running;
272  ++ hlp->childs.n_active;
273  srv = new helper_server;
274  srv->hIpc = hIpc;
275  srv->pid = pid;
276  srv->initStats();
277  srv->addr = hlp->addr;
278  srv->readPipe = new Comm::Connection;
279  srv->readPipe->fd = rfd;
280  srv->writePipe = new Comm::Connection;
281  srv->writePipe->fd = wfd;
282  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
283  srv->wqueue = new MemBuf;
284  srv->roffset = 0;
285  srv->nextRequestId = 0;
286  srv->replyXaction = NULL;
287  srv->ignoreToEom = false;
288  srv->parent = cbdataReference(hlp);
289  dlinkAddTail(srv, &srv->link, &hlp->servers);
290 
291  if (rfd == wfd) {
292  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
293  fd_note(rfd, fd_note_buf);
294  } else {
295  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
296  fd_note(rfd, fd_note_buf);
297  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
298  fd_note(wfd, fd_note_buf);
299  }
300 
301  commSetNonBlocking(rfd);
302 
303  if (wfd != rfd)
304  commSetNonBlocking(wfd);
305 
306  AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
307  comm_add_close_handler(rfd, closeCall);
308 
309  if (hlp->timeout && hlp->childs.concurrency) {
310  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
312  commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
313  }
314 
315  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
317  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
318  }
319 
321  safe_free(shortname);
322  safe_free(procname);
323  helperKickQueue(hlp);
324 }
325 
331 void
333 {
334  char *shortname;
335  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
336  char fd_note_buf[FD_DESC_SZ];
337  int nargs = 0;
338 
339  if (hlp->cmdline == NULL)
340  return;
341 
342  if (hlp->childs.concurrency)
343  debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
344 
345  char *progname = hlp->cmdline->key;
346 
347  char *s;
348  if ((s = strrchr(progname, '/')))
349  shortname = xstrdup(s + 1);
350  else
351  shortname = xstrdup(progname);
352 
353  /* figure out haw mant new helpers are needed. */
354  int need_new = hlp->childs.needNew();
355 
356  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
357 
358  if (need_new < 1) {
359  debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
360  }
361 
362  char *procname = (char *)xmalloc(strlen(shortname) + 3);
363 
364  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
365 
366  args[nargs] = procname;
367  ++nargs;
368 
369  for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
370  args[nargs] = w->key;
371  ++nargs;
372  }
373 
374  args[nargs] = NULL;
375  ++nargs;
376 
377  assert(nargs <= HELPER_MAX_ARGS);
378 
379  for (int k = 0; k < need_new; ++k) {
380  getCurrentTime();
381  int rfd = -1;
382  int wfd = -1;
383  void * hIpc;
384  pid_t pid = ipcCreate(hlp->ipc_type,
385  progname,
386  args,
387  shortname,
388  hlp->addr,
389  &rfd,
390  &wfd,
391  &hIpc);
392 
393  if (pid < 0) {
394  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
395  continue;
396  }
397 
398  ++ hlp->childs.n_running;
399  ++ hlp->childs.n_active;
401  srv->hIpc = hIpc;
402  srv->pid = pid;
403  srv->initStats();
404  srv->addr = hlp->addr;
405  srv->readPipe = new Comm::Connection;
406  srv->readPipe->fd = rfd;
407  srv->writePipe = new Comm::Connection;
408  srv->writePipe->fd = wfd;
409  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
410  srv->roffset = 0;
411  srv->parent = cbdataReference(hlp);
412  srv->reservationStart = 0;
413 
414  dlinkAddTail(srv, &srv->link, &hlp->servers);
415 
416  if (rfd == wfd) {
417  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
418  fd_note(rfd, fd_note_buf);
419  } else {
420  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
421  fd_note(rfd, fd_note_buf);
422  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
423  fd_note(wfd, fd_note_buf);
424  }
425 
426  commSetNonBlocking(rfd);
427 
428  if (wfd != rfd)
429  commSetNonBlocking(wfd);
430 
431  AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
432  comm_add_close_handler(rfd, closeCall);
433 
434  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
436  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
437  }
438 
440  safe_free(shortname);
441  safe_free(procname);
443 }
444 
445 void
447 {
448  helper_server *srv;
449 
450  if ((srv = GetFirstAvailable(this)))
451  helperDispatch(srv, r);
452  else
453  Enqueue(this, r);
454 
455  syncQueueStats();
456 }
457 
459 static void
460 SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
461 {
462  auto result = Helper::Error;
463  if (!hlp) {
464  debugs(84, 3, "no helper");
465  result = Helper::Unknown;
466  }
467  // else pretend the helper has responded with ERR
468 
469  callback(data, Helper::Reply(result));
470 }
471 
472 void
473 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
474 {
475  if (!hlp || !hlp->trySubmit(buf, callback, data))
476  SubmissionFailure(hlp, callback, data);
477 }
478 
480 bool
482  return stats.queue_size >= static_cast<int>(childs.queue_size);
483 }
484 
485 bool
487  return stats.queue_size > static_cast<int>(childs.queue_size);
488 }
489 
491 void
493 {
494  if (overloaded()) {
495  if (overloadStart) {
496  debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
497  } else {
499  debugs(84, 3, id_name << " became overloaded");
500  }
501  } else {
502  if (overloadStart) {
503  debugs(84, 5, id_name << " is no longer overloaded");
504  if (droppedRequests) {
505  debugs(84, DBG_IMPORTANT, "helper " << id_name <<
506  " is no longer overloaded after dropping " << droppedRequests <<
507  " requests in " << (squid_curtime - overloadStart) << " seconds");
508  droppedRequests = 0;
509  }
510  overloadStart = 0;
511  }
512  }
513 }
514 
518 bool
520 {
521  // re-sync for the configuration may have changed since the last submission
522  syncQueueStats();
523 
524  // Nothing special to do if the new request does not overload (i.e., the
525  // queue is not even full yet) or only _starts_ overloading this helper
526  // (i.e., the queue is currently at its limit).
527  if (!overloaded())
528  return true;
529 
530  if (squid_curtime - overloadStart <= 180)
531  return true; // also OK: overload has not persisted long enough to panic
532 
534  fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
535 
536  if (!droppedRequests) {
537  debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
538  id_name << " helper configured with on-persistent-overload=err");
539  }
540  ++droppedRequests;
541  debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
542  return false;
543 }
544 
545 bool
546 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
547 {
548  if (!prepSubmit())
549  return false; // request was dropped
550 
551  submit(buf, callback, data); // will send or queue
552  return true; // request submitted or queued
553 }
554 
556 void
557 helper::submit(const char *buf, HLPCB * callback, void *data)
558 {
559  Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
560  submitRequest(r);
561  debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
562 }
563 
566 void
567 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
568 {
569  if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
570  SubmissionFailure(hlp, callback, data);
571 }
572 
574 bool
575 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
576 {
577  if (!prepSubmit())
578  return false; // request was dropped
579 
580  submit(buf, callback, data, reservation); // will send or queue
581  return true; // request submitted or queued
582 }
583 
584 void
586 {
587  // clear any old reservation
588  if (srv->reserved()) {
589  reservations.erase(srv->reservationId);
590  srv->clearReservation();
591  }
592 
593  srv->reserve();
594  reservations.insert(Reservations::value_type(srv->reservationId, srv));
595 }
596 
597 void
599 {
600  const auto it = reservations.find(reservation);
601  if (it == reservations.end())
602  return;
603 
604  helper_stateful_server *srv = it->second;
605  reservations.erase(it);
606  srv->clearReservation();
607 
608  // schedule a queue kick
609  AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
610  ScheduleCallHere(call);
611 }
612 
615 {
616  const auto it = reservations.find(reservation);
617  if (it == reservations.end())
618  return nullptr;
619  return it->second;
620 }
621 
622 void
624 {
628  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
629 }
630 
631 void
633 {
634  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
635  if (!reservationId)
636  return;
637 
638  ++stats.releases;
639 
641  reservationStart = 0;
642 }
643 
644 void
645 statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
646 {
647  Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
648 
649  if (buf && reservation) {
650  debugs(84, 5, reservation);
651  helper_stateful_server *lastServer = findServer(reservation);
652  if (!lastServer) {
653  debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
655  r->request.callback(r->request.data, r->reply);
656  delete r;
657  return;
658  }
659  debugs(84, 5, "StatefulSubmit dispatching");
660  helperStatefulDispatch(lastServer, r);
661  } else {
663  if ((srv = StatefulGetFirstAvailable(this))) {
664  reserveServer(srv);
665  helperStatefulDispatch(srv, r);
666  } else
667  StatefulEnqueue(this, r);
668  }
669 
670  debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
671  "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
672 
673  syncQueueStats();
674 }
675 
676 void
677 helper::packStatsInto(Packable *p, const char *label) const
678 {
679  if (label)
680  p->appendf("%s:\n", label);
681 
682  p->appendf(" program: %s\n", cmdline->key);
683  p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
684  p->appendf(" requests sent: %d\n", stats.requests);
685  p->appendf(" replies received: %d\n", stats.replies);
686  p->appendf(" requests timedout: %d\n", stats.timedout);
687  p->appendf(" queue length: %d\n", stats.queue_size);
688  p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
689  p->append("\n",1);
690  p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
691  "ID #",
692  "FD",
693  "PID",
694  "# Requests",
695  "# Replies",
696  "# Timed-out",
697  "Flags",
698  "Time",
699  "Offset",
700  "Request");
701 
702  for (dlink_node *link = servers.head; link; link = link->next) {
703  HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
704  assert(srv);
705  Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
706  double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
707  p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
708  srv->index.value,
709  srv->readPipe->fd,
710  srv->pid,
711  srv->stats.uses,
712  srv->stats.replies,
713  srv->stats.timedout,
714  srv->stats.pending ? 'B' : ' ',
715  srv->flags.writing ? 'W' : ' ',
716  srv->flags.closing ? 'C' : ' ',
717  srv->reserved() ? 'R' : ' ',
718  srv->flags.shutdown ? 'S' : ' ',
719  xaction && xaction->request.placeholder ? 'P' : ' ',
720  tt < 0.0 ? 0.0 : tt,
721  (int) srv->roffset,
722  xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
723  }
724 
725  p->append("\nFlags key:\n"
726  " B\tBUSY\n"
727  " W\tWRITING\n"
728  " C\tCLOSING\n"
729  " R\tRESERVED\n"
730  " S\tSHUTDOWN PENDING\n"
731  " P\tPLACEHOLDER\n", 101);
732 }
733 
734 bool
736  return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
737 }
738 
739 void
741 {
742  dlink_node *link = hlp->servers.head;
743 
744  while (link) {
745  helper_server *srv;
746  srv = (helper_server *)link->data;
747  link = link->next;
748 
749  if (srv->flags.shutdown) {
750  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
751  continue;
752  }
753 
754  assert(hlp->childs.n_active > 0);
755  -- hlp->childs.n_active;
756  srv->flags.shutdown = true; /* request it to shut itself down */
757 
758  if (srv->flags.closing) {
759  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
760  continue;
761  }
762 
763  if (srv->stats.pending) {
764  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
765  continue;
766  }
767 
768  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
769  /* the rest of the details is dealt with in the helperServerFree
770  * close handler
771  */
772  srv->closePipesSafely(hlp->id_name);
773  }
774 }
775 
776 void
778 {
779  dlink_node *link = hlp->servers.head;
781 
782  while (link) {
783  srv = (helper_stateful_server *)link->data;
784  link = link->next;
785 
786  if (srv->flags.shutdown) {
787  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
788  continue;
789  }
790 
791  assert(hlp->childs.n_active > 0);
792  -- hlp->childs.n_active;
793  srv->flags.shutdown = true; /* request it to shut itself down */
794 
795  if (srv->stats.pending) {
796  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
797  continue;
798  }
799 
800  if (srv->flags.closing) {
801  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
802  continue;
803  }
804 
805  if (srv->reserved()) {
806  if (shutting_down) {
807  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
808  } else {
809  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
810  continue;
811  }
812  }
813 
814  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
815 
816  /* the rest of the details is dealt with in the helperStatefulServerFree
817  * close handler
818  */
819  srv->closePipesSafely(hlp->id_name);
820  }
821 }
822 
824 {
825  /* note, don't free id_name, it probably points to static memory */
826 
827  // TODO: if the queue is not empty it will leak Helper::Request's
828  if (!queue.empty())
829  debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
830 }
831 
832 void
833 helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
834 {
835  needsNewServers = false;
836  if (!srv->flags.shutdown) {
837  assert(childs.n_active > 0);
838  --childs.n_active;
839  debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
840 
841  if (childs.needNew() > 0) {
842  debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")");
843 
845  if (srv->stats.replies < 1)
846  fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name);
847  else
848  debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
849  }
850  srv->flags.shutdown = true;
851  needsNewServers = true;
852  }
853  }
854 }
855 
856 void
858 {
859  helper *hlp = srv->getParent();
860 
861  bool needsNewServers = false;
862  hlp->handleKilledServer(srv, needsNewServers);
863  if (needsNewServers) {
864  debugs(80, DBG_IMPORTANT, "Starting new helpers");
865  helperOpenServers(hlp);
866  }
867 
868  srv->dropQueued();
869 
870  delete srv;
871 }
872 
873 // XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
874 // TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
875 void
877 {
878  statefulhelper *hlp = static_cast<statefulhelper *>(srv->getParent());
879 
880  bool needsNewServers = false;
881  hlp->handleKilledServer(srv, needsNewServers);
882  if (needsNewServers) {
883  debugs(80, DBG_IMPORTANT, "Starting new helpers");
885  }
886 
887  srv->dropQueued();
888 
889  delete srv;
890 }
891 
893 helper_server::popRequest(int request_number)
894 {
895  Helper::Xaction *r = nullptr;
896  helper_server::RequestIndex::iterator it;
897  if (parent->childs.concurrency) {
898  // If concurrency supported retrieve request from ID
899  it = requestsIndex.find(request_number);
900  if (it != requestsIndex.end()) {
901  r = *(it->second);
902  requests.erase(it->second);
903  requestsIndex.erase(it);
904  }
905  } else if(!requests.empty()) {
906  // Else get the first request from queue, if any
907  r = requests.front();
908  requests.pop_front();
909  }
910 
911  return r;
912 }
913 
915 static void
916 helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
917 {
918  if (Helper::Xaction *r = srv->replyXaction) {
919  const bool hasSpace = r->reply.accumulate(msg, msgSize);
920  if (!hasSpace) {
921  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
922  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
923  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
924  srv->closePipesSafely(hlp->id_name);
925  return;
926  }
927 
928  if (!msgEnd)
929  return; // We are waiting for more data.
930 
931  bool retry = false;
933  r->reply.finalize();
935  debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
936  retry = true;
937  } else {
938  HLPCB *callback = r->request.callback;
939  r->request.callback = nullptr;
940  void *cbdata = nullptr;
942  callback(cbdata, r->reply);
943  }
944  }
945 
946  -- srv->stats.pending;
947  ++ srv->stats.replies;
948 
949  ++ hlp->stats.replies;
950 
951  srv->answer_time = current_time;
952 
954 
955  hlp->stats.avg_svc_time =
959 
960  // release or re-submit parsedRequestXaction object
961  srv->replyXaction = nullptr;
962  if (retry) {
963  ++r->request.retries;
964  hlp->submitRequest(r);
965  } else
966  delete r;
967  }
968 
969  if (hlp->timeout && hlp->childs.concurrency)
971 
972  if (!srv->flags.shutdown) {
973  helperKickQueue(hlp);
974  } else if (!srv->flags.closing && !srv->stats.pending) {
975  srv->closeWritePipeSafely(srv->parent->id_name);
976  }
977 }
978 
979 static void
980 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
981 {
982  helper_server *srv = (helper_server *)data;
983  helper *hlp = srv->parent;
985 
986  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
987 
988  if (flag == Comm::ERR_CLOSING) {
989  return;
990  }
991 
992  assert(conn->fd == srv->readPipe->fd);
993 
994  debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
995 
996  if (flag != Comm::OK || len == 0) {
997  srv->closePipesSafely(hlp->id_name);
998  return;
999  }
1000 
1001  srv->roffset += len;
1002  srv->rbuf[srv->roffset] = '\0';
1003  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1004 
1005  if (!srv->stats.pending && !srv->stats.timedout) {
1006  /* someone spoke without being spoken to */
1007  debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected read from " <<
1008  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1009  " bytes '" << srv->rbuf << "'");
1010 
1011  srv->roffset = 0;
1012  srv->rbuf[0] = '\0';
1013  }
1014 
1015  bool needsMore = false;
1016  char *msg = srv->rbuf;
1017  while (*msg && !needsMore) {
1018  int skip = 0;
1019  char *eom = strchr(msg, hlp->eom);
1020  if (eom) {
1021  skip = 1;
1022  debugs(84, 3, "helperHandleRead: end of reply found");
1023  if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1024  *eom = '\0';
1025  // rewind to the \r octet which is the real terminal now
1026  // and remember that we have to skip forward 2 places now.
1027  skip = 2;
1028  --eom;
1029  }
1030  *eom = '\0';
1031  }
1032 
1033  if (!srv->ignoreToEom && !srv->replyXaction) {
1034  int i = 0;
1035  if (hlp->childs.concurrency) {
1036  char *e = NULL;
1037  i = strtol(msg, &e, 10);
1038  // Do we need to check for e == msg? Means wrong response from helper.
1039  // Will be dropped as "unexpected reply on channel 0"
1040  needsMore = !(xisspace(*e) || (eom && e == eom));
1041  if (!needsMore) {
1042  msg = e;
1043  while (*msg && xisspace(*msg))
1044  ++msg;
1045  } // else not enough data to compute request number
1046  }
1047  if (!(srv->replyXaction = srv->popRequest(i))) {
1048  if (srv->stats.timedout) {
1049  debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1050  } else {
1051  debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
1052  i << " from " << hlp->id_name << " #" << srv->index <<
1053  " '" << srv->rbuf << "'");
1054  }
1055  srv->ignoreToEom = true;
1056  }
1057  } // else we need to just append reply data to the current Xaction
1058 
1059  if (!needsMore) {
1060  size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1061  assert(msgSize <= srv->rbuf_sz);
1062  helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1063  msg += msgSize + skip;
1064  assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1065 
1066  // The next message should not ignored.
1067  if (eom && srv->ignoreToEom)
1068  srv->ignoreToEom = false;
1069  } else
1070  assert(skip == 0 && eom == NULL);
1071  }
1072 
1073  if (needsMore) {
1074  size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1075  assert(msgSize <= srv->rbuf_sz);
1076  memmove(srv->rbuf, msg, msgSize);
1077  srv->roffset = msgSize;
1078  srv->rbuf[srv->roffset] = '\0';
1079  } else {
1080  // All of the responses parsed and msg points at the end of read data
1081  assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1082  srv->roffset = 0;
1083  }
1084 
1085  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1086  int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1087  assert(spaceSize >= 0);
1088 
1089  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1091  comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1092  }
1093 }
1094 
1095 static void
1096 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1097 {
1098  char *t = NULL;
1100  statefulhelper *hlp = srv->parent;
1102 
1103  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1104 
1105  if (flag == Comm::ERR_CLOSING) {
1106  return;
1107  }
1108 
1109  assert(conn->fd == srv->readPipe->fd);
1110 
1111  debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1112  hlp->id_name << " #" << srv->index);
1113 
1114  if (flag != Comm::OK || len == 0) {
1115  srv->closePipesSafely(hlp->id_name);
1116  return;
1117  }
1118 
1119  srv->roffset += len;
1120  srv->rbuf[srv->roffset] = '\0';
1121  Helper::Xaction *r = srv->requests.front();
1122  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1123 
1124  if (r == NULL) {
1125  /* someone spoke without being spoken to */
1126  debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulHandleRead: unexpected read from " <<
1127  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1128  " bytes '" << srv->rbuf << "'");
1129 
1130  srv->roffset = 0;
1131  }
1132 
1133  if ((t = strchr(srv->rbuf, hlp->eom))) {
1134  debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1135 
1136  if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1137  *t = '\0';
1138  // rewind to the \r octet which is the real terminal now
1139  --t;
1140  }
1141 
1142  *t = '\0';
1143  }
1144 
1145  if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1146  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1147  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1148  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1149  srv->closePipesSafely(hlp->id_name);
1150  return;
1151  }
1158  srv->roffset = 0;
1159 
1160  if (t) {
1161  /* end of reply found */
1162  srv->requests.pop_front(); // we already have it in 'r'
1163  int called = 1;
1164 
1165  if (r && cbdataReferenceValid(r->request.data)) {
1166  r->reply.finalize();
1167  r->reply.reservationId = srv->reservationId;
1168  r->request.callback(r->request.data, r->reply);
1169  } else {
1170  debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1171  called = 0;
1172  }
1173 
1174  delete r;
1175 
1176  -- srv->stats.pending;
1177  ++ srv->stats.replies;
1178 
1179  ++ hlp->stats.replies;
1180  srv->answer_time = current_time;
1181  hlp->stats.avg_svc_time =
1185 
1186  if (called)
1188  else
1189  hlp->cancelReservation(srv->reservationId);
1190  }
1191 
1192  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1193  int spaceSize = srv->rbuf_sz - 1;
1194 
1195  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1197  comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1198  }
1199 }
1200 
1202 static void
1204 {
1205  hlp->queue.push(r);
1206  ++ hlp->stats.queue_size;
1207 
1208  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1209  if (hlp->childs.needNew() > 0) {
1210  debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1211  helperOpenServers(hlp);
1212  return;
1213  }
1214 
1215  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1216  return;
1217 
1218  if (squid_curtime - hlp->last_queue_warn < 600)
1219  return;
1220 
1222  return;
1223 
1225 
1226  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1227  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1228  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1229 }
1230 
1231 static void
1233 {
1234  hlp->queue.push(r);
1235  ++ hlp->stats.queue_size;
1236 
1237  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1238  if (hlp->childs.needNew() > 0) {
1239  debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1241  return;
1242  }
1243 
1244  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1245  return;
1246 
1247  if (squid_curtime - hlp->last_queue_warn < 600)
1248  return;
1249 
1251  return;
1252 
1254 
1255  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1256  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1257  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1258 }
1259 
1262 {
1263  if (queue.empty())
1264  return nullptr;
1265 
1266  auto *r = queue.front();
1267  queue.pop();
1268  --stats.queue_size;
1269  return r;
1270 }
1271 
1272 static helper_server *
1274 {
1275  dlink_node *n;
1276  helper_server *srv;
1277  helper_server *selected = NULL;
1278  debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1279 
1280  if (hlp->childs.n_running == 0)
1281  return NULL;
1282 
1283  /* Find "least" loaded helper (approx) */
1284  for (n = hlp->servers.head; n != NULL; n = n->next) {
1285  srv = (helper_server *)n->data;
1286 
1287  if (selected && selected->stats.pending <= srv->stats.pending)
1288  continue;
1289 
1290  if (srv->flags.shutdown)
1291  continue;
1292 
1293  if (!srv->stats.pending)
1294  return srv;
1295 
1296  if (selected) {
1297  selected = srv;
1298  break;
1299  }
1300 
1301  selected = srv;
1302  }
1303 
1304  if (!selected) {
1305  debugs(84, 5, "GetFirstAvailable: None available.");
1306  return NULL;
1307  }
1308 
1309  if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1310  debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1311  return NULL;
1312  }
1313 
1314  debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1315  return selected;
1316 }
1317 
1318 static helper_stateful_server *
1320 {
1321  dlink_node *n;
1323  helper_stateful_server *oldestReservedServer = nullptr;
1324  debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1325 
1326  if (hlp->childs.n_running == 0)
1327  return NULL;
1328 
1329  for (n = hlp->servers.head; n != NULL; n = n->next) {
1330  srv = (helper_stateful_server *)n->data;
1331 
1332  if (srv->stats.pending)
1333  continue;
1334 
1335  if (srv->reserved()) {
1337  if (!oldestReservedServer)
1338  oldestReservedServer = srv;
1339  else if (oldestReservedServer->reservationStart < srv->reservationStart)
1340  oldestReservedServer = srv;
1341  debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1342  }
1343  continue;
1344  }
1345 
1346  if (srv->flags.shutdown)
1347  continue;
1348 
1349  debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1350  return srv;
1351  }
1352 
1353  if (oldestReservedServer) {
1354  debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1355  return oldestReservedServer;
1356  }
1357 
1358  debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1359  return nullptr;
1360 }
1361 
1362 static void
1363 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1364 {
1365  helper_server *srv = (helper_server *)data;
1366 
1367  srv->writebuf->clean();
1368  delete srv->writebuf;
1369  srv->writebuf = NULL;
1370  srv->flags.writing = false;
1371 
1372  if (flag != Comm::OK) {
1373  /* Helper server has crashed */
1374  debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1375  return;
1376  }
1377 
1378  if (!srv->wqueue->isNull()) {
1379  srv->writebuf = srv->wqueue;
1380  srv->wqueue = new MemBuf;
1381  srv->flags.writing = true;
1382  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1384  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1385  }
1386 }
1387 
1388 static void
1390 {
1391  helper *hlp = srv->parent;
1392  const uint64_t reqId = ++srv->nextRequestId;
1393 
1394  if (!cbdataReferenceValid(r->request.data)) {
1395  debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
1396  delete r;
1397  return;
1398  }
1399 
1400  r->request.Id = reqId;
1401  helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1403 
1404  if (srv->wqueue->isNull())
1405  srv->wqueue->init();
1406 
1407  if (hlp->childs.concurrency) {
1408  srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1409  assert(srv->requestsIndex.size() == srv->requests.size());
1410  srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1411  } else
1412  srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1413 
1414  if (!srv->flags.writing) {
1415  assert(NULL == srv->writebuf);
1416  srv->writebuf = srv->wqueue;
1417  srv->wqueue = new MemBuf;
1418  srv->flags.writing = true;
1419  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1421  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1422  }
1423 
1424  debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1425 
1426  ++ srv->stats.uses;
1427  ++ srv->stats.pending;
1428  ++ hlp->stats.requests;
1429 }
1430 
1431 static void
1433 {}
1434 
1435 static void
1437 {
1438  statefulhelper *hlp = srv->parent;
1439 
1440  if (!cbdataReferenceValid(r->request.data)) {
1441  debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
1442  delete r;
1443  hlp->cancelReservation(srv->reservationId);
1444  return;
1445  }
1446 
1447  debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1448 
1449  assert(srv->reservationId);
1450  r->reply.reservationId = srv->reservationId;
1451 
1452  if (r->request.placeholder == 1) {
1453  /* a callback is needed before this request can _use_ a helper. */
1454  /* we don't care about releasing this helper. The request NEVER
1455  * gets to the helper. So we throw away the return code */
1457  r->request.callback(r->request.data, r->reply);
1458  /* throw away the placeholder */
1459  delete r;
1460  /* and push the queue. Note that the callback may have submitted a new
1461  * request to the helper which is why we test for the request */
1462 
1463  if (!srv->requests.size())
1465 
1466  return;
1467  }
1468 
1469  srv->requests.push_back(r);
1470  srv->dispatch_time = current_time;
1471  AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1473  Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
1474  debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1475  hlp->id_name << " #" << srv->index << ", " <<
1476  (int) strlen(r->request.buf) << " bytes");
1477 
1478  ++ srv->stats.uses;
1479  ++ srv->stats.pending;
1480  ++ hlp->stats.requests;
1481 }
1482 
1483 static void
1485 {
1486  Helper::Xaction *r;
1487  helper_server *srv;
1488 
1489  while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1490  helperDispatch(srv, r);
1491 }
1492 
1493 static void
1495 {
1496  Helper::Xaction *r;
1498  while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1499  debugs(84, 5, "found srv-" << srv->index);
1500  hlp->reserveServer(srv);
1501  helperStatefulDispatch(srv, r);
1502  }
1503 }
1504 
1505 static void
1507 {
1508  if (!srv->flags.shutdown) {
1510  } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1511  srv->closeWritePipeSafely(srv->parent->id_name);
1512  return;
1513  }
1514 }
1515 
1516 void
1518 {
1520  while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1521  Helper::Xaction *r = requests.front();
1522  RequestIndex::iterator it;
1523  it = requestsIndex.find(r->request.Id);
1524  assert(it != requestsIndex.end());
1525  requestsIndex.erase(it);
1526  requests.pop_front();
1527  debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1528  void *cbdata;
1529  bool retried = false;
1530  if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1531  debugs(84, 2, "Retry request " << r->request.Id);
1532  ++r->request.retries;
1533  parent->submitRequest(r);
1534  retried = true;
1535  } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1536  if (!parent->onTimedOutResponse.isEmpty()) {
1538  r->reply.finalize();
1539  else
1541  r->request.callback(cbdata, r->reply);
1542  } else {
1544  r->request.callback(cbdata, r->reply);
1545  }
1546  }
1547  --stats.pending;
1548  ++stats.timedout;
1549  ++parent->stats.timedout;
1550  if (!retried)
1551  delete r;
1552  }
1553 }
1554 
1555 void
1557 {
1558  debugs(26, 3, io.conn);
1559  helper_server *srv = static_cast<helper_server *>(io.data);
1560 
1562 
1563  debugs(84, 3, io.conn << " establish new helper_server::requestTimeout");
1564  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1566 
1567  const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1568  const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1569 
1570  commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1571 }
1572 
virtual helper * getParent() const override
the helper object that created this server
Definition: helper.h:283
#define FD_DESC_SZ
Definition: defines.h:32
static IOCB helperStatefulHandleRead
Definition: helper.cc:45
void closePipesSafely(const char *name)
Definition: helper.cc:74
dlink_node link
Definition: helper.h:218
Value value
instance identifier
Definition: InstanceId.h:69
void helperOpenServers(helper *hlp)
Definition: helper.cc:201
time_t reservationStart
when the last reservation was made
Definition: helper.h:317
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:921
void submitRequest(Helper::Xaction *r)
Definition: helper.cc:446
Helper::ChildConfig childs
Configuration settings for number running.
Definition: helper.h:111
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
Definition: helper.cc:492
represents a single "stateless helper" process
Definition: helper.h:245
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
wordlist * cmdline
Definition: helper.h:107
#define DBG_CRITICAL
Definition: Stream.h:40
unsigned int queue_size
Definition: ChildConfig.h:91
#define xmalloc
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
Definition: helper.cc:1436
dlink_list servers
Definition: helper.h:108
helper_stateful_server * findServer(const Helper::ReservationId &reservation)
Definition: helper.cc:614
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:256
struct helper::_stats stats
@ Error
Definition: ResultCode.h:19
void fd_note(int fd, const char *s)
Definition: fd.cc:216
@ actDie
kill the caller process (i.e., Squid worker)
Definition: ChildConfig.h:95
bool isEmpty() const
Definition: SBuf.h:431
static helper_stateful_server * StatefulGetFirstAvailable(statefulhelper *hlp)
Definition: helper.cc:1319
static IOCB helperHandleRead
Definition: helper.cc:44
statefulhelper * parent
Definition: helper.h:310
virtual ~helper_stateful_server()
Definition: helper.cc:182
@ Unknown
Definition: ResultCode.h:17
#define MAX_RETRIES
The maximum allowed request retries.
Definition: helper.cc:39
virtual ~helper_server()
Definition: helper.cc:152
virtual void dropQueued()
dequeues and sends a Helper::Unknown answer to all queued requests
Definition: helper.cc:128
struct HelperServerBase::_helper_flags flags
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: minimal.cc:46
uint64_t timedout
Definition: helper.h:234
#define ScheduleCallHere(call)
Definition: AsyncCall.h:164
const InstanceId< HelperServerBase > index
Definition: helper.h:204
char * QuoteMimeBlob(const char *header)
Definition: Quoting.cc:43
int avg_svc_time
Definition: helper.h:129
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
Definition: helper.cc:1432
int ipc_type
Definition: helper.h:112
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:93
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper requests; does not enforce queue limits
Definition: helper.cc:557
uint64_t Id
Definition: Request.h:45
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Definition: helper.cc:1556
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
int commSetConnTimeout(const Comm::ConnectionPointer &conn, int timeout, AsyncCall::Pointer &callback)
Definition: comm.cc:563
static helper_server * GetFirstAvailable(const helper *hlp)
Definition: helper.cc:1273
time_t last_queue_warn
Definition: helper.h:116
#define xstrdup
static void HelperServerClosed(helper_stateful_server *srv)
close handler to handle exited server processes
Definition: helper.cc:876
char * buf
Definition: Request.h:39
std::queue< Helper::Xaction * > queue
Definition: helper.h:109
#define PRIu64
Definition: types.h:120
int commSetNonBlocking(int fd)
Definition: comm.cc:1038
const size_t ReadBufSize(32 *1024)
Helpers input buffer size.
Definition: cbdata.cc:60
virtual bool reserved() override
whether the server is locked for exclusive use by a client
Definition: helper.h:304
Helper::Reply reply
Definition: helper.h:43
virtual void append(const char *c, int sz)
Definition: MemBuf.cc:209
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:398
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
@ OK
Definition: Flag.h:16
struct timeval dispatch_time
Definition: helper.h:215
virtual void dropQueued() override
dequeues and sends a Helper::Unknown answer to all queued requests
Definition: helper.cc:176
@ ERR_CLOSING
Definition: Flag.h:25
#define cbdataReference(var)
Definition: cbdata.h:341
int intAverage(const int, const int, int, const int)
Definition: SquidMath.cc:40
static void HelperServerClosed(helper_server *srv)
close handler to handle exited server processes
Definition: helper.cc:857
unsigned int n_running
Definition: ChildConfig.h:80
uint64_t uses
Definition: helper.h:230
HLPCB * callback
Definition: Request.h:40
A const & max(A const &lhs, A const &rhs)
void submit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:645
int tvSubMsec(struct timeval t1, struct timeval t2)
Definition: gadgets.cc:51
Requests requests
requests in order of submission/expiration
Definition: helper.h:227
void cancelReservation(const Helper::ReservationId reservation)
undo reserveServer(), clear the reservation and kick the queue
Definition: helper.cc:598
#define DBG_DATA
Definition: Stream.h:43
static void * hIpc
Definition: IcmpSquid.cc:33
static pid_t pid
Definition: IcmpSquid.cc:34
time_t last_restart
Definition: helper.h:117
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
Definition: helper.cc:1363
struct HelperServerBase::@72 stats
Definition: Raw.h:21
void reserveServer(helper_stateful_server *srv)
reserve the given server
Definition: helper.cc:585
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
Definition: Read.h:59
static void helperDispatch(helper_server *srv, Helper::Xaction *r)
Definition: helper.cc:1389
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
#define HELPER_MAX_ARGS
Definition: helper.cc:36
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
int isNull() const
Definition: MemBuf.cc:145
void * hIpc
Definition: helper.h:209
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
Definition: gadgets.cc:17
int timedout
Definition: helper.h:127
#define NULL
Definition: types.h:166
static void helperReturnBuffer(helper_server *srv, helper *hlp, char *msg, size_t msgSize, char *msgEnd)
Calls back with a pointer to the buffer with the helper output.
Definition: helper.cc:916
const char * rawContent() const
Definition: SBuf.cc:509
int needNew() const
Definition: ChildConfig.cc:59
int placeholder
Definition: Request.h:43
bool accumulate(const char *buf, size_t len)
Definition: Reply.cc:25
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
bool overloaded() const
Definition: helper.cc:486
represents a single helper process abstraction
Definition: helper.h:173
MemBuf * wqueue
Definition: helper.h:251
struct timeval dispatch_time
Definition: Request.h:44
static ReservationId Next()
Helper::ResultCode result
The helper response 'result' field.
Definition: Reply.h:59
unsigned int droppedRequests
requests not sent during helper overload
Definition: helper.h:114
Definition: MemBuf.h:24
Definition: helper.h:64
static void SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
Definition: helper.cc:460
void clean()
Definition: MemBuf.cc:110
uint64_t pending
Definition: helper.h:232
struct timeval answer_time
Definition: helper.h:216
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
Definition: CommCalls.h:36
int reconfiguring
Comm::ConnectionPointer conn
Definition: CommCalls.h:85
#define safe_free(x)
Definition: xalloc.h:73
Ip::Address addr
Definition: helper.h:206
int requests
Definition: helper.h:125
bool willOverload() const
Definition: helper.cc:735
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
Definition: CommCalls.h:342
int conn
the current server connection FD
Definition: Transport.cc:26
#define assert(EX)
Definition: assert.h:19
bool retryTimedOut
Whether the timed-out requests must retried.
Definition: helper.h:119
SSL Connection
Definition: Session.h:45
~helper()
Definition: helper.cc:823
void helperShutdown(helper *hlp)
Definition: helper.cc:740
a (temporary) lock on a (stateful) helper channel
Definition: ReservationId.h:18
RequestIndex requestsIndex
maps request IDs to requests
Definition: helper.h:267
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
unsigned int n_max
Definition: ChildConfig.h:48
void packStatsInto(Packable *p, const char *label=NULL) const
Dump some stats about the helper state to a Packable object.
Definition: helper.cc:677
time_t overloadStart
when the helper became overloaded (zero if it is not)
Definition: helper.h:115
void * data
Definition: Request.h:41
void helperStatefulSubmit(statefulhelper *hlp, const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:567
Comm::ConnectionPointer writePipe
Definition: helper.h:208
#define cbdataReferenceDone(var)
Definition: cbdata.h:350
const char * id_name
Definition: helper.h:110
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:62
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:318
size_type length() const
Returns the number of bytes stored in SBuf.
Definition: SBuf.h:415
time_t squid_curtime
Definition: stub_libtime.cc:20
uint64_t replies
Definition: helper.h:231
size_t roffset
Definition: helper.h:213
time_t reservationTimeout
older stateful helper server reservations may be forgotten
Definition: ChildConfig.h:109
SubmissionErrorHandlingAction onPersistentOverload
how to handle a new request for helper that was overloaded for too long
Definition: ChildConfig.h:99
wordlist * next
Definition: wordlist.h:33
MemBuf * writebuf
Definition: helper.h:252
Flag
Definition: Flag.h:15
unsigned int n_startup
Definition: ChildConfig.h:57
#define REDIRECT_AV_FACTOR
Definition: defines.h:53
represents a single "stateful helper" process
Definition: helper.h:295
#define fd_table
Definition: fde.h:189
Helper::Xaction * popRequest(int requestId)
Definition: helper.cc:893
static void helperStatefulServerDone(helper_stateful_server *srv)
Definition: helper.cc:1506
void helperSubmit(helper *hlp, const char *buf, HLPCB *callback, void *data)
Definition: helper.cc:473
void finalize()
Definition: Reply.cc:38
Helper::Xaction * nextRequest()
Definition: helper.cc:1261
char * rbuf
Definition: helper.h:211
char * key
Definition: wordlist.h:32
void handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
Definition: helper.cc:833
InstanceIdDefinitions(HelperServerBase, "Hlpr")
bool trySubmit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
reserved servers indexed by reservation IDs
Definition: helper.cc:575
size_t rbuf_sz
Definition: helper.h:212
void initStats()
Definition: helper.cc:64
Helper::ReservationId reservationId
The stateful replies should include the reservation ID.
Definition: Reply.h:65
char * content()
start of the added data
Definition: MemBuf.h:41
SBuf onTimedOutResponse
The response to use when helper response timedout.
Definition: helper.h:121
static void helperKickQueue(helper *hlp)
Definition: helper.cc:1484
#define Important(id)
Definition: Messages.h:91
virtual ~HelperServerBase()
Definition: helper.cc:144
#define DBG_IMPORTANT
Definition: Stream.h:41
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:154
helper * parent
Definition: helper.h:254
Helper::Xaction * replyXaction
Definition: helper.h:260
@ BrokenHelper
Definition: ResultCode.h:20
bool queueFull() const
whether queuing an additional request would overload the helper
Definition: helper.cc:481
Helper::Request request
Definition: helper.h:42
static void helperStatefulKickQueue(statefulhelper *hlp)
Definition: helper.cc:1494
int shutting_down
virtual bool reserved()=0
whether the server is locked for exclusive use by a client
void HLPCB(void *, const Helper::Reply &)
Definition: forward.h:27
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
Definition: helper.cc:1232
bool ignoreToEom
Whether to ignore current message, because it is timed-out or other reason.
Definition: helper.h:263
#define xisspace(x)
Definition: xis.h:17
Ip::Address addr
Definition: helper.h:113
unsigned int concurrency
Definition: ChildConfig.h:72
char progname[]
int queue_size
Definition: helper.h:128
unsigned int n_active
Definition: ChildConfig.h:86
void helperStatefulShutdown(statefulhelper *hlp)
Definition: helper.cc:777
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:546
void closeWritePipeSafely(const char *name)
Definition: helper.cc:102
void helperStatefulOpenServers(statefulhelper *hlp)
Definition: helper.cc:332
time_t timeout
Requests timeout.
Definition: helper.h:118
bool prepSubmit()
Definition: helper.cc:519
static void Enqueue(helper *hlp, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
Definition: helper.cc:1203
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:196
Helper::ReservationId reservationId
"confirmation ID" of the last
Definition: helper.h:316
@ TimedOut
Definition: ResultCode.h:21
void checkForTimedOutRequests(bool const retry)
Definition: helper.cc:1517
virtual helper * getParent() const override
the helper object that created this server
Definition: helper.h:305
char eom
The char which marks the end of (response) message, normally ' '.
Definition: helper.h:122
Reservations reservations
Definition: helper.h:168
int unsigned int
Definition: stub_fd.cc:19
void memFreeBuf(size_t size, void *)
Definition: minimal.cc:84
uint64_t nextRequestId
Definition: helper.h:249
Comm::ConnectionPointer readPipe
Definition: helper.h:207
Holds the required data to serve a helper request.
Definition: helper.h:38

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors