helper.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2019 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 84 Helper process maintenance */
10 
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31 
32 // helper_stateful_server::data uses explicit alloc()/freeOne() */
33 #include "mem/Pool.h"
34 
35 #define HELPER_MAX_ARGS 64
36 
38 #define MAX_RETRIES 2
39 
41 const size_t ReadBufSize(32*1024);
42 
45 static void Enqueue(helper * hlp, Helper::Xaction *);
46 static helper_server *GetFirstAvailable(const helper * hlp);
48 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
50 static void helperKickQueue(helper * hlp);
51 static void helperStatefulKickQueue(statefulhelper * hlp);
53 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
54 
59 
61 
62 void
64 {
65  stats.uses=0;
66  stats.replies=0;
67  stats.pending=0;
68  stats.releases=0;
69  stats.timedout = 0;
70 }
71 
72 void
74 {
75 #if _SQUID_WINDOWS_
76  shutdown(writePipe->fd, SD_BOTH);
77 #endif
78 
79  flags.closing = true;
80  if (readPipe->fd == writePipe->fd)
81  readPipe->fd = -1;
82  else
83  readPipe->close();
84  writePipe->close();
85 
86 #if _SQUID_WINDOWS_
87  if (hIpc) {
88  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
90  debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
91  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
92  }
93  CloseHandle(hIpc);
94  }
95 #endif
96 }
97 
98 void
100 {
101 #if _SQUID_WINDOWS_
102  shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
103 #endif
104 
105  flags.closing = true;
106  if (readPipe->fd == writePipe->fd)
107  readPipe->fd = -1;
108  writePipe->close();
109 
110 #if _SQUID_WINDOWS_
111  if (hIpc) {
112  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
113  getCurrentTime();
114  debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
115  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
116  }
117  CloseHandle(hIpc);
118  }
119 #endif
120 }
121 
122 void
124 {
125  while (!requests.empty()) {
126  // XXX: re-schedule these on another helper?
127  Helper::Xaction *r = requests.front();
128  requests.pop_front();
129  void *cbdata;
130  if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
132  r->request.callback(cbdata, r->reply);
133  }
134 
135  delete r;
136  }
137 }
138 
140 {
141  if (rbuf) {
143  rbuf = NULL;
144  }
145 }
146 
148 {
149  wqueue->clean();
150  delete wqueue;
151 
152  if (writebuf) {
153  writebuf->clean();
154  delete writebuf;
155  writebuf = NULL;
156  }
157 
159  closeWritePipeSafely(parent->id_name);
160 
161  dlinkDelete(&link, &parent->servers);
162 
163  assert(parent->childs.n_running > 0);
164  -- parent->childs.n_running;
165 
166  assert(requests.empty());
167  cbdataReferenceDone(parent);
168 }
169 
170 void
172 {
174  requestsIndex.clear();
175 }
176 
178 {
179  /* TODO: walk the local queue of requests and carry them all out */
181  closeWritePipeSafely(parent->id_name);
182 
183  parent->cancelReservation(reservationId);
184 
185  dlinkDelete(&link, &parent->servers);
186 
187  assert(parent->childs.n_running > 0);
188  -- parent->childs.n_running;
189 
190  assert(requests.empty());
191 
192  cbdataReferenceDone(parent);
193 }
194 
195 void
197 {
198  char *s;
199  char *progname;
200  char *shortname;
201  char *procname;
202  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
203  char fd_note_buf[FD_DESC_SZ];
204  helper_server *srv;
205  int nargs = 0;
206  int k;
207  pid_t pid;
208  int rfd;
209  int wfd;
210  void * hIpc;
211  wordlist *w;
212 
213  if (hlp->cmdline == NULL)
214  return;
215 
216  progname = hlp->cmdline->key;
217 
218  if ((s = strrchr(progname, '/')))
219  shortname = xstrdup(s + 1);
220  else
221  shortname = xstrdup(progname);
222 
223  /* figure out how many new child are actually needed. */
224  int need_new = hlp->childs.needNew();
225 
226  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
227 
228  if (need_new < 1) {
229  debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
230  }
231 
232  procname = (char *)xmalloc(strlen(shortname) + 3);
233 
234  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
235 
236  args[nargs] = procname;
237  ++nargs;
238 
239  for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
240  args[nargs] = w->key;
241  ++nargs;
242  }
243 
244  args[nargs] = NULL;
245  ++nargs;
246 
247  assert(nargs <= HELPER_MAX_ARGS);
248 
249  for (k = 0; k < need_new; ++k) {
250  getCurrentTime();
251  rfd = wfd = -1;
252  pid = ipcCreate(hlp->ipc_type,
253  progname,
254  args,
255  shortname,
256  hlp->addr,
257  &rfd,
258  &wfd,
259  &hIpc);
260 
261  if (pid < 0) {
262  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
263  continue;
264  }
265 
266  ++ hlp->childs.n_running;
267  ++ hlp->childs.n_active;
268  srv = new helper_server;
269  srv->hIpc = hIpc;
270  srv->pid = pid;
271  srv->initStats();
272  srv->addr = hlp->addr;
273  srv->readPipe = new Comm::Connection;
274  srv->readPipe->fd = rfd;
275  srv->writePipe = new Comm::Connection;
276  srv->writePipe->fd = wfd;
277  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
278  srv->wqueue = new MemBuf;
279  srv->roffset = 0;
280  srv->nextRequestId = 0;
281  srv->replyXaction = NULL;
282  srv->ignoreToEom = false;
283  srv->parent = cbdataReference(hlp);
284  dlinkAddTail(srv, &srv->link, &hlp->servers);
285 
286  if (rfd == wfd) {
287  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
288  fd_note(rfd, fd_note_buf);
289  } else {
290  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
291  fd_note(rfd, fd_note_buf);
292  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
293  fd_note(wfd, fd_note_buf);
294  }
295 
296  commSetNonBlocking(rfd);
297 
298  if (wfd != rfd)
299  commSetNonBlocking(wfd);
300 
301  AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
302  comm_add_close_handler(rfd, closeCall);
303 
304  if (hlp->timeout && hlp->childs.concurrency) {
305  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
307  commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
308  }
309 
310  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
312  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
313  }
314 
316  safe_free(shortname);
317  safe_free(procname);
318  helperKickQueue(hlp);
319 }
320 
326 void
328 {
329  char *shortname;
330  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
331  char fd_note_buf[FD_DESC_SZ];
332  int nargs = 0;
333 
334  if (hlp->cmdline == NULL)
335  return;
336 
337  if (hlp->childs.concurrency)
338  debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
339 
340  char *progname = hlp->cmdline->key;
341 
342  char *s;
343  if ((s = strrchr(progname, '/')))
344  shortname = xstrdup(s + 1);
345  else
346  shortname = xstrdup(progname);
347 
348  /* figure out haw mant new helpers are needed. */
349  int need_new = hlp->childs.needNew();
350 
351  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
352 
353  if (need_new < 1) {
354  debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
355  }
356 
357  char *procname = (char *)xmalloc(strlen(shortname) + 3);
358 
359  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
360 
361  args[nargs] = procname;
362  ++nargs;
363 
364  for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
365  args[nargs] = w->key;
366  ++nargs;
367  }
368 
369  args[nargs] = NULL;
370  ++nargs;
371 
372  assert(nargs <= HELPER_MAX_ARGS);
373 
374  for (int k = 0; k < need_new; ++k) {
375  getCurrentTime();
376  int rfd = -1;
377  int wfd = -1;
378  void * hIpc;
379  pid_t pid = ipcCreate(hlp->ipc_type,
380  progname,
381  args,
382  shortname,
383  hlp->addr,
384  &rfd,
385  &wfd,
386  &hIpc);
387 
388  if (pid < 0) {
389  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
390  continue;
391  }
392 
393  ++ hlp->childs.n_running;
394  ++ hlp->childs.n_active;
396  srv->hIpc = hIpc;
397  srv->pid = pid;
398  srv->initStats();
399  srv->addr = hlp->addr;
400  srv->readPipe = new Comm::Connection;
401  srv->readPipe->fd = rfd;
402  srv->writePipe = new Comm::Connection;
403  srv->writePipe->fd = wfd;
404  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
405  srv->roffset = 0;
406  srv->parent = cbdataReference(hlp);
407  srv->reservationStart = 0;
408 
409  dlinkAddTail(srv, &srv->link, &hlp->servers);
410 
411  if (rfd == wfd) {
412  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
413  fd_note(rfd, fd_note_buf);
414  } else {
415  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
416  fd_note(rfd, fd_note_buf);
417  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
418  fd_note(wfd, fd_note_buf);
419  }
420 
421  commSetNonBlocking(rfd);
422 
423  if (wfd != rfd)
424  commSetNonBlocking(wfd);
425 
426  AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
427  comm_add_close_handler(rfd, closeCall);
428 
429  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
431  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
432  }
433 
435  safe_free(shortname);
436  safe_free(procname);
438 }
439 
440 void
442 {
443  helper_server *srv;
444 
445  if ((srv = GetFirstAvailable(this)))
446  helperDispatch(srv, r);
447  else
448  Enqueue(this, r);
449 
450  syncQueueStats();
451 }
452 
454 static void
456 {
457  auto result = Helper::Error;
458  if (!hlp) {
459  debugs(84, 3, "no helper");
460  result = Helper::Unknown;
461  }
462  // else pretend the helper has responded with ERR
463 
464  callback(data, Helper::Reply(result));
465 }
466 
467 void
468 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
469 {
470  if (!hlp || !hlp->trySubmit(buf, callback, data))
471  SubmissionFailure(hlp, callback, data);
472 }
473 
475 bool
477  return stats.queue_size >= static_cast<int>(childs.queue_size);
478 }
479 
480 bool
482  return stats.queue_size > static_cast<int>(childs.queue_size);
483 }
484 
486 void
488 {
489  if (overloaded()) {
490  if (overloadStart) {
491  debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
492  } else {
493  overloadStart = squid_curtime;
494  debugs(84, 3, id_name << " became overloaded");
495  }
496  } else {
497  if (overloadStart) {
498  debugs(84, 5, id_name << " is no longer overloaded");
499  if (droppedRequests) {
500  debugs(84, DBG_IMPORTANT, "helper " << id_name <<
501  " is no longer overloaded after dropping " << droppedRequests <<
502  " requests in " << (squid_curtime - overloadStart) << " seconds");
503  droppedRequests = 0;
504  }
505  overloadStart = 0;
506  }
507  }
508 }
509 
513 bool
515 {
516  // re-sync for the configuration may have changed since the last submission
517  syncQueueStats();
518 
519  // Nothing special to do if the new request does not overload (i.e., the
520  // queue is not even full yet) or only _starts_ overloading this helper
521  // (i.e., the queue is currently at its limit).
522  if (!overloaded())
523  return true;
524 
525  if (squid_curtime - overloadStart <= 180)
526  return true; // also OK: overload has not persisted long enough to panic
527 
528  if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
529  fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
530 
531  if (!droppedRequests) {
532  debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
533  id_name << " helper configured with on-persistent-overload=err");
534  }
535  ++droppedRequests;
536  debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
537  return false;
538 }
539 
540 bool
541 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
542 {
543  if (!prepSubmit())
544  return false; // request was dropped
545 
546  submit(buf, callback, data); // will send or queue
547  return true; // request submitted or queued
548 }
549 
551 void
552 helper::submit(const char *buf, HLPCB * callback, void *data)
553 {
554  Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
555  submitRequest(r);
556  debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
557 }
558 
561 void
562 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
563 {
564  if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
565  SubmissionFailure(hlp, callback, data);
566 }
567 
569 bool
570 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
571 {
572  if (!prepSubmit())
573  return false; // request was dropped
574 
575  submit(buf, callback, data, reservation); // will send or queue
576  return true; // request submitted or queued
577 }
578 
579 void
581 {
582  // clear any old reservation
583  if (srv->reserved()) {
584  reservations.erase(srv->reservationId);
585  srv->clearReservation();
586  }
587 
588  srv->reserve();
589  reservations.insert(Reservations::value_type(srv->reservationId, srv));
590 }
591 
592 void
594 {
595  const auto it = reservations.find(reservation);
596  if (it == reservations.end())
597  return;
598 
599  helper_stateful_server *srv = it->second;
600  reservations.erase(it);
601  srv->clearReservation();
602 
603  // schedule a queue kick
604  AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
605  ScheduleCallHere(call);
606 }
607 
610 {
611  const auto it = reservations.find(reservation);
612  if (it == reservations.end())
613  return nullptr;
614  return it->second;
615 }
616 
617 void
619 {
620  assert(!reservationId);
621  reservationStart = squid_curtime;
622  reservationId = Helper::ReservationId::Next();
623  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
624 }
625 
626 void
628 {
629  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
630  if (!reservationId)
631  return;
632 
633  ++stats.releases;
634 
635  reservationId.clear();
636  reservationStart = 0;
637 }
638 
639 void
640 statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
641 {
642  Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
643 
644  if (buf && reservation) {
645  debugs(84, 5, reservation);
646  helper_stateful_server *lastServer = findServer(reservation);
647  if (!lastServer) {
648  debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
650  r->request.callback(r->request.data, r->reply);
651  delete r;
652  return;
653  }
654  debugs(84, 5, "StatefulSubmit dispatching");
655  helperStatefulDispatch(lastServer, r);
656  } else {
658  if ((srv = StatefulGetFirstAvailable(this))) {
659  reserveServer(srv);
660  helperStatefulDispatch(srv, r);
661  } else
662  StatefulEnqueue(this, r);
663  }
664 
665  debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
666  "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
667 
668  syncQueueStats();
669 }
670 
671 void
672 helper::packStatsInto(Packable *p, const char *label) const
673 {
674  if (label)
675  p->appendf("%s:\n", label);
676 
677  p->appendf(" program: %s\n", cmdline->key);
678  p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
679  p->appendf(" requests sent: %d\n", stats.requests);
680  p->appendf(" replies received: %d\n", stats.replies);
681  p->appendf(" requests timedout: %d\n", stats.timedout);
682  p->appendf(" queue length: %d\n", stats.queue_size);
683  p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
684  p->append("\n",1);
685  p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
686  "ID #",
687  "FD",
688  "PID",
689  "# Requests",
690  "# Replies",
691  "# Timed-out",
692  "Flags",
693  "Time",
694  "Offset",
695  "Request");
696 
697  for (dlink_node *link = servers.head; link; link = link->next) {
698  HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
699  assert(srv);
700  Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
701  double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
702  p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
703  srv->index.value,
704  srv->readPipe->fd,
705  srv->pid,
706  srv->stats.uses,
707  srv->stats.replies,
708  srv->stats.timedout,
709  srv->stats.pending ? 'B' : ' ',
710  srv->flags.writing ? 'W' : ' ',
711  srv->flags.closing ? 'C' : ' ',
712  srv->reserved() ? 'R' : ' ',
713  srv->flags.shutdown ? 'S' : ' ',
714  xaction && xaction->request.placeholder ? 'P' : ' ',
715  tt < 0.0 ? 0.0 : tt,
716  (int) srv->roffset,
717  xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
718  }
719 
720  p->append("\nFlags key:\n"
721  " B\tBUSY\n"
722  " W\tWRITING\n"
723  " C\tCLOSING\n"
724  " R\tRESERVED\n"
725  " S\tSHUTDOWN PENDING\n"
726  " P\tPLACEHOLDER\n", 101);
727 }
728 
729 bool
731  return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
732 }
733 
734 void
736 {
737  dlink_node *link = hlp->servers.head;
738 
739  while (link) {
740  helper_server *srv;
741  srv = (helper_server *)link->data;
742  link = link->next;
743 
744  if (srv->flags.shutdown) {
745  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
746  continue;
747  }
748 
749  assert(hlp->childs.n_active > 0);
750  -- hlp->childs.n_active;
751  srv->flags.shutdown = true; /* request it to shut itself down */
752 
753  if (srv->flags.closing) {
754  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
755  continue;
756  }
757 
758  if (srv->stats.pending) {
759  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
760  continue;
761  }
762 
763  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
764  /* the rest of the details is dealt with in the helperServerFree
765  * close handler
766  */
767  srv->closePipesSafely(hlp->id_name);
768  }
769 }
770 
771 void
773 {
774  dlink_node *link = hlp->servers.head;
776 
777  while (link) {
778  srv = (helper_stateful_server *)link->data;
779  link = link->next;
780 
781  if (srv->flags.shutdown) {
782  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
783  continue;
784  }
785 
786  assert(hlp->childs.n_active > 0);
787  -- hlp->childs.n_active;
788  srv->flags.shutdown = true; /* request it to shut itself down */
789 
790  if (srv->stats.pending) {
791  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
792  continue;
793  }
794 
795  if (srv->flags.closing) {
796  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
797  continue;
798  }
799 
800  if (srv->reserved()) {
801  if (shutting_down) {
802  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
803  } else {
804  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
805  continue;
806  }
807  }
808 
809  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
810 
811  /* the rest of the details is dealt with in the helperStatefulServerFree
812  * close handler
813  */
814  srv->closePipesSafely(hlp->id_name);
815  }
816 }
817 
819 {
820  /* note, don't free id_name, it probably points to static memory */
821 
822  // TODO: if the queue is not empty it will leak Helper::Request's
823  if (!queue.empty())
824  debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
825 }
826 
827 void
828 helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
829 {
830  needsNewServers = false;
831  if (!srv->flags.shutdown) {
832  assert(childs.n_active > 0);
833  --childs.n_active;
834  debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
835 
836  if (childs.needNew() > 0) {
837  debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")");
838 
839  if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
840  if (srv->stats.replies < 1)
841  fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name);
842  else
843  debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
844  }
845  srv->flags.shutdown = true;
846  needsNewServers = true;
847  }
848  }
849 }
850 
851 void
853 {
854  helper *hlp = srv->getParent();
855 
856  bool needsNewServers = false;
857  hlp->handleKilledServer(srv, needsNewServers);
858  if (needsNewServers) {
859  debugs(80, DBG_IMPORTANT, "Starting new helpers");
860  helperOpenServers(hlp);
861  }
862 
863  srv->dropQueued();
864 
865  delete srv;
866 }
867 
868 // XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
869 // TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
870 void
872 {
873  statefulhelper *hlp = static_cast<statefulhelper *>(srv->getParent());
874 
875  bool needsNewServers = false;
876  hlp->handleKilledServer(srv, needsNewServers);
877  if (needsNewServers) {
878  debugs(80, DBG_IMPORTANT, "Starting new helpers");
880  }
881 
882  srv->dropQueued();
883 
884  delete srv;
885 }
886 
888 helper_server::popRequest(int request_number)
889 {
890  Helper::Xaction *r = nullptr;
891  helper_server::RequestIndex::iterator it;
892  if (parent->childs.concurrency) {
893  // If concurency supported retrieve request from ID
894  it = requestsIndex.find(request_number);
895  if (it != requestsIndex.end()) {
896  r = *(it->second);
897  requests.erase(it->second);
898  requestsIndex.erase(it);
899  }
900  } else if(!requests.empty()) {
901  // Else get the first request from queue, if any
902  r = requests.front();
903  requests.pop_front();
904  }
905 
906  return r;
907 }
908 
910 static void
911 helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
912 {
913  if (Helper::Xaction *r = srv->replyXaction) {
914  const bool hasSpace = r->reply.accumulate(msg, msgSize);
915  if (!hasSpace) {
916  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
917  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
918  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
919  srv->closePipesSafely(hlp->id_name);
920  return;
921  }
922 
923  if (!msgEnd)
924  return; // We are waiting for more data.
925 
926  bool retry = false;
927  if (cbdataReferenceValid(r->request.data)) {
928  r->reply.finalize();
929  if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
930  debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
931  retry = true;
932  } else {
933  HLPCB *callback = r->request.callback;
934  r->request.callback = nullptr;
935  void *cbdata = nullptr;
936  if (cbdataReferenceValidDone(r->request.data, &cbdata))
937  callback(cbdata, r->reply);
938  }
939  }
940 
941  -- srv->stats.pending;
942  ++ srv->stats.replies;
943 
944  ++ hlp->stats.replies;
945 
946  srv->answer_time = current_time;
947 
948  srv->dispatch_time = r->request.dispatch_time;
949 
950  hlp->stats.avg_svc_time =
952  tvSubMsec(r->request.dispatch_time, current_time),
954 
955  // release or re-submit parsedRequestXaction object
956  srv->replyXaction = nullptr;
957  if (retry) {
958  ++r->request.retries;
959  hlp->submitRequest(r);
960  } else
961  delete r;
962  }
963 
964  if (hlp->timeout && hlp->childs.concurrency)
966 
967  if (!srv->flags.shutdown) {
968  helperKickQueue(hlp);
969  } else if (!srv->flags.closing && !srv->stats.pending) {
970  srv->flags.closing=true;
971  srv->writePipe->close();
972  }
973 }
974 
975 static void
976 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
977 {
978  helper_server *srv = (helper_server *)data;
979  helper *hlp = srv->parent;
981 
982  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
983 
984  if (flag == Comm::ERR_CLOSING) {
985  return;
986  }
987 
988  assert(conn->fd == srv->readPipe->fd);
989 
990  debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
991 
992  if (flag != Comm::OK || len == 0) {
993  srv->closePipesSafely(hlp->id_name);
994  return;
995  }
996 
997  srv->roffset += len;
998  srv->rbuf[srv->roffset] = '\0';
999  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1000 
1001  if (!srv->stats.pending && !srv->stats.timedout) {
1002  /* someone spoke without being spoken to */
1003  debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
1004  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1005  " bytes '" << srv->rbuf << "'");
1006 
1007  srv->roffset = 0;
1008  srv->rbuf[0] = '\0';
1009  }
1010 
1011  bool needsMore = false;
1012  char *msg = srv->rbuf;
1013  while (*msg && !needsMore) {
1014  int skip = 0;
1015  char *eom = strchr(msg, hlp->eom);
1016  if (eom) {
1017  skip = 1;
1018  debugs(84, 3, "helperHandleRead: end of reply found");
1019  if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1020  *eom = '\0';
1021  // rewind to the \r octet which is the real terminal now
1022  // and remember that we have to skip forward 2 places now.
1023  skip = 2;
1024  --eom;
1025  }
1026  *eom = '\0';
1027  }
1028 
1029  if (!srv->ignoreToEom && !srv->replyXaction) {
1030  int i = 0;
1031  if (hlp->childs.concurrency) {
1032  char *e = NULL;
1033  i = strtol(msg, &e, 10);
1034  // Do we need to check for e == msg? Means wrong response from helper.
1035  // Will be droped as "unexpected reply on channel 0"
1036  needsMore = !(xisspace(*e) || (eom && e == eom));
1037  if (!needsMore) {
1038  msg = e;
1039  while (*msg && xisspace(*msg))
1040  ++msg;
1041  } // else not enough data to compute request number
1042  }
1043  if (!(srv->replyXaction = srv->popRequest(i))) {
1044  if (srv->stats.timedout) {
1045  debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1046  } else {
1047  debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
1048  i << " from " << hlp->id_name << " #" << srv->index <<
1049  " '" << srv->rbuf << "'");
1050  }
1051  srv->ignoreToEom = true;
1052  }
1053  } // else we need to just append reply data to the current Xaction
1054 
1055  if (!needsMore) {
1056  size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1057  assert(msgSize <= srv->rbuf_sz);
1058  helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1059  msg += msgSize + skip;
1060  assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1061 
1062  // The next message should not ignored.
1063  if (eom && srv->ignoreToEom)
1064  srv->ignoreToEom = false;
1065  } else
1066  assert(skip == 0 && eom == NULL);
1067  }
1068 
1069  if (needsMore) {
1070  size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1071  assert(msgSize <= srv->rbuf_sz);
1072  memmove(srv->rbuf, msg, msgSize);
1073  srv->roffset = msgSize;
1074  srv->rbuf[srv->roffset] = '\0';
1075  } else {
1076  // All of the responses parsed and msg points at the end of read data
1077  assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1078  srv->roffset = 0;
1079  }
1080 
1081  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1082  int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1083  assert(spaceSize >= 0);
1084 
1085  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1087  comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1088  }
1089 }
1090 
1091 static void
1092 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1093 {
1094  char *t = NULL;
1096  statefulhelper *hlp = srv->parent;
1098 
1099  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1100 
1101  if (flag == Comm::ERR_CLOSING) {
1102  return;
1103  }
1104 
1105  assert(conn->fd == srv->readPipe->fd);
1106 
1107  debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1108  hlp->id_name << " #" << srv->index);
1109 
1110  if (flag != Comm::OK || len == 0) {
1111  srv->closePipesSafely(hlp->id_name);
1112  return;
1113  }
1114 
1115  srv->roffset += len;
1116  srv->rbuf[srv->roffset] = '\0';
1117  Helper::Xaction *r = srv->requests.front();
1118  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1119 
1120  if (r == NULL) {
1121  /* someone spoke without being spoken to */
1122  debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1123  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1124  " bytes '" << srv->rbuf << "'");
1125 
1126  srv->roffset = 0;
1127  }
1128 
1129  if ((t = strchr(srv->rbuf, hlp->eom))) {
1130  debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1131 
1132  if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1133  *t = '\0';
1134  // rewind to the \r octet which is the real terminal now
1135  --t;
1136  }
1137 
1138  *t = '\0';
1139  }
1140 
1141  if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1142  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1143  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1144  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1145  srv->closePipesSafely(hlp->id_name);
1146  return;
1147  }
1154  srv->roffset = 0;
1155 
1156  if (t) {
1157  /* end of reply found */
1158  srv->requests.pop_front(); // we already have it in 'r'
1159  int called = 1;
1160 
1161  if (r && cbdataReferenceValid(r->request.data)) {
1162  r->reply.finalize();
1163  r->reply.reservationId = srv->reservationId;
1164  r->request.callback(r->request.data, r->reply);
1165  } else {
1166  debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1167  called = 0;
1168  }
1169 
1170  delete r;
1171 
1172  -- srv->stats.pending;
1173  ++ srv->stats.replies;
1174 
1175  ++ hlp->stats.replies;
1176  srv->answer_time = current_time;
1177  hlp->stats.avg_svc_time =
1181 
1182  if (called)
1184  else
1185  hlp->cancelReservation(srv->reservationId);
1186  }
1187 
1188  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1189  int spaceSize = srv->rbuf_sz - 1;
1190 
1191  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1193  comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1194  }
1195 }
1196 
1198 static void
1200 {
1201  hlp->queue.push(r);
1202  ++ hlp->stats.queue_size;
1203 
1204  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1205  if (hlp->childs.needNew() > 0) {
1206  debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1207  helperOpenServers(hlp);
1208  return;
1209  }
1210 
1211  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1212  return;
1213 
1214  if (squid_curtime - hlp->last_queue_warn < 600)
1215  return;
1216 
1218  return;
1219 
1221 
1222  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1223  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1224  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1225 }
1226 
1227 static void
1229 {
1230  hlp->queue.push(r);
1231  ++ hlp->stats.queue_size;
1232 
1233  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1234  if (hlp->childs.needNew() > 0) {
1235  debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1237  return;
1238  }
1239 
1240  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1241  return;
1242 
1243  if (squid_curtime - hlp->last_queue_warn < 600)
1244  return;
1245 
1247  return;
1248 
1250 
1251  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1252  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1253  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1254 }
1255 
1258 {
1259  if (queue.empty())
1260  return nullptr;
1261 
1262  auto *r = queue.front();
1263  queue.pop();
1264  --stats.queue_size;
1265  return r;
1266 }
1267 
1268 static helper_server *
1270 {
1271  dlink_node *n;
1272  helper_server *srv;
1273  helper_server *selected = NULL;
1274  debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1275 
1276  if (hlp->childs.n_running == 0)
1277  return NULL;
1278 
1279  /* Find "least" loaded helper (approx) */
1280  for (n = hlp->servers.head; n != NULL; n = n->next) {
1281  srv = (helper_server *)n->data;
1282 
1283  if (selected && selected->stats.pending <= srv->stats.pending)
1284  continue;
1285 
1286  if (srv->flags.shutdown)
1287  continue;
1288 
1289  if (!srv->stats.pending)
1290  return srv;
1291 
1292  if (selected) {
1293  selected = srv;
1294  break;
1295  }
1296 
1297  selected = srv;
1298  }
1299 
1300  if (!selected) {
1301  debugs(84, 5, "GetFirstAvailable: None available.");
1302  return NULL;
1303  }
1304 
1305  if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1306  debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1307  return NULL;
1308  }
1309 
1310  debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1311  return selected;
1312 }
1313 
1314 static helper_stateful_server *
1316 {
1317  dlink_node *n;
1319  helper_stateful_server *oldestReservedServer = nullptr;
1320  debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1321 
1322  if (hlp->childs.n_running == 0)
1323  return NULL;
1324 
1325  for (n = hlp->servers.head; n != NULL; n = n->next) {
1326  srv = (helper_stateful_server *)n->data;
1327 
1328  if (srv->stats.pending)
1329  continue;
1330 
1331  if (srv->reserved()) {
1333  if (!oldestReservedServer)
1334  oldestReservedServer = srv;
1335  else if (oldestReservedServer->reservationStart < srv->reservationStart)
1336  oldestReservedServer = srv;
1337  debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1338  }
1339  continue;
1340  }
1341 
1342  if (srv->flags.shutdown)
1343  continue;
1344 
1345  debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1346  return srv;
1347  }
1348 
1349  if (oldestReservedServer) {
1350  debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1351  return oldestReservedServer;
1352  }
1353 
1354  debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1355  return nullptr;
1356 }
1357 
1358 static void
1359 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1360 {
1361  helper_server *srv = (helper_server *)data;
1362 
1363  srv->writebuf->clean();
1364  delete srv->writebuf;
1365  srv->writebuf = NULL;
1366  srv->flags.writing = false;
1367 
1368  if (flag != Comm::OK) {
1369  /* Helper server has crashed */
1370  debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1371  return;
1372  }
1373 
1374  if (!srv->wqueue->isNull()) {
1375  srv->writebuf = srv->wqueue;
1376  srv->wqueue = new MemBuf;
1377  srv->flags.writing = true;
1378  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1380  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1381  }
1382 }
1383 
1384 static void
1386 {
1387  helper *hlp = srv->parent;
1388  const uint64_t reqId = ++srv->nextRequestId;
1389 
1390  if (!cbdataReferenceValid(r->request.data)) {
1391  debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1392  delete r;
1393  return;
1394  }
1395 
1396  r->request.Id = reqId;
1397  helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1399 
1400  if (srv->wqueue->isNull())
1401  srv->wqueue->init();
1402 
1403  if (hlp->childs.concurrency) {
1404  srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1405  assert(srv->requestsIndex.size() == srv->requests.size());
1406  srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1407  } else
1408  srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1409 
1410  if (!srv->flags.writing) {
1411  assert(NULL == srv->writebuf);
1412  srv->writebuf = srv->wqueue;
1413  srv->wqueue = new MemBuf;
1414  srv->flags.writing = true;
1415  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1417  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1418  }
1419 
1420  debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1421 
1422  ++ srv->stats.uses;
1423  ++ srv->stats.pending;
1424  ++ hlp->stats.requests;
1425 }
1426 
1427 static void
1429 {}
1430 
1431 static void
1433 {
1434  statefulhelper *hlp = srv->parent;
1435 
1436  if (!cbdataReferenceValid(r->request.data)) {
1437  debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1438  delete r;
1439  hlp->cancelReservation(srv->reservationId);
1440  return;
1441  }
1442 
1443  debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1444 
1445  assert(srv->reservationId);
1446  r->reply.reservationId = srv->reservationId;
1447 
1448  if (r->request.placeholder == 1) {
1449  /* a callback is needed before this request can _use_ a helper. */
1450  /* we don't care about releasing this helper. The request NEVER
1451  * gets to the helper. So we throw away the return code */
1453  r->request.callback(r->request.data, r->reply);
1454  /* throw away the placeholder */
1455  delete r;
1456  /* and push the queue. Note that the callback may have submitted a new
1457  * request to the helper which is why we test for the request */
1458 
1459  if (!srv->requests.size())
1461 
1462  return;
1463  }
1464 
1465  srv->requests.push_back(r);
1466  srv->dispatch_time = current_time;
1467  AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1469  Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
1470  debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1471  hlp->id_name << " #" << srv->index << ", " <<
1472  (int) strlen(r->request.buf) << " bytes");
1473 
1474  ++ srv->stats.uses;
1475  ++ srv->stats.pending;
1476  ++ hlp->stats.requests;
1477 }
1478 
1479 static void
1481 {
1482  Helper::Xaction *r;
1483  helper_server *srv;
1484 
1485  while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1486  helperDispatch(srv, r);
1487 }
1488 
1489 static void
1491 {
1492  Helper::Xaction *r;
1494  while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1495  debugs(84, 5, "found srv-" << srv->index);
1496  hlp->reserveServer(srv);
1497  helperStatefulDispatch(srv, r);
1498  }
1499 }
1500 
1501 static void
1503 {
1504  if (!srv->flags.shutdown) {
1506  } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1507  srv->closeWritePipeSafely(srv->parent->id_name);
1508  return;
1509  }
1510 }
1511 
1512 void
1514 {
1515  assert(parent->childs.concurrency);
1516  while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1517  Helper::Xaction *r = requests.front();
1518  RequestIndex::iterator it;
1519  it = requestsIndex.find(r->request.Id);
1520  assert(it != requestsIndex.end());
1521  requestsIndex.erase(it);
1522  requests.pop_front();
1523  debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1524  void *cbdata;
1525  bool retried = false;
1526  if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1527  debugs(84, 2, "Retry request " << r->request.Id);
1528  ++r->request.retries;
1529  parent->submitRequest(r);
1530  retried = true;
1531  } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1532  if (!parent->onTimedOutResponse.isEmpty()) {
1533  if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1534  r->reply.finalize();
1535  else
1537  r->request.callback(cbdata, r->reply);
1538  } else {
1540  r->request.callback(cbdata, r->reply);
1541  }
1542  }
1543  --stats.pending;
1544  ++stats.timedout;
1545  ++parent->stats.timedout;
1546  if (!retried)
1547  delete r;
1548  }
1549 }
1550 
1551 void
1553 {
1554  debugs(26, 3, HERE << io.conn);
1555  helper_server *srv = static_cast<helper_server *>(io.data);
1556 
1558 
1559  debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1560  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1562 
1563  const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1564  const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1565 
1566  commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1567 }
1568 
virtual bool reserved()=0
whether the server is locked for exclusive use by a client
unsigned int n_active
Definition: ChildConfig.h:86
int intAverage(const int, const int, int, const int)
Definition: SquidMath.cc:40
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:256
void helperStatefulSubmit(statefulhelper *hlp, const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:562
void cancelReservation(const Helper::ReservationId reservation)
undo reserveServer(), clear the reservation and kick the queue
Definition: helper.cc:593
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper requests; does not enforce queue limits
Definition: helper.cc:552
#define fd_table
Definition: fde.h:157
dlink_list servers
Definition: helper.h:107
#define assert(EX)
Definition: assert.h:17
void const char HLPCB * callback
Definition: stub_helper.cc:16
virtual helper * getParent() const override
the helper object that created this server
Definition: helper.h:282
std::queue< Helper::Xaction * > queue
Definition: helper.h:108
virtual ~helper_stateful_server()
Definition: helper.cc:177
bool prepSubmit()
Definition: helper.cc:514
RequestIndex requestsIndex
maps request IDs to requests
Definition: helper.h:266
#define cbdataReferenceDone(var)
Definition: cbdata.h:350
virtual void append(const char *c, int sz)
Definition: MemBuf.cc:216
virtual helper * getParent() const override
the helper object that created this server
Definition: helper.h:304
void fd_note(int fd, const char *s)
Definition: fd.cc:250
Definition: cbdata.cc:60
helper_stateful_server * findServer(const Helper::ReservationId &reservation)
Definition: helper.cc:609
void * data
Definition: Request.h:41
struct HelperServerBase::@74 stats
struct timeval dispatch_time
Definition: helper.h:214
kill the caller process (i.e., Squid worker)
Definition: ChildConfig.h:95
int i
Definition: membanger.c:49
InstanceIdDefinitions(HelperServerBase, "Hlpr")
#define xstrdup
represents a single "stateless helper" process
Definition: helper.h:243
virtual void dropQueued() override
dequeues and sends a Helper::Unknown answer to all queued requests
Definition: helper.cc:171
represents a single "stateful helper" process
Definition: helper.h:293
Definition: helper.h:63
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:62
Helper::ChildConfig childs
Configuration settings for number running.
Definition: helper.h:110
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
Definition: helper.cc:487
#define safe_free(x)
Definition: xalloc.h:73
Holds the required data to serve a helper request.
Definition: helper.h:38
int avg_svc_time
Definition: helper.h:128
Definition: Flag.h:16
virtual ~helper_server()
Definition: helper.cc:147
wordlist * cmdline
Definition: helper.h:106
void closePipesSafely(const char *name)
Definition: helper.cc:73
void helperShutdown(helper *hlp)
Definition: helper.cc:735
struct timeval answer_time
Definition: helper.h:215
#define REDIRECT_AV_FACTOR
Definition: defines.h:85
char * QuoteMimeBlob(const char *header)
Definition: Quoting.cc:43
int ipc_type
Definition: helper.h:111
char * key
Definition: wordlist.h:33
#define xisspace(x)
Definition: xis.h:17
void reserveServer(helper_stateful_server *srv)
reserve the given server
Definition: helper.cc:580
#define DBG_CRITICAL
Definition: Debug.h:45
char * p
Definition: membanger.c:43
static void HelperServerClosed(helper_stateful_server *srv)
close handler to handle exited server processes
Definition: helper.cc:871
int conn
the current server connection FD
Definition: Transport.cc:26
time_t last_queue_warn
Definition: helper.h:115
statefulhelper * parent
Definition: helper.h:309
Ip::Address addr
Definition: helper.h:205
struct timeval current_time
Definition: stub_time.cc:15
static void helperStatefulKickQueue(statefulhelper *hlp)
Definition: helper.cc:1490
A const & max(A const &lhs, A const &rhs)
Helper::Request request
Definition: helper.h:42
virtual bool reserved() override
whether the server is locked for exclusive use by a client
Definition: helper.h:303
size_t roffset
Definition: helper.h:212
static void helperKickQueue(helper *hlp)
Definition: helper.cc:1480
uint64_t timedout
Definition: helper.h:233
time_t squid_curtime
Definition: stub_time.cc:17
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Definition: helper.cc:1552
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
helper * parent
Definition: helper.h:253
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: old_api.cc:345
static void helperStatefulServerDone(helper_stateful_server *srv)
Definition: helper.cc:1502
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
Helper::ResultCode result
The helper response &#39;result&#39; field.
Definition: Reply.h:59
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:96
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
void helperSubmit(helper *hlp, const char *buf, HLPCB *callback, void *data)
Definition: helper.cc:468
Helper::Xaction * popRequest(int requestId)
Definition: helper.cc:888
dlink_node link
Definition: helper.h:217
int shutting_down
void HLPCB(void *, const Helper::Reply &)
Definition: forward.h:27
int tvSubMsec(struct timeval, struct timeval)
Definition: stub_time.cc:20
Comm::ConnectionPointer conn
Definition: CommCalls.h:85
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:24
void const char HLPCB void * data
Definition: stub_helper.cc:16
static void Enqueue(helper *hlp, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
Definition: helper.cc:1199
bool retryTimedOut
Whether the timed-out requests must retried.
Definition: helper.h:118
char * rbuf
Definition: helper.h:210
unsigned int n_max
Definition: ChildConfig.h:48
void packStatsInto(Packable *p, const char *label=NULL) const
Dump some stats about the helper state to a Packable object.
Definition: helper.cc:672
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:124
#define cbdataReference(var)
Definition: cbdata.h:341
#define DBG_IMPORTANT
Definition: Debug.h:46
Comm::ConnectionPointer writePipe
Definition: helper.h:207
struct HelperServerBase::_helper_flags flags
const char * id_name
Definition: helper.h:109
#define FD_DESC_SZ
Definition: defines.h:46
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
void finalize()
Definition: Reply.cc:38
static IOCB helperStatefulHandleRead
Definition: helper.cc:44
bool trySubmit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
reserved servers indexed by reservation IDs
Definition: helper.cc:570
uint64_t nextRequestId
Definition: helper.h:248
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
Definition: Read.h:59
represents a single helper process abstraction
Definition: helper.h:171
Value value
instance identifier
Definition: InstanceId.h:42
time_t last_restart
Definition: helper.h:116
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
Definition: helper.cc:1228
Helper::Xaction * replyXaction
Definition: helper.h:259
virtual ~HelperServerBase()
Definition: helper.cc:139
int commSetNonBlocking(int fd)
Definition: comm.cc:1076
bool accumulate(const char *buf, size_t len)
Definition: Reply.cc:25
void helperStatefulShutdown(statefulhelper *hlp)
Definition: helper.cc:772
Helper::Xaction * nextRequest()
Definition: helper.cc:1257
int isNull() const
Definition: MemBuf.cc:148
unsigned int n_running
Definition: ChildConfig.h:80
static helper_server * GetFirstAvailable(const helper *hlp)
Definition: helper.cc:1269
void helperStatefulOpenServers(statefulhelper *hlp)
Definition: helper.cc:327
void helperOpenServers(helper *hlp)
Definition: helper.cc:196
static IOCB helperHandleRead
Definition: helper.cc:43
bool willOverload() const
Definition: helper.cc:730
bool overloaded() const
Definition: helper.cc:481
int unsigned int const char *desc STUB void int len
Definition: stub_fd.cc:20
char * content()
start of the added data
Definition: MemBuf.h:41
int requests
Definition: helper.h:124
void const char * buf
Definition: stub_helper.cc:16
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:153
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
Definition: CommCalls.h:36
void initStats()
Definition: helper.cc:63
Flag
Definition: Flag.h:15
void memFreeBuf(size_t size, void *)
Definition: old_api.cc:384
uint64_t uses
Definition: helper.h:229
bool SIGHDLR int STUB void int
Definition: stub_tools.cc:68
void clean()
Definition: MemBuf.cc:113
time_t getCurrentTime(void)
Get current time.
#define PRIu64
Definition: types.h:120
int needNew() const
Definition: ChildConfig.cc:59
static helper_stateful_server * StatefulGetFirstAvailable(statefulhelper *hlp)
Definition: helper.cc:1315
Comm::ConnectionPointer readPipe
Definition: helper.h:206
Helper::Reply reply
Definition: helper.h:43
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
#define xmalloc
~helper()
Definition: helper.cc:818
void closeWritePipeSafely(const char *name)
Definition: helper.cc:99
Ip::Address addr
Definition: helper.h:112
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:318
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
Definition: helper.cc:1432
uint64_t Id
Definition: Request.h:45
time_t reservationTimeout
older stateful helper server reservations may be forgotten
Definition: ChildConfig.h:109
void submit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:640
HLPCB * callback
Definition: Request.h:40
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
char * buf
Definition: Request.h:39
a (temporary) lock on a (stateful) helper channel
Definition: ReservationId.h:18
const size_t ReadBufSize(32 *1024)
Helpers input buffer size.
time_t timeout
Requests timeout.
Definition: helper.h:117
#define MAX_RETRIES
The maximum allowed request retries.
Definition: helper.cc:38
Definition: MemBuf.h:23
bool queueFull() const
whether queuing an additional request would overload the helper
Definition: helper.cc:476
static void HelperServerClosed(helper_server *srv)
close handler to handle exited server processes
Definition: helper.cc:852
void * hIpc
Definition: helper.h:208
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
Definition: CommCalls.h:342
char eom
The char which marks the end of (response) message, normally &#39; &#39;.
Definition: helper.h:121
bool ignoreToEom
Whether to ignore current message, because it is timed-out or other reason.
Definition: helper.h:262
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:412
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:961
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
Definition: helper.cc:1428
size_t rbuf_sz
Definition: helper.h:211
void checkForTimedOutRequests(bool const retry)
Definition: helper.cc:1513
static ReservationId Next()
Definition: ReservationId.cc:4
Requests requests
requests in order of submission/expiration
Definition: helper.h:226
Definition: Debug.h:184
#define HELPER_MAX_ARGS
Definition: helper.cc:35
Helper::ReservationId reservationId
The stateful replies should include the reservation ID.
Definition: Reply.h:65
virtual void dropQueued()
dequeues and sends a Helper::Unknown answer to all queued requests
Definition: helper.cc:123
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:541
void handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
Definition: helper.cc:828
#define DBG_DATA
Definition: Debug.h:48
time_t reservationStart
when the last reservation was made
Definition: helper.h:316
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
unsigned int concurrency
Definition: ChildConfig.h:72
unsigned int queue_size
Definition: ChildConfig.h:91
struct timeval dispatch_time
Definition: Request.h:44
uint64_t replies
Definition: helper.h:230
static void helperDispatch(helper_server *srv, Helper::Xaction *r)
Definition: helper.cc:1385
int commSetConnTimeout(const Comm::ConnectionPointer &conn, int timeout, AsyncCall::Pointer &callback)
Definition: comm.cc:552
int queue_size
Definition: helper.h:127
static void SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
Definition: helper.cc:455
MemBuf * wqueue
Definition: helper.h:250
#define NULL
Definition: types.h:166
MemBuf * writebuf
Definition: helper.h:251
char progname[]
int reconfiguring
static void helperReturnBuffer(helper_server *srv, helper *hlp, char *msg, size_t msgSize, char *msgEnd)
Calls back with a pointer to the buffer with the helper output.
Definition: helper.cc:911
const InstanceId< HelperServerBase > index
Definition: helper.h:203
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:35
uint64_t pending
Definition: helper.h:231
void submitRequest(Helper::Xaction *r)
Definition: helper.cc:441
int placeholder
Definition: Request.h:43
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
Definition: helper.cc:1359
struct helper::_stats stats
wordlist * next
Definition: wordlist.h:34
Helper::ReservationId reservationId
"confirmation ID" of the last
Definition: helper.h:315

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors