helper.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 84 Helper process maintenance */
10 
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31 
32 // helper_stateful_server::data uses explicit alloc()/freeOne() */
33 #include "mem/Pool.h"
34 
35 #define HELPER_MAX_ARGS 64
36 
38 #define MAX_RETRIES 2
39 
41 const size_t ReadBufSize(32*1024);
42 
45 static void Enqueue(helper * hlp, Helper::Xaction *);
46 static helper_server *GetFirstAvailable(const helper * hlp);
48 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
50 static void helperKickQueue(helper * hlp);
51 static void helperStatefulKickQueue(statefulhelper * hlp);
53 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
54 
59 
61 
62 void
64 {
65 #if _SQUID_WINDOWS_
66  shutdown(writePipe->fd, SD_BOTH);
67 #endif
68 
69  flags.closing = true;
70  if (readPipe->fd == writePipe->fd)
71  readPipe->fd = -1;
72  else
73  readPipe->close();
74  writePipe->close();
75 
76 #if _SQUID_WINDOWS_
77  if (hIpc) {
78  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
80  debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
81  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
82  }
83  CloseHandle(hIpc);
84  }
85 #endif
86 }
87 
88 void
90 {
91 #if _SQUID_WINDOWS_
92  shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
93 #endif
94 
95  flags.closing = true;
96  if (readPipe->fd == writePipe->fd)
97  readPipe->fd = -1;
98  writePipe->close();
99 
100 #if _SQUID_WINDOWS_
101  if (hIpc) {
102  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
103  getCurrentTime();
104  debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
105  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
106  }
107  CloseHandle(hIpc);
108  }
109 #endif
110 }
111 
112 void
114 {
115  while (!requests.empty()) {
116  // XXX: re-schedule these on another helper?
117  Helper::Xaction *r = requests.front();
118  requests.pop_front();
119  void *cbdata;
122  r->request.callback(cbdata, r->reply);
123  }
124 
125  delete r;
126  }
127 }
128 
129 HelperServerBase::HelperServerBase(const char *desc, Ip::Address &ip, int aPid, void *aIpc, int rfd, int wfd) :
130  pid(aPid),
131  addr(ip),
132  readPipe(new Comm::Connection),
133  writePipe(new Comm::Connection),
134  hIpc(aIpc),
135  rbuf(static_cast<char *>(memAllocBuf(ReadBufSize, &rbuf_sz)))
136 {
137  readPipe->fd = rfd;
138  readPipe->noteStart();
139  fd_note(readPipe->fd, desc);
140 
141  writePipe->fd = wfd;
142  writePipe->noteStart();
143  fd_note(writePipe->fd, desc);
144 }
145 
147 {
148  if (rbuf) {
150  rbuf = NULL;
151  }
152 }
153 
154 helper_server::helper_server(helper *hlp, int aPid, void *aIpc, int rfd, int wfd) :
155  HelperServerBase(hlp->cmdline->key, hlp->addr, aPid, aIpc, rfd, wfd),
156  parent(cbdataReference(hlp))
157 {
158 }
159 
161 {
162  wqueue->clean();
163  delete wqueue;
164 
165  if (writebuf) {
166  writebuf->clean();
167  delete writebuf;
168  writebuf = NULL;
169  }
170 
173 
175 
177  -- parent->childs.n_running;
178 
179  assert(requests.empty());
181 }
182 
183 void
185 {
187  requestsIndex.clear();
188 }
189 
190 helper_stateful_server::helper_stateful_server(statefulhelper *hlp, int aPid, void *aIpc, int rfd, int wfd) :
191  HelperServerBase(hlp->cmdline->key, hlp->addr, aPid, aIpc, rfd, wfd),
192  parent(cbdataReference(hlp))
193 {
194 }
195 
197 {
198  /* TODO: walk the local queue of requests and carry them all out */
201 
203 
205 
207  -- parent->childs.n_running;
208 
209  assert(requests.empty());
210 
212 }
213 
214 void
216 {
217  char *s;
218  char *progname;
219  char *shortname;
220  char *procname;
221  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
222  char fd_note_buf[FD_DESC_SZ];
223  int nargs = 0;
224  int k;
225  pid_t pid;
226  int rfd;
227  int wfd;
228  void * hIpc;
229  wordlist *w;
230 
231  if (hlp->cmdline == NULL)
232  return;
233 
234  progname = hlp->cmdline->key;
235 
236  if ((s = strrchr(progname, '/')))
237  shortname = xstrdup(s + 1);
238  else
239  shortname = xstrdup(progname);
240 
241  /* figure out how many new child are actually needed. */
242  int need_new = hlp->childs.needNew();
243 
244  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
245 
246  if (need_new < 1) {
247  debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
248  }
249 
250  procname = (char *)xmalloc(strlen(shortname) + 3);
251 
252  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
253 
254  args[nargs] = procname;
255  ++nargs;
256 
257  for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
258  args[nargs] = w->key;
259  ++nargs;
260  }
261 
262  args[nargs] = NULL;
263  ++nargs;
264 
265  assert(nargs <= HELPER_MAX_ARGS);
266 
267  for (k = 0; k < need_new; ++k) {
268  getCurrentTime();
269  rfd = wfd = -1;
270  pid = ipcCreate(hlp->ipc_type,
271  progname,
272  args,
273  shortname,
274  hlp->addr,
275  &rfd,
276  &wfd,
277  &hIpc);
278 
279  if (pid < 0) {
280  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
281  continue;
282  }
283 
284  ++ hlp->childs.n_running;
285  ++ hlp->childs.n_active;
286  auto *srv = new helper_server(hlp, pid, hIpc, rfd, wfd);
287  dlinkAddTail(srv, &srv->link, &hlp->servers);
288 
289  if (rfd == wfd) {
290  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
291  fd_note(rfd, fd_note_buf);
292  } else {
293  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
294  fd_note(rfd, fd_note_buf);
295  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
296  fd_note(wfd, fd_note_buf);
297  }
298 
299  commSetNonBlocking(rfd);
300 
301  if (wfd != rfd)
302  commSetNonBlocking(wfd);
303 
304  AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
305  comm_add_close_handler(rfd, closeCall);
306 
307  if (hlp->timeout && hlp->childs.concurrency) {
308  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
310  commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
311  }
312 
313  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
315  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
316  }
317 
319  safe_free(shortname);
320  safe_free(procname);
321  helperKickQueue(hlp);
322 }
323 
329 void
331 {
332  char *shortname;
333  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
334  char fd_note_buf[FD_DESC_SZ];
335  int nargs = 0;
336 
337  if (hlp->cmdline == NULL)
338  return;
339 
340  if (hlp->childs.concurrency)
341  debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
342 
343  char *progname = hlp->cmdline->key;
344 
345  char *s;
346  if ((s = strrchr(progname, '/')))
347  shortname = xstrdup(s + 1);
348  else
349  shortname = xstrdup(progname);
350 
351  /* figure out haw mant new helpers are needed. */
352  int need_new = hlp->childs.needNew();
353 
354  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
355 
356  if (need_new < 1) {
357  debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
358  }
359 
360  char *procname = (char *)xmalloc(strlen(shortname) + 3);
361 
362  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
363 
364  args[nargs] = procname;
365  ++nargs;
366 
367  for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
368  args[nargs] = w->key;
369  ++nargs;
370  }
371 
372  args[nargs] = NULL;
373  ++nargs;
374 
375  assert(nargs <= HELPER_MAX_ARGS);
376 
377  for (int k = 0; k < need_new; ++k) {
378  getCurrentTime();
379  int rfd = -1;
380  int wfd = -1;
381  void * hIpc;
382  pid_t pid = ipcCreate(hlp->ipc_type,
383  progname,
384  args,
385  shortname,
386  hlp->addr,
387  &rfd,
388  &wfd,
389  &hIpc);
390 
391  if (pid < 0) {
392  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
393  continue;
394  }
395 
396  ++ hlp->childs.n_running;
397  ++ hlp->childs.n_active;
398  auto *srv = new helper_stateful_server(hlp, pid, hIpc, rfd, wfd);
399  dlinkAddTail(srv, &srv->link, &hlp->servers);
400 
401  if (rfd == wfd) {
402  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
403  fd_note(rfd, fd_note_buf);
404  } else {
405  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
406  fd_note(rfd, fd_note_buf);
407  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
408  fd_note(wfd, fd_note_buf);
409  }
410 
411  commSetNonBlocking(rfd);
412 
413  if (wfd != rfd)
414  commSetNonBlocking(wfd);
415 
416  AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
417  comm_add_close_handler(rfd, closeCall);
418 
419  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
421  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
422  }
423 
425  safe_free(shortname);
426  safe_free(procname);
428 }
429 
430 void
432 {
433  helper_server *srv;
434 
435  if ((srv = GetFirstAvailable(this)))
436  helperDispatch(srv, r);
437  else
438  Enqueue(this, r);
439 
440  syncQueueStats();
441 }
442 
444 static void
446 {
447  auto result = Helper::Error;
448  if (!hlp) {
449  debugs(84, 3, "no helper");
450  result = Helper::Unknown;
451  }
452  // else pretend the helper has responded with ERR
453 
454  callback(data, Helper::Reply(result));
455 }
456 
457 void
458 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
459 {
460  if (!hlp || !hlp->trySubmit(buf, callback, data))
462 }
463 
465 bool
467  return stats.queue_size >= static_cast<int>(childs.queue_size);
468 }
469 
470 bool
472  return stats.queue_size > static_cast<int>(childs.queue_size);
473 }
474 
476 void
478 {
479  if (overloaded()) {
480  if (overloadStart) {
481  debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
482  } else {
484  debugs(84, 3, id_name << " became overloaded");
485  }
486  } else {
487  if (overloadStart) {
488  debugs(84, 5, id_name << " is no longer overloaded");
489  if (droppedRequests) {
490  debugs(84, DBG_IMPORTANT, "helper " << id_name <<
491  " is no longer overloaded after dropping " << droppedRequests <<
492  " requests in " << (squid_curtime - overloadStart) << " seconds");
493  droppedRequests = 0;
494  }
495  overloadStart = 0;
496  }
497  }
498 }
499 
503 bool
505 {
506  // re-sync for the configuration may have changed since the last submission
507  syncQueueStats();
508 
509  // Nothing special to do if the new request does not overload (i.e., the
510  // queue is not even full yet) or only _starts_ overloading this helper
511  // (i.e., the queue is currently at its limit).
512  if (!overloaded())
513  return true;
514 
515  if (squid_curtime - overloadStart <= 180)
516  return true; // also OK: overload has not persisted long enough to panic
517 
519  fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
520 
521  if (!droppedRequests) {
522  debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
523  id_name << " helper configured with on-persistent-overload=err");
524  }
525  ++droppedRequests;
526  debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
527  return false;
528 }
529 
530 bool
531 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
532 {
533  if (!prepSubmit())
534  return false; // request was dropped
535 
536  submit(buf, callback, data); // will send or queue
537  return true; // request submitted or queued
538 }
539 
541 void
542 helper::submit(const char *buf, HLPCB * callback, void *data)
543 {
545  submitRequest(r);
546  debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
547 }
548 
551 void
552 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
553 {
554  if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
556 }
557 
559 bool
560 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
561 {
562  if (!prepSubmit())
563  return false; // request was dropped
564 
565  submit(buf, callback, data, reservation); // will send or queue
566  return true; // request submitted or queued
567 }
568 
569 void
571 {
572  // clear any old reservation
573  if (srv->reserved()) {
574  reservations.erase(srv->reservationId);
575  srv->clearReservation();
576  }
577 
578  srv->reserve();
579  reservations.insert(Reservations::value_type(srv->reservationId, srv));
580 }
581 
582 void
584 {
585  const auto it = reservations.find(reservation);
586  if (it == reservations.end())
587  return;
588 
589  helper_stateful_server *srv = it->second;
590  reservations.erase(it);
591  srv->clearReservation();
592 
593  // schedule a queue kick
594  AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
595  ScheduleCallHere(call);
596 }
597 
600 {
601  const auto it = reservations.find(reservation);
602  if (it == reservations.end())
603  return nullptr;
604  return it->second;
605 }
606 
607 void
609 {
613  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
614 }
615 
616 void
618 {
619  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
620  if (!reservationId)
621  return;
622 
623  ++stats.releases;
624 
626  reservationStart = 0;
627 }
628 
629 void
630 statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
631 {
633 
634  if (buf && reservation) {
635  debugs(84, 5, reservation);
636  helper_stateful_server *lastServer = findServer(reservation);
637  if (!lastServer) {
638  debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
640  r->request.callback(r->request.data, r->reply);
641  delete r;
642  return;
643  }
644  debugs(84, 5, "StatefulSubmit dispatching");
645  helperStatefulDispatch(lastServer, r);
646  } else {
648  if ((srv = StatefulGetFirstAvailable(this))) {
649  reserveServer(srv);
650  helperStatefulDispatch(srv, r);
651  } else
652  StatefulEnqueue(this, r);
653  }
654 
655  debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
656  "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
657 
658  syncQueueStats();
659 }
660 
661 void
662 helper::packStatsInto(Packable *p, const char *label) const
663 {
664  if (label)
665  p->appendf("%s:\n", label);
666 
667  p->appendf(" program: %s\n", cmdline->key);
668  p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
669  p->appendf(" requests sent: %d\n", stats.requests);
670  p->appendf(" replies received: %d\n", stats.replies);
671  p->appendf(" requests timedout: %d\n", stats.timedout);
672  p->appendf(" queue length: %d\n", stats.queue_size);
673  p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
674  p->append("\n",1);
675  p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
676  "ID #",
677  "FD",
678  "PID",
679  "# Requests",
680  "# Replies",
681  "# Timed-out",
682  "Flags",
683  "Time",
684  "Offset",
685  "Request");
686 
687  for (dlink_node *link = servers.head; link; link = link->next) {
688  HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
689  assert(srv);
690  Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
691  double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
692  p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
693  srv->index.value,
694  srv->readPipe->fd,
695  srv->pid,
696  srv->stats.uses,
697  srv->stats.replies,
698  srv->stats.timedout,
699  srv->stats.pending ? 'B' : ' ',
700  srv->flags.writing ? 'W' : ' ',
701  srv->flags.closing ? 'C' : ' ',
702  srv->reserved() ? 'R' : ' ',
703  srv->flags.shutdown ? 'S' : ' ',
704  xaction && xaction->request.placeholder ? 'P' : ' ',
705  tt < 0.0 ? 0.0 : tt,
706  (int) srv->roffset,
707  xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
708  }
709 
710  p->append("\nFlags key:\n"
711  " B\tBUSY\n"
712  " W\tWRITING\n"
713  " C\tCLOSING\n"
714  " R\tRESERVED\n"
715  " S\tSHUTDOWN PENDING\n"
716  " P\tPLACEHOLDER\n", 101);
717 }
718 
719 bool
721  return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
722 }
723 
724 void
726 {
727  dlink_node *link = hlp->servers.head;
728 
729  while (link) {
730  helper_server *srv;
731  srv = (helper_server *)link->data;
732  link = link->next;
733 
734  if (srv->flags.shutdown) {
735  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
736  continue;
737  }
738 
739  assert(hlp->childs.n_active > 0);
740  -- hlp->childs.n_active;
741  srv->flags.shutdown = true; /* request it to shut itself down */
742 
743  if (srv->flags.closing) {
744  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
745  continue;
746  }
747 
748  if (srv->stats.pending) {
749  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
750  continue;
751  }
752 
753  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
754  /* the rest of the details is dealt with in the helperServerFree
755  * close handler
756  */
757  srv->closePipesSafely(hlp->id_name);
758  }
759 }
760 
761 void
763 {
764  dlink_node *link = hlp->servers.head;
766 
767  while (link) {
768  srv = (helper_stateful_server *)link->data;
769  link = link->next;
770 
771  if (srv->flags.shutdown) {
772  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
773  continue;
774  }
775 
776  assert(hlp->childs.n_active > 0);
777  -- hlp->childs.n_active;
778  srv->flags.shutdown = true; /* request it to shut itself down */
779 
780  if (srv->stats.pending) {
781  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
782  continue;
783  }
784 
785  if (srv->flags.closing) {
786  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
787  continue;
788  }
789 
790  if (srv->reserved()) {
791  if (shutting_down) {
792  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
793  } else {
794  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
795  continue;
796  }
797  }
798 
799  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
800 
801  /* the rest of the details is dealt with in the helperStatefulServerFree
802  * close handler
803  */
804  srv->closePipesSafely(hlp->id_name);
805  }
806 }
807 
809 {
810  /* note, don't free id_name, it probably points to static memory */
811 
812  // TODO: if the queue is not empty it will leak Helper::Request's
813  if (!queue.empty())
814  debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
815 }
816 
817 void
818 helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
819 {
820  needsNewServers = false;
821  if (!srv->flags.shutdown) {
822  assert(childs.n_active > 0);
823  --childs.n_active;
824  debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
825 
826  if (childs.needNew() > 0) {
827  debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")");
828 
830  if (srv->stats.replies < 1)
831  fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name);
832  else
833  debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
834  }
835  srv->flags.shutdown = true;
836  needsNewServers = true;
837  }
838  }
839 }
840 
841 void
843 {
844  helper *hlp = srv->getParent();
845 
846  bool needsNewServers = false;
847  hlp->handleKilledServer(srv, needsNewServers);
848  if (needsNewServers) {
849  debugs(80, DBG_IMPORTANT, "Starting new helpers");
850  helperOpenServers(hlp);
851  }
852 
853  srv->dropQueued();
854 
855  delete srv;
856 }
857 
858 // XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
859 // TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
860 void
862 {
863  statefulhelper *hlp = static_cast<statefulhelper *>(srv->getParent());
864 
865  bool needsNewServers = false;
866  hlp->handleKilledServer(srv, needsNewServers);
867  if (needsNewServers) {
868  debugs(80, DBG_IMPORTANT, "Starting new helpers");
870  }
871 
872  srv->dropQueued();
873 
874  delete srv;
875 }
876 
878 helper_server::popRequest(int request_number)
879 {
880  Helper::Xaction *r = nullptr;
881  helper_server::RequestIndex::iterator it;
882  if (parent->childs.concurrency) {
883  // If concurrency supported retrieve request from ID
884  it = requestsIndex.find(request_number);
885  if (it != requestsIndex.end()) {
886  r = *(it->second);
887  requests.erase(it->second);
888  requestsIndex.erase(it);
889  }
890  } else if(!requests.empty()) {
891  // Else get the first request from queue, if any
892  r = requests.front();
893  requests.pop_front();
894  }
895 
896  return r;
897 }
898 
900 static void
901 helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
902 {
903  if (Helper::Xaction *r = srv->replyXaction) {
904  const bool hasSpace = r->reply.accumulate(msg, msgSize);
905  if (!hasSpace) {
906  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
907  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
908  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
909  srv->closePipesSafely(hlp->id_name);
910  return;
911  }
912 
913  if (!msgEnd)
914  return; // We are waiting for more data.
915 
916  bool retry = false;
917  if (cbdataReferenceValid(r->request.data)) {
918  r->reply.finalize();
919  if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
920  debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
921  retry = true;
922  } else {
923  HLPCB *callback = r->request.callback;
924  r->request.callback = nullptr;
925  void *cbdata = nullptr;
926  if (cbdataReferenceValidDone(r->request.data, &cbdata))
927  callback(cbdata, r->reply);
928  }
929  }
930 
931  -- srv->stats.pending;
932  ++ srv->stats.replies;
933 
934  ++ hlp->stats.replies;
935 
936  srv->answer_time = current_time;
937 
938  srv->dispatch_time = r->request.dispatch_time;
939 
940  hlp->stats.avg_svc_time =
942  tvSubMsec(r->request.dispatch_time, current_time),
944 
945  // release or re-submit parsedRequestXaction object
946  srv->replyXaction = nullptr;
947  if (retry) {
948  ++r->request.retries;
949  hlp->submitRequest(r);
950  } else
951  delete r;
952  }
953 
954  if (hlp->timeout && hlp->childs.concurrency)
956 
957  if (!srv->flags.shutdown) {
958  helperKickQueue(hlp);
959  } else if (!srv->flags.closing && !srv->stats.pending) {
960  srv->flags.closing=true;
961  srv->writePipe->close();
962  }
963 }
964 
965 static void
966 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
967 {
968  helper_server *srv = (helper_server *)data;
969  helper *hlp = srv->parent;
971 
972  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
973 
974  if (flag == Comm::ERR_CLOSING) {
975  return;
976  }
977 
978  assert(conn->fd == srv->readPipe->fd);
979 
980  debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
981 
982  if (flag != Comm::OK || len == 0) {
983  srv->closePipesSafely(hlp->id_name);
984  return;
985  }
986 
987  srv->roffset += len;
988  srv->rbuf[srv->roffset] = '\0';
989  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
990 
991  if (!srv->stats.pending && !srv->stats.timedout) {
992  /* someone spoke without being spoken to */
993  debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
994  hlp->id_name << " #" << srv->index << ", " << (int)len <<
995  " bytes '" << srv->rbuf << "'");
996 
997  srv->roffset = 0;
998  srv->rbuf[0] = '\0';
999  }
1000 
1001  bool needsMore = false;
1002  char *msg = srv->rbuf;
1003  while (*msg && !needsMore) {
1004  int skip = 0;
1005  char *eom = strchr(msg, hlp->eom);
1006  if (eom) {
1007  skip = 1;
1008  debugs(84, 3, "helperHandleRead: end of reply found");
1009  if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1010  *eom = '\0';
1011  // rewind to the \r octet which is the real terminal now
1012  // and remember that we have to skip forward 2 places now.
1013  skip = 2;
1014  --eom;
1015  }
1016  *eom = '\0';
1017  }
1018 
1019  if (!srv->ignoreToEom && !srv->replyXaction) {
1020  int i = 0;
1021  if (hlp->childs.concurrency) {
1022  char *e = NULL;
1023  i = strtol(msg, &e, 10);
1024  // Do we need to check for e == msg? Means wrong response from helper.
1025  // Will be dropped as "unexpected reply on channel 0"
1026  needsMore = !(xisspace(*e) || (eom && e == eom));
1027  if (!needsMore) {
1028  msg = e;
1029  while (*msg && xisspace(*msg))
1030  ++msg;
1031  } // else not enough data to compute request number
1032  }
1033  if (!(srv->replyXaction = srv->popRequest(i))) {
1034  if (srv->stats.timedout) {
1035  debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1036  } else {
1037  debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
1038  i << " from " << hlp->id_name << " #" << srv->index <<
1039  " '" << srv->rbuf << "'");
1040  }
1041  srv->ignoreToEom = true;
1042  }
1043  } // else we need to just append reply data to the current Xaction
1044 
1045  if (!needsMore) {
1046  size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1047  assert(msgSize <= srv->rbuf_sz);
1048  helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1049  msg += msgSize + skip;
1050  assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1051 
1052  // The next message should not ignored.
1053  if (eom && srv->ignoreToEom)
1054  srv->ignoreToEom = false;
1055  } else
1056  assert(skip == 0 && eom == NULL);
1057  }
1058 
1059  if (needsMore) {
1060  size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1061  assert(msgSize <= srv->rbuf_sz);
1062  memmove(srv->rbuf, msg, msgSize);
1063  srv->roffset = msgSize;
1064  srv->rbuf[srv->roffset] = '\0';
1065  } else {
1066  // All of the responses parsed and msg points at the end of read data
1067  assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1068  srv->roffset = 0;
1069  }
1070 
1071  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1072  int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1073  assert(spaceSize >= 0);
1074 
1075  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1077  comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1078  }
1079 }
1080 
1081 static void
1082 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1083 {
1084  char *t = NULL;
1086  statefulhelper *hlp = srv->parent;
1088 
1089  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1090 
1091  if (flag == Comm::ERR_CLOSING) {
1092  return;
1093  }
1094 
1095  assert(conn->fd == srv->readPipe->fd);
1096 
1097  debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1098  hlp->id_name << " #" << srv->index);
1099 
1100  if (flag != Comm::OK || len == 0) {
1101  srv->closePipesSafely(hlp->id_name);
1102  return;
1103  }
1104 
1105  srv->roffset += len;
1106  srv->rbuf[srv->roffset] = '\0';
1107  Helper::Xaction *r = srv->requests.front();
1108  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1109 
1110  if (r == NULL) {
1111  /* someone spoke without being spoken to */
1112  debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1113  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1114  " bytes '" << srv->rbuf << "'");
1115 
1116  srv->roffset = 0;
1117  }
1118 
1119  if ((t = strchr(srv->rbuf, hlp->eom))) {
1120  debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1121 
1122  if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1123  *t = '\0';
1124  // rewind to the \r octet which is the real terminal now
1125  --t;
1126  }
1127 
1128  *t = '\0';
1129  }
1130 
1131  if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1132  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1133  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1134  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1135  srv->closePipesSafely(hlp->id_name);
1136  return;
1137  }
1144  srv->roffset = 0;
1145 
1146  if (t) {
1147  /* end of reply found */
1148  srv->requests.pop_front(); // we already have it in 'r'
1149  int called = 1;
1150 
1151  if (r && cbdataReferenceValid(r->request.data)) {
1152  r->reply.finalize();
1153  r->reply.reservationId = srv->reservationId;
1154  r->request.callback(r->request.data, r->reply);
1155  } else {
1156  debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1157  called = 0;
1158  }
1159 
1160  delete r;
1161 
1162  -- srv->stats.pending;
1163  ++ srv->stats.replies;
1164 
1165  ++ hlp->stats.replies;
1166  srv->answer_time = current_time;
1167  hlp->stats.avg_svc_time =
1171 
1172  if (called)
1174  else
1175  hlp->cancelReservation(srv->reservationId);
1176  }
1177 
1178  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1179  int spaceSize = srv->rbuf_sz - 1;
1180 
1181  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1183  comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1184  }
1185 }
1186 
1188 static void
1190 {
1191  hlp->queue.push(r);
1192  ++ hlp->stats.queue_size;
1193 
1194  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1195  if (hlp->childs.needNew() > 0) {
1196  debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1197  helperOpenServers(hlp);
1198  return;
1199  }
1200 
1201  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1202  return;
1203 
1204  if (squid_curtime - hlp->last_queue_warn < 600)
1205  return;
1206 
1208  return;
1209 
1211 
1212  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1213  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1214  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1215 }
1216 
1217 static void
1219 {
1220  hlp->queue.push(r);
1221  ++ hlp->stats.queue_size;
1222 
1223  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1224  if (hlp->childs.needNew() > 0) {
1225  debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1227  return;
1228  }
1229 
1230  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1231  return;
1232 
1233  if (squid_curtime - hlp->last_queue_warn < 600)
1234  return;
1235 
1237  return;
1238 
1240 
1241  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1242  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1243  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1244 }
1245 
1248 {
1249  if (queue.empty())
1250  return nullptr;
1251 
1252  auto *r = queue.front();
1253  queue.pop();
1254  --stats.queue_size;
1255  return r;
1256 }
1257 
1258 static helper_server *
1260 {
1261  dlink_node *n;
1262  helper_server *srv;
1263  helper_server *selected = NULL;
1264  debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1265 
1266  if (hlp->childs.n_running == 0)
1267  return NULL;
1268 
1269  /* Find "least" loaded helper (approx) */
1270  for (n = hlp->servers.head; n != NULL; n = n->next) {
1271  srv = (helper_server *)n->data;
1272 
1273  if (selected && selected->stats.pending <= srv->stats.pending)
1274  continue;
1275 
1276  if (srv->flags.shutdown)
1277  continue;
1278 
1279  if (!srv->stats.pending)
1280  return srv;
1281 
1282  if (selected) {
1283  selected = srv;
1284  break;
1285  }
1286 
1287  selected = srv;
1288  }
1289 
1290  if (!selected) {
1291  debugs(84, 5, "GetFirstAvailable: None available.");
1292  return NULL;
1293  }
1294 
1295  if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1296  debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1297  return NULL;
1298  }
1299 
1300  debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1301  return selected;
1302 }
1303 
1304 static helper_stateful_server *
1306 {
1307  dlink_node *n;
1309  helper_stateful_server *oldestReservedServer = nullptr;
1310  debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1311 
1312  if (hlp->childs.n_running == 0)
1313  return NULL;
1314 
1315  for (n = hlp->servers.head; n != NULL; n = n->next) {
1316  srv = (helper_stateful_server *)n->data;
1317 
1318  if (srv->stats.pending)
1319  continue;
1320 
1321  if (srv->reserved()) {
1323  if (!oldestReservedServer)
1324  oldestReservedServer = srv;
1325  else if (oldestReservedServer->reservationStart < srv->reservationStart)
1326  oldestReservedServer = srv;
1327  debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1328  }
1329  continue;
1330  }
1331 
1332  if (srv->flags.shutdown)
1333  continue;
1334 
1335  debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1336  return srv;
1337  }
1338 
1339  if (oldestReservedServer) {
1340  debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1341  return oldestReservedServer;
1342  }
1343 
1344  debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1345  return nullptr;
1346 }
1347 
1348 static void
1349 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1350 {
1351  helper_server *srv = (helper_server *)data;
1352 
1353  srv->writebuf->clean();
1354  delete srv->writebuf;
1355  srv->writebuf = NULL;
1356  srv->flags.writing = false;
1357 
1358  if (flag != Comm::OK) {
1359  /* Helper server has crashed */
1360  debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1361  return;
1362  }
1363 
1364  if (!srv->wqueue->isNull()) {
1365  srv->writebuf = srv->wqueue;
1366  srv->wqueue = new MemBuf;
1367  srv->flags.writing = true;
1368  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1370  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1371  }
1372 }
1373 
1374 static void
1376 {
1377  helper *hlp = srv->parent;
1378  const uint64_t reqId = ++srv->nextRequestId;
1379 
1380  if (!cbdataReferenceValid(r->request.data)) {
1381  debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1382  delete r;
1383  return;
1384  }
1385 
1386  r->request.Id = reqId;
1387  helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1389 
1390  if (srv->wqueue->isNull())
1391  srv->wqueue->init();
1392 
1393  if (hlp->childs.concurrency) {
1394  srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1395  assert(srv->requestsIndex.size() == srv->requests.size());
1396  srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1397  } else
1398  srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1399 
1400  if (!srv->flags.writing) {
1401  assert(NULL == srv->writebuf);
1402  srv->writebuf = srv->wqueue;
1403  srv->wqueue = new MemBuf;
1404  srv->flags.writing = true;
1405  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1407  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1408  }
1409 
1410  debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1411 
1412  ++ srv->stats.uses;
1413  ++ srv->stats.pending;
1414  ++ hlp->stats.requests;
1415 }
1416 
1417 static void
1419 {}
1420 
1421 static void
1423 {
1424  statefulhelper *hlp = srv->parent;
1425 
1426  if (!cbdataReferenceValid(r->request.data)) {
1427  debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1428  delete r;
1429  hlp->cancelReservation(srv->reservationId);
1430  return;
1431  }
1432 
1433  debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1434 
1435  assert(srv->reservationId);
1436  r->reply.reservationId = srv->reservationId;
1437 
1438  if (r->request.placeholder == 1) {
1439  /* a callback is needed before this request can _use_ a helper. */
1440  /* we don't care about releasing this helper. The request NEVER
1441  * gets to the helper. So we throw away the return code */
1443  r->request.callback(r->request.data, r->reply);
1444  /* throw away the placeholder */
1445  delete r;
1446  /* and push the queue. Note that the callback may have submitted a new
1447  * request to the helper which is why we test for the request */
1448 
1449  if (!srv->requests.size())
1451 
1452  return;
1453  }
1454 
1455  srv->requests.push_back(r);
1456  srv->dispatch_time = current_time;
1457  AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1459  Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
1460  debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1461  hlp->id_name << " #" << srv->index << ", " <<
1462  (int) strlen(r->request.buf) << " bytes");
1463 
1464  ++ srv->stats.uses;
1465  ++ srv->stats.pending;
1466  ++ hlp->stats.requests;
1467 }
1468 
1469 static void
1471 {
1472  Helper::Xaction *r;
1473  helper_server *srv;
1474 
1475  while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1476  helperDispatch(srv, r);
1477 }
1478 
1479 static void
1481 {
1482  Helper::Xaction *r;
1484  while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1485  debugs(84, 5, "found srv-" << srv->index);
1486  hlp->reserveServer(srv);
1487  helperStatefulDispatch(srv, r);
1488  }
1489 }
1490 
1491 static void
1493 {
1494  if (!srv->flags.shutdown) {
1496  } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1497  srv->closeWritePipeSafely(srv->parent->id_name);
1498  return;
1499  }
1500 }
1501 
1502 void
1504 {
1506  while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1507  Helper::Xaction *r = requests.front();
1508  RequestIndex::iterator it;
1509  it = requestsIndex.find(r->request.Id);
1510  assert(it != requestsIndex.end());
1511  requestsIndex.erase(it);
1512  requests.pop_front();
1513  debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1514  void *cbdata;
1515  bool retried = false;
1516  if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1517  debugs(84, 2, "Retry request " << r->request.Id);
1518  ++r->request.retries;
1519  parent->submitRequest(r);
1520  retried = true;
1521  } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1522  if (!parent->onTimedOutResponse.isEmpty()) {
1524  r->reply.finalize();
1525  else
1527  r->request.callback(cbdata, r->reply);
1528  } else {
1530  r->request.callback(cbdata, r->reply);
1531  }
1532  }
1533  --stats.pending;
1534  ++stats.timedout;
1535  ++parent->stats.timedout;
1536  if (!retried)
1537  delete r;
1538  }
1539 }
1540 
1541 void
1543 {
1544  debugs(26, 3, HERE << io.conn);
1545  helper_server *srv = static_cast<helper_server *>(io.data);
1546 
1548 
1549  debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1550  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1552 
1553  const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1554  const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1555 
1556  commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1557 }
1558 
virtual helper * getParent() const override
the helper object that created this server
Definition: helper.h:271
#define FD_DESC_SZ
Definition: defines.h:46
static IOCB helperStatefulHandleRead
Definition: helper.cc:44
void closePipesSafely(const char *name)
Definition: helper.cc:63
dlink_node link
Definition: helper.h:208
Value value
instance identifier
Definition: InstanceId.h:69
void helperOpenServers(helper *hlp)
Definition: helper.cc:215
time_t reservationStart
when the last reservation was made
Definition: helper.h:307
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:983
void submitRequest(Helper::Xaction *r)
Definition: helper.cc:431
Helper::ChildConfig childs
configuration settings for number running
Definition: helper.h:99
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
Definition: helper.cc:477
represents a single "stateless helper" process
Definition: helper.h:230
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
wordlist * cmdline
Definition: helper.h:95
unsigned int queue_size
Definition: ChildConfig.h:91
#define xmalloc
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
Definition: helper.cc:1422
dlink_list servers
Definition: helper.h:96
helper_stateful_server * findServer(const Helper::ReservationId &reservation)
Definition: helper.cc:599
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:256
struct helper::_stats stats
@ Error
Definition: ResultCode.h:19
void fd_note(int fd, const char *s)
Definition: fd.cc:246
@ actDie
kill the caller process (i.e., Squid worker)
Definition: ChildConfig.h:95
bool isEmpty() const
Definition: SBuf.h:420
static helper_stateful_server * StatefulGetFirstAvailable(statefulhelper *hlp)
Definition: helper.cc:1305
static IOCB helperHandleRead
Definition: helper.cc:43
statefulhelper * parent
Definition: helper.h:300
virtual ~helper_stateful_server()
Definition: helper.cc:196
@ Unknown
Definition: ResultCode.h:17
#define MAX_RETRIES
The maximum allowed request retries.
Definition: helper.cc:38
virtual ~helper_server()
Definition: helper.cc:160
virtual void dropQueued()
dequeues and sends a Helper::Unknown answer to all queued requests
Definition: helper.cc:113
struct HelperServerBase::_helper_flags flags
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: old_api.cc:345
uint64_t timedout
requests which timed-out
Definition: helper.h:224
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
const InstanceId< HelperServerBase > index
Definition: helper.h:194
char * QuoteMimeBlob(const char *header)
Definition: Quoting.cc:43
int avg_svc_time
Definition: helper.h:117
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
Definition: helper.cc:1418
int ipc_type
Definition: helper.h:100
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:96
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper requests; does not enforce queue limits
Definition: helper.cc:542
uint64_t Id
Definition: Request.h:45
int const char int
Definition: stub_libmem.cc:75
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Definition: helper.cc:1542
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
int commSetConnTimeout(const Comm::ConnectionPointer &conn, int timeout, AsyncCall::Pointer &callback)
Definition: comm.cc:567
static helper_server * GetFirstAvailable(const helper *hlp)
Definition: helper.cc:1259
time_t last_queue_warn
Definition: helper.h:104
#define xstrdup
static void HelperServerClosed(helper_stateful_server *srv)
close handler to handle exited server processes
Definition: helper.cc:861
char * buf
Definition: Request.h:39
const A & max(A const &lhs, A const &rhs)
std::queue< Helper::Xaction * > queue
Definition: helper.h:97
#define PRIu64
Definition: types.h:120
int commSetNonBlocking(int fd)
Definition: comm.cc:1098
const size_t ReadBufSize(32 *1024)
Helpers input buffer size.
Definition: cbdata.cc:60
Abstraction layer for TCP, UDP, TLS, UDS and filedescriptor sockets.
Definition: AcceptLimiter.h:16
virtual bool reserved() override
whether the server is locked for exclusive use by a client
Definition: helper.h:294
Helper::Reply reply
Definition: helper.h:46
virtual void append(const char *c, int sz)
Definition: MemBuf.cc:216
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:412
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:26
@ OK
Definition: Flag.h:16
#define DBG_CRITICAL
Definition: Debug.h:45
struct timeval dispatch_time
Definition: helper.h:205
virtual void dropQueued() override
dequeues and sends a Helper::Unknown answer to all queued requests
Definition: helper.cc:184
@ ERR_CLOSING
Definition: Flag.h:25
#define cbdataReference(var)
Definition: cbdata.h:341
time_t getCurrentTime(void)
Get current time.
int intAverage(const int, const int, int, const int)
Definition: SquidMath.cc:40
#define DBG_IMPORTANT
Definition: Debug.h:46
static void HelperServerClosed(helper_server *srv)
close handler to handle exited server processes
Definition: helper.cc:842
unsigned int n_running
Definition: ChildConfig.h:80
uint64_t uses
requests sent to this helper
Definition: helper.h:220
HLPCB * callback
Definition: Request.h:40
void submit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:630
Requests requests
requests in order of submission/expiration
Definition: helper.h:217
void cancelReservation(const Helper::ReservationId reservation)
undo reserveServer(), clear the reservation and kick the queue
Definition: helper.cc:583
static void * hIpc
Definition: IcmpSquid.cc:34
static pid_t pid
Definition: IcmpSquid.cc:35
time_t last_restart
Definition: helper.h:105
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
Definition: helper.cc:1349
Definition: Debug.h:188
void reserveServer(helper_stateful_server *srv)
reserve the given server
Definition: helper.cc:570
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
Definition: Read.h:59
static void helperDispatch(helper_server *srv, Helper::Xaction *r)
Definition: helper.cc:1375
HelperServerBase(const char *desc, Ip::Address &, int aPid, void *aIpc, int rfd, int wfd)
Definition: helper.cc:129
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
#define HELPER_MAX_ARGS
Definition: helper.cc:35
helper_stateful_server(statefulhelper *hlp, int pid, void *hIpc, int rfd, int wfd)
Definition: helper.cc:190
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
helper_server(helper *hlp, int pid, void *hIpc, int rfd, int wfd)
Definition: helper.cc:154
int isNull() const
Definition: MemBuf.cc:148
void * hIpc
Definition: helper.h:199
int timedout
Definition: helper.h:115
#define NULL
Definition: types.h:166
static void helperReturnBuffer(helper_server *srv, helper *hlp, char *msg, size_t msgSize, char *msgEnd)
Calls back with a pointer to the buffer with the helper output.
Definition: helper.cc:901
const char * rawContent() const
Definition: SBuf.cc:519
int needNew() const
Definition: ChildConfig.cc:59
int placeholder
Definition: Request.h:43
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:128
bool accumulate(const char *buf, size_t len)
Definition: Reply.cc:25
bool overloaded() const
Definition: helper.cc:471
represents a single helper process abstraction
Definition: helper.h:160
MemBuf * wqueue
Definition: helper.h:240
struct timeval dispatch_time
Definition: Request.h:44
static ReservationId Next()
Helper::ResultCode result
The helper response 'result' field.
Definition: Reply.h:59
unsigned int droppedRequests
requests not sent during helper overload
Definition: helper.h:102
Definition: MemBuf.h:23
Definition: helper.h:65
static void SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
Definition: helper.cc:445
void clean()
Definition: MemBuf.cc:113
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:157
uint64_t pending
queued lookups waiting to be sent to this helper
Definition: helper.h:222
struct timeval answer_time
Definition: helper.h:206
void const char HLPCB void * data
Definition: stub_helper.cc:16
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
Definition: CommCalls.h:36
int reconfiguring
Comm::ConnectionPointer conn
Definition: CommCalls.h:85
#define safe_free(x)
Definition: xalloc.h:73
int tvSubMsec(struct timeval, struct timeval)
Definition: time.cc:37
int requests
Definition: helper.h:113
bool willOverload() const
Definition: helper.cc:720
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
Definition: CommCalls.h:342
int conn
the current server connection FD
Definition: Transport.cc:26
#define assert(EX)
Definition: assert.h:19
virtual ~helper()
Definition: helper.cc:808
bool retryTimedOut
whether the timed-out requests must retried
Definition: helper.h:107
void helperShutdown(helper *hlp)
Definition: helper.cc:725
a (temporary) lock on a (stateful) helper channel
Definition: ReservationId.h:17
RequestIndex requestsIndex
maps request IDs to requests
Definition: helper.h:256
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
unsigned int n_max
Definition: ChildConfig.h:48
void packStatsInto(Packable *p, const char *label=NULL) const
Dump some stats about the helper state to a Packable object.
Definition: helper.cc:662
time_t overloadStart
when the helper became overloaded (zero if it is not)
Definition: helper.h:103
void * data
Definition: Request.h:41
void helperStatefulSubmit(statefulhelper *hlp, const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:552
Comm::ConnectionPointer writePipe
Definition: helper.h:198
#define cbdataReferenceDone(var)
Definition: cbdata.h:350
const char * id_name
Definition: helper.h:98
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:35
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:62
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:318
size_type length() const
Returns the number of bytes stored in SBuf.
Definition: SBuf.h:404
time_t squid_curtime
Definition: stub_time.cc:17
uint64_t replies
replies received from this helper
Definition: helper.h:221
size_t roffset
Definition: helper.h:203
time_t reservationTimeout
older stateful helper server reservations may be forgotten
Definition: ChildConfig.h:109
SubmissionErrorHandlingAction onPersistentOverload
how to handle a new request for helper that was overloaded for too long
Definition: ChildConfig.h:99
wordlist * next
Definition: wordlist.h:34
MemBuf * writebuf
Definition: helper.h:241
Flag
Definition: Flag.h:15
unsigned int n_startup
Definition: ChildConfig.h:57
#define REDIRECT_AV_FACTOR
Definition: defines.h:85
represents a single "stateful helper" process
Definition: helper.h:282
#define fd_table
Definition: fde.h:189
Helper::Xaction * popRequest(int requestId)
Definition: helper.cc:878
static void helperStatefulServerDone(helper_stateful_server *srv)
Definition: helper.cc:1492
void helperSubmit(helper *hlp, const char *buf, HLPCB *callback, void *data)
Definition: helper.cc:458
void finalize()
Definition: Reply.cc:38
Helper::Xaction * nextRequest()
Definition: helper.cc:1247
char * rbuf
Definition: helper.h:201
struct timeval current_time
Definition: stub_time.cc:15
char * key
Definition: wordlist.h:33
void handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
Definition: helper.cc:818
InstanceIdDefinitions(HelperServerBase, "Hlpr")
bool trySubmit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:560
size_t rbuf_sz
Definition: helper.h:202
Helper::ReservationId reservationId
The stateful replies should include the reservation ID.
Definition: Reply.h:65
char * content()
start of the added data
Definition: MemBuf.h:41
SBuf onTimedOutResponse
the response to use when helper response timedout
Definition: helper.h:109
static void helperKickQueue(helper *hlp)
Definition: helper.cc:1470
struct HelperServerBase::@75 stats
virtual ~HelperServerBase()
Definition: helper.cc:146
void const char HLPCB * callback
Definition: stub_helper.cc:16
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
helper * parent
Definition: helper.h:243
int unsigned int const char *desc STUB void int len
Definition: stub_fd.cc:20
Helper::Xaction * replyXaction
Definition: helper.h:249
@ BrokenHelper
Definition: ResultCode.h:20
bool queueFull() const
whether queuing an additional request would overload the helper
Definition: helper.cc:466
Helper::Request request
Definition: helper.h:45
static void helperStatefulKickQueue(statefulhelper *hlp)
Definition: helper.cc:1480
int shutting_down
virtual bool reserved()=0
whether the server is locked for exclusive use by a client
void HLPCB(void *, const Helper::Reply &)
Definition: forward.h:27
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
Definition: helper.cc:1218
bool ignoreToEom
Whether to ignore current message, because it is timed-out or other reason.
Definition: helper.h:252
#define xisspace(x)
Definition: xis.h:17
Ip::Address addr
Definition: helper.h:101
unsigned int concurrency
Definition: ChildConfig.h:72
char progname[]
int queue_size
Definition: helper.h:116
unsigned int n_active
Definition: ChildConfig.h:86
void helperStatefulShutdown(statefulhelper *hlp)
Definition: helper.cc:762
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:531
void closeWritePipeSafely(const char *name)
Definition: helper.cc:89
void helperStatefulOpenServers(statefulhelper *hlp)
Definition: helper.cc:330
time_t timeout
requests timeout
Definition: helper.h:106
bool prepSubmit()
Definition: helper.cc:504
static void Enqueue(helper *hlp, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
Definition: helper.cc:1189
Helper::ReservationId reservationId
"confirmation ID" of the last
Definition: helper.h:306
@ TimedOut
Definition: ResultCode.h:21
void checkForTimedOutRequests(bool const retry)
Definition: helper.cc:1503
virtual helper * getParent() const override
the helper object that created this server
Definition: helper.h:295
char eom
the char which marks the end of (response) message
Definition: helper.h:110
void const char * buf
Definition: stub_helper.cc:16
Reservations reservations
reserved servers indexed by reservation IDs
Definition: helper.h:156
void memFreeBuf(size_t size, void *)
Definition: old_api.cc:384
uint64_t nextRequestId
Definition: helper.h:238
Comm::ConnectionPointer readPipe
Definition: helper.h:197
#define DBG_DATA
Definition: Debug.h:48
Holds the required data to serve a helper request.
Definition: helper.h:41

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors