helper.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 84 Helper process maintenance */
10 
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31 
32 // helper_stateful_server::data uses explicit alloc()/freeOne()
33 #include "mem/Pool.h"
34 
35 #define HELPER_MAX_ARGS 64
36 
38 #define MAX_RETRIES 2
39 
41 const size_t ReadBufSize(32*1024);
42 
45 static void helperServerFree(helper_server *srv);
47 static void Enqueue(helper * hlp, Helper::Xaction *);
48 static helper_server *GetFirstAvailable(const helper * hlp);
50 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
52 static void helperKickQueue(helper * hlp);
53 static void helperStatefulKickQueue(statefulhelper * hlp);
55 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
56 
61 
63 
/// Resets every per-server counter kept in stats (uses, replies, pending,
/// releases, timedout) to zero.
/// NOTE(review): the line naming this function is absent from this listing;
/// it is presumably the initStats() invoked on freshly created servers -- confirm.
64 void
66 {
67  stats.uses=0;
68  stats.replies=0;
69  stats.pending=0;
70  stats.releases=0;
71  stats.timedout = 0;
72 }
73 
/// Marks this server as closing and closes both of its pipes.
/// On Windows builds the write socket is shut down first and, afterwards, the
/// code waits up to 5000 ms on hIpc before closing that handle.
/// NOTE(review): the signature line and listing line 91 are absent from this
/// listing; callers in this file invoke closePipesSafely(id_name) -- confirm.
74 void
76 {
77 #if _SQUID_WINDOWS_
78  shutdown(writePipe->fd, SD_BOTH);
79 #endif
80 
81  flags.closing = true;
// When both directions share one descriptor, only forget the read side here;
// the descriptor itself is closed through writePipe below.
82  if (readPipe->fd == writePipe->fd)
83  readPipe->fd = -1;
84  else
85  readPipe->close();
86  writePipe->close();
87 
88 #if _SQUID_WINDOWS_
89  if (hIpc) {
// Warn (but still close the handle) if the wait does not report WAIT_OBJECT_0.
90  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
92  debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
93  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
94  }
95  CloseHandle(hIpc);
96  }
97 #endif
98 }
99 
/// Marks this server as closing and closes only its write pipe; the read pipe
/// is not closed here (its fd is merely forgotten when it equals the write fd).
/// On Windows builds the socket is shut down first (both directions when the
/// descriptor is shared, send-only otherwise) and the code then waits up to
/// 5000 ms on hIpc before closing that handle.
/// NOTE(review): the signature line is absent from this listing; callers in
/// this file invoke closeWritePipeSafely(id_name) -- confirm.
100 void
102 {
103 #if _SQUID_WINDOWS_
104  shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
105 #endif
106 
107  flags.closing = true;
108  if (readPipe->fd == writePipe->fd)
109  readPipe->fd = -1;
110  writePipe->close();
111 
112 #if _SQUID_WINDOWS_
113  if (hIpc) {
114  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
// Refresh the clock before logging, since the wait above may have taken seconds.
115  getCurrentTime();
116  debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
117  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
118  }
119  CloseHandle(hIpc);
120  }
121 #endif
122 }
123 
/// Starts as many helper child processes as hlp->childs.needNew() reports.
/// For each child it calls ipcCreate(), wraps the resulting descriptors in a
/// new helper_server, links that server into hlp->servers, registers
/// helperServerFree as the close handler on the read descriptor, and schedules
/// the first read. Finally it kicks the helper queue.
/// Does nothing when hlp->cmdline is NULL.
/// NOTE(review): the signature line and listing lines 235, 240 and 244 are
/// absent from this listing (the commCbCall() argument lists are cut short).
124 void
126 {
127  char *s;
128  char *progname;
129  char *shortname;
130  char *procname;
131  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
132  char fd_note_buf[FD_DESC_SZ];
133  helper_server *srv;
134  int nargs = 0;
135  int k;
136  pid_t pid;
137  int rfd;
138  int wfd;
139  void * hIpc;
140  wordlist *w;
141 
142  if (hlp->cmdline == NULL)
143  return;
144 
145  progname = hlp->cmdline->key;
146 
// shortname is a heap copy of the program name without its directory part.
147  if ((s = strrchr(progname, '/')))
148  shortname = xstrdup(s + 1);
149  else
150  shortname = xstrdup(progname);
151 
152  /* figure out how many new children are actually needed. */
153  int need_new = hlp->childs.needNew();
154 
155  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
156 
// Only logs; the start-up loop below simply runs zero times in this case.
157  if (need_new < 1) {
158  debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
159  }
160 
// +3 leaves room for the surrounding parentheses and the terminating NUL.
161  procname = (char *)xmalloc(strlen(shortname) + 3);
162 
163  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
164 
// args[0] is the "(shortname)" label; the remaining command-line words follow.
165  args[nargs] = procname;
166  ++nargs;
167 
168  for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
169  args[nargs] = w->key;
170  ++nargs;
171  }
172 
173  args[nargs] = NULL;
174  ++nargs;
175 
// NOTE(review): nargs already counts the NULL terminator here, so a command
// line that fills all HELPER_MAX_ARGS slots reaches HELPER_MAX_ARGS+1 and
// trips this assert even though args[] has room -- confirm the intended limit.
176  assert(nargs <= HELPER_MAX_ARGS);
177 
178  for (k = 0; k < need_new; ++k) {
179  getCurrentTime();
180  rfd = wfd = -1;
181  pid = ipcCreate(hlp->ipc_type,
182  progname,
183  args,
184  shortname,
185  hlp->addr,
186  &rfd,
187  &wfd,
188  &hIpc);
189 
// A failed start is logged and skipped; remaining children are still attempted.
190  if (pid < 0) {
191  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
192  continue;
193  }
194 
195  ++ hlp->childs.n_running;
196  ++ hlp->childs.n_active;
197  srv = new helper_server;
198  srv->hIpc = hIpc;
199  srv->pid = pid;
200  srv->initStats();
201  srv->addr = hlp->addr;
202  srv->readPipe = new Comm::Connection;
203  srv->readPipe->fd = rfd;
204  srv->writePipe = new Comm::Connection;
205  srv->writePipe->fd = wfd;
206  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
207  srv->wqueue = new MemBuf;
208  srv->roffset = 0;
209  srv->nextRequestId = 0;
210  srv->replyXaction = NULL;
211  srv->ignoreToEom = false;
212  srv->parent = cbdataReference(hlp);
213  dlinkAddTail(srv, &srv->link, &hlp->servers);
214 
// Label the descriptor(s) with the helper name and a 1-based child number.
215  if (rfd == wfd) {
216  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
217  fd_note(rfd, fd_note_buf);
218  } else {
219  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
220  fd_note(rfd, fd_note_buf);
221  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
222  fd_note(wfd, fd_note_buf);
223  }
224 
225  commSetNonBlocking(rfd);
226 
227  if (wfd != rfd)
228  commSetNonBlocking(wfd);
229 
230  AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
231  comm_add_close_handler(rfd, closeCall);
232 
// A read timeout is armed only when both a timeout and concurrency are configured.
233  if (hlp->timeout && hlp->childs.concurrency) {
234  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
236  commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
237  }
238 
// Read at most rbuf_sz - 1 bytes so the buffer can always be NUL-terminated.
239  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
241  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
242  }
243 
245  safe_free(shortname);
246  safe_free(procname);
247  helperKickQueue(hlp);
248 }
249 
/// Stateful counterpart of the helper start-up routine: starts as many child
/// processes as hlp->childs.needNew() reports, links a server object for each
/// into hlp->servers, registers helperStatefulServerFree as the close handler
/// on the read descriptor and schedules the first read.
/// Does nothing when hlp->cmdline is NULL.
/// NOTE(review): the signature line and listing lines 324 (where srv is
/// created), 359, 363 and 366 are absent from this listing.
255 void
257 {
258  char *shortname;
259  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
260  char fd_note_buf[FD_DESC_SZ];
261  int nargs = 0;
262 
263  if (hlp->cmdline == NULL)
264  return;
265 
// Configured concurrency is reported as an error, but start-up continues.
266  if (hlp->childs.concurrency)
267  debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
268 
269  char *progname = hlp->cmdline->key;
270 
271  char *s;
272  if ((s = strrchr(progname, '/')))
273  shortname = xstrdup(s + 1);
274  else
275  shortname = xstrdup(progname);
276 
277  /* figure out how many new helpers are needed. */
278  int need_new = hlp->childs.needNew();
279 
// NOTE(review): this message is prefixed "helperOpenServers" although the
// next one uses "helperStatefulOpenServers" -- confirm which is intended.
280  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
281 
282  if (need_new < 1) {
283  debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
284  }
285 
// +3 leaves room for the surrounding parentheses and the terminating NUL.
286  char *procname = (char *)xmalloc(strlen(shortname) + 3);
287 
288  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
289 
290  args[nargs] = procname;
291  ++nargs;
292 
293  for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
294  args[nargs] = w->key;
295  ++nargs;
296  }
297 
298  args[nargs] = NULL;
299  ++nargs;
300 
// NOTE(review): nargs already counts the NULL terminator here, so a command
// line that fills all HELPER_MAX_ARGS slots trips this assert even though
// args[] has room -- confirm the intended limit.
301  assert(nargs <= HELPER_MAX_ARGS);
302 
303  for (int k = 0; k < need_new; ++k) {
304  getCurrentTime();
305  int rfd = -1;
306  int wfd = -1;
307  void * hIpc;
308  pid_t pid = ipcCreate(hlp->ipc_type,
309  progname,
310  args,
311  shortname,
312  hlp->addr,
313  &rfd,
314  &wfd,
315  &hIpc);
316 
// A failed start is logged and skipped; remaining children are still attempted.
317  if (pid < 0) {
318  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
319  continue;
320  }
321 
322  ++ hlp->childs.n_running;
323  ++ hlp->childs.n_active;
325  srv->hIpc = hIpc;
326  srv->pid = pid;
327  srv->flags.reserved = false;
328  srv->initStats();
329  srv->addr = hlp->addr;
330  srv->readPipe = new Comm::Connection;
331  srv->readPipe->fd = rfd;
332  srv->writePipe = new Comm::Connection;
333  srv->writePipe->fd = wfd;
334  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
335  srv->roffset = 0;
336  srv->parent = cbdataReference(hlp);
337 
338  dlinkAddTail(srv, &srv->link, &hlp->servers);
339 
// Label the descriptor(s) with the helper name and a 1-based child number.
340  if (rfd == wfd) {
341  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
342  fd_note(rfd, fd_note_buf);
343  } else {
344  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
345  fd_note(rfd, fd_note_buf);
346  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
347  fd_note(wfd, fd_note_buf);
348  }
349 
350  commSetNonBlocking(rfd);
351 
352  if (wfd != rfd)
353  commSetNonBlocking(wfd);
354 
355  AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
356  comm_add_close_handler(rfd, closeCall);
357 
// Read at most rbuf_sz - 1 bytes so the buffer can always be NUL-terminated.
358  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
360  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
361  }
362 
364  safe_free(shortname);
365  safe_free(procname);
367 }
368 
/// Hands transaction r to the first available server, or queues it on this
/// helper when GetFirstAvailable() finds none, then re-syncs the queue stats.
/// NOTE(review): the signature line is absent from this listing; other code in
/// this file calls submitRequest(r) with a Helper::Xaction pointer.
369 void
371 {
372  helper_server *srv;
373 
374  if ((srv = GetFirstAvailable(this)))
375  helperDispatch(srv, r);
376  else
377  Enqueue(this, r);
378 
379  syncQueueStats();
380 }
381 
/// Reports a request that could not be submitted by invoking callback with a
/// synthetic reply: Helper::Unknown when there is no helper at all, otherwise
/// Helper::Error.
/// NOTE(review): the signature line is absent from this listing; callers pass
/// (hlp, callback, data).
383 static void
385 {
386  auto result = Helper::Error;
387  if (!hlp) {
388  debugs(84, 3, "no helper");
389  result = Helper::Unknown;
390  }
391  // else pretend the helper has responded with ERR
392 
393  callback(data, Helper::Reply(result));
394 }
395 
396 void
397 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
398 {
399  if (!hlp || !hlp->trySubmit(buf, callback, data))
400  SubmissionFailure(hlp, callback, data);
401 }
402 
/// True when the current queue length has reached (or exceeded) the configured
/// childs.queue_size limit.
/// NOTE(review): the signature line (with the opening brace) is absent from
/// this listing; presumably this is queueFull() -- confirm.
404 bool
406  return stats.queue_size >= static_cast<int>(childs.queue_size);
407 }
408 
/// True only when the current queue length strictly exceeds the configured
/// childs.queue_size limit (a queue exactly at its limit does not count).
/// NOTE(review): the signature line (with the opening brace) is absent from
/// this listing; presumably this is overloaded() -- confirm.
409 bool
411  return stats.queue_size > static_cast<int>(childs.queue_size);
412 }
413 
/// Logs transitions into and out of the overloaded state and, when overload
/// ends, reports and resets droppedRequests and clears overloadStart.
/// NOTE(review): the signature line and listing line 422 are absent from this
/// listing; line 422 presumably records overloadStart when overload begins -- confirm.
415 void
417 {
418  if (overloaded()) {
419  if (overloadStart) {
420  debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
421  } else {
423  debugs(84, 3, id_name << " became overloaded");
424  }
425  } else {
426  if (overloadStart) {
427  debugs(84, 5, id_name << " is no longer overloaded");
// Only announce the recovery at an important level if requests were dropped.
428  if (droppedRequests) {
429  debugs(84, DBG_IMPORTANT, "helper " << id_name <<
430  " is no longer overloaded after dropping " << droppedRequests <<
431  " requests in " << (squid_curtime - overloadStart) << " seconds");
432  droppedRequests = 0;
433  }
434  overloadStart = 0;
435  }
436  }
437 }
438 
/// Decides whether a new request may be submitted.
/// Returns true when the helper is not overloaded, or has been overloaded for
/// no more than 180 seconds; otherwise counts the request as dropped and
/// returns false.
/// NOTE(review): the signature line and listing line 457 are absent from this
/// listing. Line 457 presumably guards the fatalf() call below (as shown here
/// the call looks unconditional) -- confirm against the full source.
442 bool
444 {
445  // re-sync for the configuration may have changed since the last submission
446  syncQueueStats();
447 
448  // Nothing special to do if the new request does not overload (i.e., the
449  // queue is not even full yet) or only _starts_ overloading this helper
450  // (i.e., the queue is currently at its limit).
451  if (!overloaded())
452  return true;
453 
454  if (squid_curtime - overloadStart <= 180)
455  return true; // also OK: overload has not persisted long enough to panic
456 
458  fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
459 
// Warn once per overload episode: only when the first request is dropped.
460  if (!droppedRequests) {
461  debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
462  id_name << " helper configured with on-persistent-overload=err");
463  }
464  ++droppedRequests;
465  debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
466  return false;
467 }
468 
469 bool
470 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
471 {
472  if (!prepSubmit())
473  return false; // request was dropped
474 
475  submit(buf, callback, data); // will send or queue
476  return true; // request submitted or queued
477 }
478 
480 void
481 helper::submit(const char *buf, HLPCB * callback, void *data)
482 {
483  Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
484  submitRequest(r);
485  debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
486 }
487 
/// Stateful submission entry point: tries hlp->trySubmit(buf, callback, data,
/// lastserver) and falls back to SubmissionFailure() when hlp is nil or the
/// helper declines the request.
/// NOTE(review): the signature line is absent from this listing.
489 void
491 {
492  if (!hlp || !hlp->trySubmit(buf, callback, data, lastserver))
493  SubmissionFailure(hlp, callback, data);
494 }
495 
/// Stateful variant of trySubmit: returns false when prepSubmit() rejects the
/// request, otherwise forwards it (including lastserver) to submit() and
/// returns true.
/// NOTE(review): the signature line is absent from this listing.
497 bool
499 {
500  if (!prepSubmit())
501  return false; // request was dropped
502 
503  submit(buf, callback, data, lastserver); // will send or queue
504  return true; // request submitted or queued
505 }
506 
/// Creates a Helper::Xaction for the request and routes it:
///  - with a non-NULL buf and a lastserver, it is dispatched to that server,
///    which must be reserved and have no outstanding requests;
///  - otherwise it goes to the first available stateful server, or is queued
///    when none is available.
/// Afterwards the request is logged and the queue stats are re-synced.
/// NOTE(review): listing line 519 (presumably the declaration of srv) is
/// absent from this listing.
507 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
508 {
509  Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
510 
511  if ((buf != NULL) && lastserver) {
512  debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
513  assert(lastserver->flags.reserved);
514  assert(!lastserver->requests.size());
515 
516  debugs(84, 5, "StatefulSubmit dispatching");
517  helperStatefulDispatch(lastserver, r);
518  } else {
520  if ((srv = StatefulGetFirstAvailable(this))) {
521  helperStatefulDispatch(srv, r);
522  } else
523  StatefulEnqueue(this, r);
524  }
525 
// NOTE(review): r is dereferenced here after it was handed to
// helperStatefulDispatch()/StatefulEnqueue(); confirm neither can free it.
// buf may be NULL on this path, hence the zero-length guard.
526  debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
527  "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
528 
529  syncQueueStats();
530 }
531 
/// Releases a reserved stateful server: counts the release in
/// srv->stats.releases and clears srv->flags.reserved. A server that is not
/// reserved is left untouched.
/// NOTE(review): the signature line and listing line 549 (a statement that
/// follows the flag reset) are absent from this listing.
538 void
540 {
541  debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
542  if (!srv->flags.reserved)
543  return;
544 
545  ++ srv->stats.releases;
546 
547  srv->flags.reserved = false;
548 
550 }
551 
552 void
553 helper::packStatsInto(Packable *p, const char *label) const
554 {
555  if (label)
556  p->appendf("%s:\n", label);
557 
558  p->appendf(" program: %s\n", cmdline->key);
559  p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
560  p->appendf(" requests sent: %d\n", stats.requests);
561  p->appendf(" replies received: %d\n", stats.replies);
562  p->appendf(" requests timedout: %d\n", stats.timedout);
563  p->appendf(" queue length: %d\n", stats.queue_size);
564  p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
565  p->append("\n",1);
566  p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
567  "ID #",
568  "FD",
569  "PID",
570  "# Requests",
571  "# Replies",
572  "# Timed-out",
573  "Flags",
574  "Time",
575  "Offset",
576  "Request");
577 
578  for (dlink_node *link = servers.head; link; link = link->next) {
579  HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
580  assert(srv);
581  Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
582  double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
583  p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
584  srv->index.value,
585  srv->readPipe->fd,
586  srv->pid,
587  srv->stats.uses,
588  srv->stats.replies,
589  srv->stats.timedout,
590  srv->stats.pending ? 'B' : ' ',
591  srv->flags.writing ? 'W' : ' ',
592  srv->flags.closing ? 'C' : ' ',
593  srv->flags.reserved ? 'R' : ' ',
594  srv->flags.shutdown ? 'S' : ' ',
595  xaction && xaction->request.placeholder ? 'P' : ' ',
596  tt < 0.0 ? 0.0 : tt,
597  (int) srv->roffset,
598  xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
599  }
600 
601  p->append("\nFlags key:\n"
602  " B\tBUSY\n"
603  " W\tWRITING\n"
604  " C\tCLOSING\n"
605  " R\tRESERVED\n"
606  " S\tSHUTDOWN PENDING\n"
607  " P\tPLACEHOLDER\n", 101);
608 }
609 
/// True when the queue is full and there is neither a need to start new
/// children (childs.needNew() is zero) nor an available server to take a request.
/// NOTE(review): the signature line (with the opening brace) is absent from
/// this listing.
610 bool
612  return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
613 }
614 
/// Requests shutdown of every server linked into hlp->servers.
/// Each server not already flagged is marked flags.shutdown and removed from
/// the active count; its pipes are closed immediately only when it is neither
/// already closing nor has pending requests.
/// NOTE(review): the signature line is absent from this listing.
615 void
617 {
618  dlink_node *link = hlp->servers.head;
619 
620  while (link) {
621  helper_server *srv;
622  srv = (helper_server *)link->data;
// Advance before doing any work on srv so the iteration does not depend on
// srv's own link afterwards.
623  link = link->next;
624 
625  if (srv->flags.shutdown) {
626  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
627  continue;
628  }
629 
630  assert(hlp->childs.n_active > 0);
631  -- hlp->childs.n_active;
632  srv->flags.shutdown = true; /* request it to shut itself down */
633 
634  if (srv->flags.closing) {
635  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
636  continue;
637  }
638 
// Busy servers are only flagged here; they are not closed by this function.
639  if (srv->stats.pending) {
640  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
641  continue;
642  }
643 
644  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
645  /* the rest of the details is dealt with in the helperServerFree
646  * close handler
647  */
648  srv->closePipesSafely(hlp->id_name);
649  }
650 }
651 
/// Requests shutdown of every stateful server linked into hlp->servers.
/// Each server not already flagged is marked flags.shutdown and removed from
/// the active count; its pipes are closed immediately unless it is busy,
/// already closing, or reserved while Squid itself is not shutting down.
/// NOTE(review): the signature line and listing line 656 (presumably the
/// declaration of srv) are absent from this listing.
652 void
654 {
655  dlink_node *link = hlp->servers.head;
657 
658  while (link) {
659  srv = (helper_stateful_server *)link->data;
// Advance before doing any work on srv so the iteration does not depend on
// srv's own link afterwards.
660  link = link->next;
661 
662  if (srv->flags.shutdown) {
663  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
664  continue;
665  }
666 
667  assert(hlp->childs.n_active > 0);
668  -- hlp->childs.n_active;
669  srv->flags.shutdown = true; /* request it to shut itself down */
670 
671  if (srv->stats.pending) {
672  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
673  continue;
674  }
675 
676  if (srv->flags.closing) {
677  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
678  continue;
679  }
680 
// A reserved server is closed only when the global shutting_down flag is set.
681  if (srv->flags.reserved) {
682  if (shutting_down) {
683  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
684  } else {
685  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
686  continue;
687  }
688  }
689 
690  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
691 
692  /* the rest of the details is dealt with in the helperStatefulServerFree
693  * close handler
694  */
695  srv->closePipesSafely(hlp->id_name);
696  }
697 }
698 
// Teardown for a helper object: warns when requests are still queued at the
// time it is freed; the visible code releases nothing itself.
// NOTE(review): the signature line is absent from this listing; presumably
// this is the helper destructor -- confirm.
700 {
701  /* note, don't free id_name, it probably points to static memory */
702 
703  // TODO: if the queue is not empty it will leak Helper::Request's
704  if (!queue.empty())
705  debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
706 }
707 
708 /* ====================================================================== */
709 /* LOCAL FUNCTIONS */
710 /* ====================================================================== */
711 
/// Close handler for a helper server (registered on its read descriptor when
/// the server is created). Frees the server's buffers, closes the write pipe
/// if still open, unlinks the server from its helper, updates the running and
/// active counts, restarts helpers after an unexpected exit, answers every
/// request still held by the server, and finally deletes the server.
/// NOTE(review): the signature line and listing lines 770 and 778 are absent
/// from this listing.
712 static void
714 {
715  helper *hlp = srv->parent;
// NOTE(review): concurrency is computed but not used in the lines visible here.
716  int concurrency = hlp->childs.concurrency;
717 
718  if (!concurrency)
719  concurrency = 1;
720 
721  if (srv->rbuf) {
722  memFreeBuf(srv->rbuf_sz, srv->rbuf);
723  srv->rbuf = NULL;
724  }
725 
726  srv->wqueue->clean();
727  delete srv->wqueue;
728 
729  if (srv->writebuf) {
730  srv->writebuf->clean();
731  delete srv->writebuf;
732  srv->writebuf = NULL;
733  }
734 
735  if (Comm::IsConnOpen(srv->writePipe))
736  srv->closeWritePipeSafely(hlp->id_name);
737 
738  dlinkDelete(&srv->link, &hlp->servers);
739 
740  assert(hlp->childs.n_running > 0);
741  -- hlp->childs.n_running;
742 
// A server that was not asked to shut down has exited unexpectedly.
743  if (!srv->flags.shutdown) {
744  assert(hlp->childs.n_active > 0);
745  -- hlp->childs.n_active;
746  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
747 
748  if (hlp->childs.needNew() > 0) {
749  debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
750 
// Crash-loop guard: below the start-up count within 30 seconds of the last
// restart is fatal if this server never replied, otherwise only an error.
751  if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
752  if (srv->stats.replies < 1)
753  fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
754  else
755  debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
756  }
757 
758  debugs(80, DBG_IMPORTANT, "Starting new helpers");
759  helperOpenServers(hlp);
760  }
761  }
762 
// Drain requests still assigned to this server, notifying callers whose
// callback data is still valid, and free each transaction.
763  while (!srv->requests.empty()) {
764  // XXX: re-schedule these on another helper?
765  Helper::Xaction *r = srv->requests.front();
766  srv->requests.pop_front();
767  void *cbdata;
768 
769  if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
771  r->request.callback(cbdata, r->reply);
772  }
773 
774  delete r;
775  }
776  srv->requestsIndex.clear();
777 
779  delete srv;
780 }
781 
/// Close handler for a stateful helper server (registered on its read
/// descriptor when the server is created). Frees the read buffer, closes the
/// write pipe if still open, unlinks the server from its helper, updates the
/// running and active counts, reacts to an unexpected exit, answers every
/// request still held by the server, and finally deletes the server.
/// NOTE(review): the signature line and listing lines 824 (presumably the
/// call that starts new helpers), 835 and 842 are absent from this listing.
782 static void
784 {
785  statefulhelper *hlp = srv->parent;
786 
787  if (srv->rbuf) {
788  memFreeBuf(srv->rbuf_sz, srv->rbuf);
789  srv->rbuf = NULL;
790  }
791 
792 #if 0
793  srv->wqueue->clean();
794 
795  delete srv->wqueue;
796 
797 #endif
798 
799  /* TODO: walk the local queue of requests and carry them all out */
800  if (Comm::IsConnOpen(srv->writePipe))
801  srv->closeWritePipeSafely(hlp->id_name);
802 
803  dlinkDelete(&srv->link, &hlp->servers);
804 
805  assert(hlp->childs.n_running > 0);
806  -- hlp->childs.n_running;
807 
// A server that was not asked to shut down has exited unexpectedly.
808  if (!srv->flags.shutdown) {
809  assert( hlp->childs.n_active > 0);
810  -- hlp->childs.n_active;
811  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
812 
813  if (hlp->childs.needNew() > 0) {
814  debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
815 
// Crash-loop guard: below the start-up count within 30 seconds of the last
// restart is fatal if this server never replied, otherwise only an error.
816  if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
817  if (srv->stats.replies < 1)
818  fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
819  else
820  debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
821  }
822 
823  debugs(80, DBG_IMPORTANT, "Starting new helpers");
825  }
826  }
827 
// Drain requests still assigned to this server, notifying callers whose
// callback data is still valid, and free each transaction.
828  while (!srv->requests.empty()) {
829  // XXX: re-schedule these on another helper?
830  Helper::Xaction *r = srv->requests.front();
831  srv->requests.pop_front();
832  void *cbdata;
833 
834  if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
836  r->request.callback(cbdata, r->reply);
837  }
838 
839  delete r;
840  }
841 
843 
844  delete srv;
845 }
846 
// Removes and returns the transaction that a helper reply belongs to.
// With concurrency enabled the transaction is looked up by request_number in
// requestsIndex and erased from both containers; without concurrency the
// oldest entry of requests is taken and request_number is ignored.
// Returns nullptr when no matching transaction exists.
// NOTE(review): the return-type line is absent from this listing.
848 helper_server::popRequest(int request_number)
849 {
850  Helper::Xaction *r = nullptr;
851  helper_server::RequestIndex::iterator it;
852  if (parent->childs.concurrency) {
853  // If concurrency is supported, retrieve the request by its ID
854  it = requestsIndex.find(request_number);
855  if (it != requestsIndex.end()) {
// it->second is used as a position within requests: read the transaction
// through it first, then erase both the list entry and the index entry.
856  r = *(it->second);
857  requests.erase(it->second);
858  requestsIndex.erase(it);
859  }
860  } else if(!requests.empty()) {
861  // Else get the first request from queue, if any
862  r = requests.front();
863  requests.pop_front();
864  }
865 
866  return r;
867 }
868 
/// Feeds one chunk of helper output into the transaction currently being
/// answered (srv->replyXaction) and, once the end of the message was seen
/// (msgEnd is non-nil), completes that transaction: the reply is finalized
/// and either delivered to the requester's callback or, for a BrokenHelper
/// result with retries remaining, re-submitted. Server and helper statistics
/// are then updated.
/// After that, the helper queue is kicked, or -- for a server that is shutting
/// down and has nothing pending -- its write pipe is closed.
/// NOTE(review): listing lines 911, 913 and 925 are absent from this listing,
/// so the avg_svc_time expression and the body of the timeout `if` near the
/// end are incomplete as shown.
870 static void
871 helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
872 {
873  if (Helper::Xaction *r = srv->replyXaction) {
874  const bool hasSpace = r->reply.accumulate(msg, msgSize);
875  if (!hasSpace) {
876  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
877  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
878  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
879  srv->closePipesSafely(hlp->id_name);
880  return;
881  }
882 
883  if (!msgEnd)
884  return; // We are waiting for more data.
885 
886  bool retry = false;
// The reply is only processed while the requester's callback data is valid.
887  if (cbdataReferenceValid(r->request.data)) {
888  r->reply.finalize();
// NOTE(review): the message hard-codes "of 2" while the limit comes from
// MAX_RETRIES -- keep the two in sync.
889  if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
890  debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
891  retry = true;
892  } else {
// Clear the stored callback before invoking it through the local copy.
893  HLPCB *callback = r->request.callback;
894  r->request.callback = nullptr;
895  void *cbdata = nullptr;
896  if (cbdataReferenceValidDone(r->request.data, &cbdata))
897  callback(cbdata, r->reply);
898  }
899  }
900 
901  -- srv->stats.pending;
902  ++ srv->stats.replies;
903 
904  ++ hlp->stats.replies;
905 
906  srv->answer_time = current_time;
907 
908  srv->dispatch_time = r->request.dispatch_time;
909 
910  hlp->stats.avg_svc_time =
912  tvSubMsec(r->request.dispatch_time, current_time),
914 
915  // release or re-submit parsedRequestXaction object
916  srv->replyXaction = nullptr;
917  if (retry) {
918  ++r->request.retries;
919  hlp->submitRequest(r);
920  } else
921  delete r;
922  }
923 
924  if (hlp->timeout && hlp->childs.concurrency)
926 
927  if (!srv->flags.shutdown) {
928  helperKickQueue(hlp);
929  } else if (!srv->flags.closing && !srv->stats.pending) {
// Shutdown was requested earlier and the last pending reply has arrived.
930  srv->flags.closing=true;
931  srv->writePipe->close();
932  }
933 }
934 
/// Read callback for a (stateless) helper server.
/// Appends the newly read bytes to srv->rbuf, splits the buffer into messages
/// terminated by hlp->eom (a preceding '\r' is also stripped when eom is '\n'),
/// matches each message to its transaction -- by the leading channel number
/// when concurrency is enabled -- and passes the payload to
/// helperReturnBuffer(). Unparsed leftovers are moved to the start of the
/// buffer and another read is scheduled while the read pipe stays open.
/// NOTE(review): listing lines 940 and 1046 are absent from this listing (the
/// final commCbCall() argument list is cut short).
935 static void
936 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
937 {
938  helper_server *srv = (helper_server *)data;
939  helper *hlp = srv->parent;
941 
942  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
943 
944  if (flag == Comm::ERR_CLOSING) {
945  return;
946  }
947 
948  assert(conn->fd == srv->readPipe->fd);
949 
950  debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
951 
// A read error or a zero-length read closes the pipes.
952  if (flag != Comm::OK || len == 0) {
953  srv->closePipesSafely(hlp->id_name);
954  return;
955  }
956 
957  srv->roffset += len;
958  srv->rbuf[srv->roffset] = '\0';
959  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
960 
961  if (!srv->stats.pending && !srv->stats.timedout) {
962  /* someone spoke without being spoken to */
963  debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
964  hlp->id_name << " #" << srv->index << ", " << (int)len <<
965  " bytes '" << srv->rbuf << "'");
966 
// Discard the unsolicited data.
967  srv->roffset = 0;
968  srv->rbuf[0] = '\0';
969  }
970 
971  bool needsMore = false;
972  char *msg = srv->rbuf;
// Process one message (or one partial message) per iteration.
973  while (*msg && !needsMore) {
// skip = number of terminator octets to step over after this message.
974  int skip = 0;
975  char *eom = strchr(msg, hlp->eom);
976  if (eom) {
977  skip = 1;
978  debugs(84, 3, "helperHandleRead: end of reply found");
979  if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
980  *eom = '\0';
981  // rewind to the \r octet which is the real terminal now
982  // and remember that we have to skip forward 2 places now.
983  skip = 2;
984  --eom;
985  }
986  *eom = '\0';
987  }
988 
// Start of a new reply: work out which transaction it answers.
989  if (!srv->ignoreToEom && !srv->replyXaction) {
990  int i = 0;
991  if (hlp->childs.concurrency) {
992  char *e = NULL;
993  i = strtol(msg, &e, 10);
994  // Do we need to check for e == msg? Means wrong response from helper.
995  // Will be dropped as "unexpected reply on channel 0"
// The number is complete only if followed by whitespace or the end of message.
996  needsMore = !(xisspace(*e) || (eom && e == eom));
997  if (!needsMore) {
998  msg = e;
999  while (*msg && xisspace(*msg))
1000  ++msg;
1001  } // else not enough data to compute request number
1002  }
1003  if (!(srv->replyXaction = srv->popRequest(i))) {
1004  if (srv->stats.timedout) {
1005  debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1006  } else {
1007  debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
1008  i << " from " << hlp->id_name << " #" << srv->index <<
1009  " '" << srv->rbuf << "'");
1010  }
// No matching transaction: drop data up to the next end-of-message marker.
1011  srv->ignoreToEom = true;
1012  }
1013  } // else we need to just append reply data to the current Xaction
1014 
1015  if (!needsMore) {
1016  size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1017  assert(msgSize <= srv->rbuf_sz);
1018  helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1019  msg += msgSize + skip;
1020  assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1021 
1022  // The next message should not be ignored.
1023  if (eom && srv->ignoreToEom)
1024  srv->ignoreToEom = false;
1025  } else
1026  assert(skip == 0 && eom == NULL);
1027  }
1028 
1029  if (needsMore) {
// Keep the incomplete tail at the front of the buffer for the next read.
1030  size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1031  assert(msgSize <= srv->rbuf_sz);
1032  memmove(srv->rbuf, msg, msgSize);
1033  srv->roffset = msgSize;
1034  srv->rbuf[srv->roffset] = '\0';
1035  } else {
1036  // All of the responses parsed and msg points at the end of read data
1037  assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1038  srv->roffset = 0;
1039  }
1040 
1041  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
// Reserve one byte for the NUL terminator written after each read.
1042  int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1043  assert(spaceSize >= 0);
1044 
1045  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1047  comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1048  }
1049 }
1050 
/// Read callback for a stateful helper server.
/// Appends the newly read bytes to srv->rbuf, looks for the hlp->eom
/// terminator (a preceding '\r' is also stripped when eom is '\n'), feeds the
/// data into the reply of the front request and, when the terminator was
/// found, finalizes the reply, invokes the requester's callback, updates the
/// statistics and frees the transaction. Another read is scheduled while the
/// read pipe stays open.
/// NOTE(review): listing lines 1055 (presumably the declaration of srv), 1057,
/// 1108-1113, 1138-1140, 1143 and 1145 are absent from this listing, so the
/// avg_svc_time expression and both branches of `if (called)` are incomplete
/// as shown.
1051 static void
1052 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1053 {
1054  char *t = NULL;
1056  statefulhelper *hlp = srv->parent;
1058 
1059  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1060 
1061  if (flag == Comm::ERR_CLOSING) {
1062  return;
1063  }
1064 
1065  assert(conn->fd == srv->readPipe->fd);
1066 
1067  debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1068  hlp->id_name << " #" << srv->index);
1069 
// A read error or a zero-length read closes the pipes.
1070  if (flag != Comm::OK || len == 0) {
1071  srv->closePipesSafely(hlp->id_name);
1072  return;
1073  }
1074 
1075  srv->roffset += len;
1076  srv->rbuf[srv->roffset] = '\0';
// NOTE(review): front() is called without checking requests.empty(); the
// r == NULL test below relies on what front() yields for an empty container
// -- confirm the container's semantics.
1077  Helper::Xaction *r = srv->requests.front();
1078  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1079 
1080  if (r == NULL) {
1081  /* someone spoke without being spoken to */
1082  debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1083  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1084  " bytes '" << srv->rbuf << "'");
1085 
1086  srv->roffset = 0;
1087  }
1088 
1089  if ((t = strchr(srv->rbuf, hlp->eom))) {
1090  debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1091 
1092  if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1093  *t = '\0';
1094  // rewind to the \r octet which is the real terminal now
1095  --t;
1096  }
1097 
1098  *t = '\0';
1099  }
1100 
// Accumulate up to the terminator if one was found, else everything read so far.
1101  if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1102  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1103  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1104  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1105  srv->closePipesSafely(hlp->id_name);
1106  return;
1107  }
// The buffered bytes have been consumed; the next read starts at the beginning.
1114  srv->roffset = 0;
1115 
1116  if (t) {
1117  /* end of reply found */
// NOTE(review): pop_front() runs even when r was NULL above -- confirm this
// is safe for an empty requests container.
1118  srv->requests.pop_front(); // we already have it in 'r'
1119  int called = 1;
1120 
1121  if (r && cbdataReferenceValid(r->request.data)) {
1122  r->reply.finalize();
1123  r->reply.whichServer = srv;
1124  r->request.callback(r->request.data, r->reply);
1125  } else {
1126  debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1127  called = 0;
1128  }
1129 
1130  delete r;
1131 
1132  -- srv->stats.pending;
1133  ++ srv->stats.replies;
1134 
1135  ++ hlp->stats.replies;
1136  srv->answer_time = current_time;
1137  hlp->stats.avg_svc_time =
1141 
1142  if (called)
1144  else
1146  }
1147 
1148  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
// Reserve one byte for the NUL terminator written after each read.
1149  int spaceSize = srv->rbuf_sz - 1;
1150 
1151  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1153  comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1154  }
1155 }
1156 
/// Queues request r on helper hlp and bumps the queue-size counter.
/// If more children are needed they are started right away; otherwise, once
/// the queue has reached its configured size and at least 600 seconds have
/// passed since the last warning, "all processes are busy" warnings are logged.
/// NOTE(review): the signature line and listing lines 1177 (the condition
/// guarding the `return` below) and 1180 are absent from this listing;
/// line 1180 presumably records hlp->last_queue_warn -- confirm.
1158 static void
1160 {
1161  hlp->queue.push(r);
1162  ++ hlp->stats.queue_size;
1163 
1164  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1165  if (hlp->childs.needNew() > 0) {
1166  debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1167  helperOpenServers(hlp);
1168  return;
1169  }
1170 
// Below the configured queue size there is nothing to warn about.
1171  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1172  return;
1173 
// Rate-limit the warnings to one burst per 600 seconds.
1174  if (squid_curtime - hlp->last_queue_warn < 600)
1175  return;
1176 
1178  return;
1179 
1181 
1182  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1183  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1184  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1185 }
1186 
/// Stateful-helper counterpart of Enqueue(): queues the request and either
/// reacts to a need for more helper processes or may log overload warnings.
/// NOTE(review): this listing lacks the signature line (the file's
/// cross-reference gives StatefulEnqueue(statefulhelper *hlp,
/// Helper::Xaction *r)) and the lines numbered 1196, 1206 and 1209.
1187 static void
1189 {
     // Queue the request and keep the queue-size statistic in step with it.
1190  hlp->queue.push(r);
1191  ++ hlp->stats.queue_size;
1192 
1193  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1194  if (hlp->childs.needNew() > 0) {
1195  debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
     // NOTE(review): line 1196 is not visible; presumably it starts the new
     // stateful helper processes -- verify.
1197  return;
1198  }
1199 
     // No warning while the queue is shorter than the configured queue size.
1200  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1201  return;
1202 
     // No warning if the last queue warning was less than 600 seconds ago.
1203  if (squid_curtime - hlp->last_queue_warn < 600)
1204  return;
1205 
     // NOTE(review): the condition guarding this return (line 1206) is not
     // visible in this listing.
1207  return;
1208 
     // NOTE(review): line 1209 is not visible in this listing.
1210 
1211  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1212  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1213  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1214 }
1215 
1218 {
     // Body of nextRequest() (the signature line is missing from this
     // listing): removes the request at the front of the queue and returns
     // it, or returns nullptr when nothing is queued.
1219  if (queue.empty())
1220  return nullptr;
1221 
1222  auto *r = queue.front();
1223  queue.pop();
     // Keep the queue-size statistic in step with the queue itself.
1224  --stats.queue_size;
1225  return r;
1226 }
1227 
/// Picks a helper server that can accept another request, or returns NULL
/// when no server is running, every candidate is shutting down, or the chosen
/// server already has as many pending requests as the concurrency setting
/// allows (a concurrency of 0 is treated as 1).
/// The search is approximate: an idle server is returned immediately;
/// otherwise the scan stops at the first server that is strictly less loaded
/// than the first candidate found.
/// NOTE(review): the signature line is missing from this listing; the file's
/// cross-reference gives GetFirstAvailable(const helper *hlp).
1228 static helper_server *
1230 {
1231  dlink_node *n;
1232  helper_server *srv;
1233  helper_server *selected = NULL;
1234  debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1235 
1236  if (hlp->childs.n_running == 0)
1237  return NULL;
1238 
1239  /* Find "least" loaded helper (approx) */
1240  for (n = hlp->servers.head; n != NULL; n = n->next) {
1241  srv = (helper_server *)n->data;
1242 
     // Skip servers that are not strictly less loaded than the current pick.
1243  if (selected && selected->stats.pending <= srv->stats.pending)
1244  continue;
1245 
1246  if (srv->flags.shutdown)
1247  continue;
1248 
     // An idle server cannot be beaten; use it right away.
1249  if (!srv->stats.pending)
1250  return srv;
1251 
     // A second candidate reached here has fewer pending requests than the
     // first one; take it and stop scanning.
1252  if (selected) {
1253  selected = srv;
1254  break;
1255  }
1256 
     // First usable (busy, not shutting down) server seen so far.
1257  selected = srv;
1258  }
1259 
1260  if (!selected) {
1261  debugs(84, 5, "GetFirstAvailable: None available.");
1262  return NULL;
1263  }
1264 
     // Refuse to exceed the per-server concurrency limit.
1265  if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1266  debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1267  return NULL;
1268  }
1269 
1270  debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1271  return selected;
1272 }
1273 
/// Returns the first stateful helper server in the list that has no pending
/// request, is not reserved and is not shutting down; returns NULL when no
/// server is running or none qualifies.
/// NOTE(review): this listing lacks the signature line (the file's
/// cross-reference gives StatefulGetFirstAvailable(const statefulhelper *hlp))
/// and line 1278, which presumably declares srv -- verify.
1274 static helper_stateful_server *
1276 {
1277  dlink_node *n;
1279  debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1280 
1281  if (hlp->childs.n_running == 0)
1282  return NULL;
1283 
1284  for (n = hlp->servers.head; n != NULL; n = n->next) {
1285  srv = (helper_stateful_server *)n->data;
1286 
     // Only a server with nothing pending is considered.
1287  if (srv->stats.pending)
1288  continue;
1289 
1290  if (srv->flags.reserved)
1291  continue;
1292 
1293  if (srv->flags.shutdown)
1294  continue;
1295 
1296  debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1297  return srv;
1298  }
1299 
1300  debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1301  return NULL;
1302 }
1303 
/// Write-completion callback for helperDispatch(): releases the buffer that
/// was just written and, if more request data was queued meanwhile, starts
/// the next write.
/// \param flag  result of the finished write; anything but Comm::OK is
///              reported as a helper crash and ends processing here
/// \param data  the helper_server the write belonged to
/// NOTE(review): line 1325 (the rest of the commCbCall() expression) is not
/// visible in this listing.
1304 static void
1305 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1306 {
1307  helper_server *srv = (helper_server *)data;
1308 
     // The buffer handed to Comm::Write is finished with; free it and clear
     // the writing flag before deciding what to do next.
1309  srv->writebuf->clean();
1310  delete srv->writebuf;
1311  srv->writebuf = NULL;
1312  srv->flags.writing = false;
1313 
1314  if (flag != Comm::OK) {
1315  /* Helper server has crashed */
1316  debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1317  return;
1318  }
1319 
     // helperDispatch() initializes and fills wqueue; a non-null wqueue here
     // therefore holds request data waiting to be written. Promote it to
     // writebuf, give the server a fresh wqueue, and write again.
1320  if (!srv->wqueue->isNull()) {
1321  srv->writebuf = srv->wqueue;
1322  srv->wqueue = new MemBuf;
1323  srv->flags.writing = true;
1324  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1326  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1327  }
1328 }
1329 
/// Hands request r to helper server srv: records it in the server's request
/// list, appends its text to the server's write queue and starts a write
/// unless one is already in progress. A request whose callback data is no
/// longer valid is deleted instead of being sent.
/// NOTE(review): this listing lacks the signature line (the file's
/// cross-reference gives helperDispatch(helper_server *srv,
/// Helper::Xaction *r)) and the lines numbered 1344 and 1362.
1330 static void
1332 {
1333  helper *hlp = srv->parent;
     // The per-server request ID is advanced before the validity check, so a
     // dropped request still consumes an ID.
1334  const uint64_t reqId = ++srv->nextRequestId;
1335 
1336  if (!cbdataReferenceValid(r->request.data)) {
1337  debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1338  delete r;
1339  return;
1340  }
1341 
1342  r->request.Id = reqId;
     // Append to the request list; the iterator is kept for the ID index.
1343  helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1345 
1346  if (srv->wqueue->isNull())
1347  srv->wqueue->init();
1348 
     // With concurrency enabled the request is indexed by ID and sent as
     // "<ID> <request>"; otherwise the request text is sent as is.
1349  if (hlp->childs.concurrency) {
1350  srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1351  assert(srv->requestsIndex.size() == srv->requests.size());
1352  srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1353  } else
1354  srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1355 
     // Start a write only when none is in flight; otherwise the data stays in
     // wqueue until helperDispatchWriteDone() picks it up.
1356  if (!srv->flags.writing) {
1357  assert(NULL == srv->writebuf);
1358  srv->writebuf = srv->wqueue;
1359  srv->wqueue = new MemBuf;
1360  srv->flags.writing = true;
1361  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1363  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1364  }
1365 
1366  debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1367 
1368  ++ srv->stats.uses;
1369  ++ srv->stats.pending;
1370  ++ hlp->stats.requests;
1371 }
1372 
/// Write-completion callback named by helperStatefulDispatch(); the body is
/// empty, so nothing happens when a stateful request has been written.
/// NOTE(review): the signature line is missing from this listing; the file's
/// cross-reference gives helperStatefulDispatchWriteDone(
/// const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *).
1373 static void
1375 {}
1376 
/// Hands request r to stateful helper server srv. A placeholder request is
/// answered through its callback without being written to the helper; any
/// other request reserves the server and is written to its pipe.
/// NOTE(review): this listing lacks the signature line (the file's
/// cross-reference gives helperStatefulDispatch(helper_stateful_server *srv,
/// Helper::Xaction *r)) and the lines numbered 1385, 1395, 1404 and 1413.
1377 static void
1379 {
1380  statefulhelper *hlp = srv->parent;
1381 
1382  if (!cbdataReferenceValid(r->request.data)) {
1383  debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1384  delete r;
     // NOTE(review): line 1385 is not visible in this listing.
1386  return;
1387  }
1388 
1389  debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1390 
1391  if (r->request.placeholder == 1) {
1392  /* a callback is needed before this request can _use_ a helper. */
1393  /* we don't care about releasing this helper. The request NEVER
1394  * gets to the helper. So we throw away the return code */
     // NOTE(review): line 1395 is not visible in this listing.
1396  r->reply.whichServer = srv;
1397  r->request.callback(r->request.data, r->reply);
1398  /* throw away the placeholder */
1399  delete r;
1400  /* and push the queue. Note that the callback may have submitted a new
1401  * request to the helper which is why we test for the request */
1402 
1403  if (!srv->requests.size())
     // NOTE(review): the statement controlled by this if (line 1404) is not
     // visible in this listing.
1405 
1406  return;
1407  }
1408 
     // Real request: reserve the server, remember the request and its dispatch
     // time, then write the request text to the helper.
1409  srv->flags.reserved = true;
1410  srv->requests.push_back(r);
1411  srv->dispatch_time = current_time;
1412  AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1414  Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
1415  debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1416  hlp->id_name << " #" << srv->index << ", " <<
1417  (int) strlen(r->request.buf) << " bytes");
1418 
1419  ++ srv->stats.uses;
1420  ++ srv->stats.pending;
1421  ++ hlp->stats.requests;
1422 }
1423 
/// Dispatches queued requests for as long as a helper server is available and
/// the queue is not empty.
/// NOTE(review): the signature line is missing from this listing; the file's
/// cross-reference gives helperKickQueue(helper *hlp).
1424 static void
1426 {
1427  Helper::Xaction *r;
1428  helper_server *srv;
1429 
     // The server is looked up first, so a request is taken off the queue only
     // when there is a server to give it to.
1430  while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1431  helperDispatch(srv, r);
1432 }
1433 
/// Dispatches queued requests for as long as a stateful helper server is
/// available and the queue is not empty.
/// NOTE(review): this listing lacks the signature line (the file's
/// cross-reference gives helperStatefulKickQueue(statefulhelper *hlp)) and
/// line 1438, which presumably declares srv -- verify.
1434 static void
1436 {
1437  Helper::Xaction *r;
1439 
     // The server is looked up first, so a request is taken off the queue only
     // when there is a server to give it to.
1440  while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1441  helperStatefulDispatch(srv, r);
1442 }
1443 
/// Follow-up for a stateful helper server. For a server flagged for shutdown
/// that is not already closing, not reserved and has nothing pending, the
/// write pipe is closed.
/// NOTE(review): this listing lacks the signature line (the file's
/// cross-reference gives helperStatefulServerDone(helper_stateful_server *srv))
/// and line 1448, the action taken when the server is not shutting down.
1444 static void
1446 {
1447  if (!srv->flags.shutdown) {
     // NOTE(review): line 1448 is not visible in this listing.
1449  } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1450  srv->closeWritePipeSafely(srv->parent->id_name);
1451  return;
1452  }
1453 }
1454 
/// Removes timed-out requests from the front of this server's request list.
/// Each one is either resubmitted to the parent helper (when retry is set,
/// fewer than MAX_RETRIES retries were made and its callback data is still
/// valid) or answered through its callback and deleted.
/// NOTE(review): this listing lacks the signature line (the file's
/// cross-reference gives checkForTimedOutRequests(bool const retry)) and the
/// lines numbered 1458, 1476, 1479 and 1482, which appear to prepare r->reply
/// before the callbacks -- verify against the real source.
1455 void
1457 {
     // Only the front of the list is examined; the loop stops at the first
     // request that has not timed out.
1459  while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1460  Helper::Xaction *r = requests.front();
     // Drop the request from both the ID index and the request list.
1461  RequestIndex::iterator it;
1462  it = requestsIndex.find(r->request.Id);
1463  assert(it != requestsIndex.end());
1464  requestsIndex.erase(it);
1465  requests.pop_front();
1466  debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1467  void *cbdata;
1468  bool retried = false;
1469  if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1470  debugs(84, 2, "Retry request " << r->request.Id);
1471  ++r->request.retries;
1472  parent->submitRequest(r);
1473  retried = true;
1474  } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1475  if (!parent->onTimedOutResponse.isEmpty()) {
1477  r->reply.finalize();
1478  else
1480  r->request.callback(cbdata, r->reply);
1481  } else {
1483  r->request.callback(cbdata, r->reply);
1484  }
1485  }
     // Counted as timed out whether or not the request was retried.
1486  --stats.pending;
1487  ++stats.timedout;
1488  ++parent->stats.timedout;
     // A retried request was handed back to the parent and must stay alive.
1489  if (!retried)
1490  delete r;
1491  }
1492 }
1493 
/// Read timeout handler: after the server's callback data is confirmed valid,
/// arms a new timeout on the connection for the time the front request has
/// left, never less than one second.
/// NOTE(review): this listing lacks the signature line (the file's
/// cross-reference gives static requestTimeout(const CommTimeoutCbParams &io))
/// and the lines numbered 1503 and 1507.
1494 void
1496 {
1497  debugs(26, 3, HERE << io.conn);
1498  helper_server *srv = static_cast<helper_server *>(io.data);
1499 
     // Nothing to do if the server object is no longer valid.
1500  if (!cbdataReferenceValid(srv))
1501  return;
1502 
     // NOTE(review): line 1503 is not visible in this listing.
1504 
1505  debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1506  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1508 
     // Time already spent on the front request (0 when none is outstanding),
     // and what remains of the helper timeout, clamped to at least 1.
1509  const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1510  const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1511 
1512  commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1513 }
1514 
unsigned int n_active
Definition: ChildConfig.h:86
int intAverage(const int, const int, int, const int)
Definition: SquidMath.cc:40
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:256
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper request; does not enforce queue limits
Definition: helper.cc:481
#define fd_table
Definition: fde.h:157
void helperStatefulSubmit(statefulhelper *hlp, const char *buf, HLPCB *callback, void *data, helper_stateful_server *lastserver)
lastserver = "server last used as part of a reserved request sequence"
Definition: helper.cc:490
dlink_list servers
Definition: helper.h:99
#define assert(EX)
Definition: assert.h:17
void const char HLPCB * callback
Definition: stub_helper.cc:16
std::queue< Helper::Xaction * > queue
Definition: helper.h:100
bool prepSubmit()
Definition: helper.cc:443
RequestIndex requestsIndex
maps request IDs to requests
Definition: helper.h:232
#define cbdataReferenceDone(var)
Definition: cbdata.h:350
virtual void append(const char *c, int sz)
Definition: MemBuf.cc:225
void fd_note(int fd, const char *s)
Definition: fd.cc:251
void packStatsInto(Packable *p, const char *label=NULL) const
Dump some stats about the helper state to a Packable object.
Definition: helper.cc:553
Definition: cbdata.cc:60
void * data
Definition: Request.h:41
struct timeval dispatch_time
Definition: helper.h:181
kill the caller process (i.e., Squid worker)
Definition: ChildConfig.h:95
int i
Definition: membanger.c:49
#define xstrdup
static void helperStatefulServerFree(helper_stateful_server *srv)
Definition: helper.cc:783
Definition: helper.h:60
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:62
Helper::ChildConfig childs
Configuration settings for number running.
Definition: helper.h:102
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
Definition: helper.cc:416
#define safe_free(x)
Definition: xalloc.h:73
Holds the required data to serve a helper request.
Definition: helper.h:36
int avg_svc_time
Definition: helper.h:120
Definition: Flag.h:16
wordlist * cmdline
Definition: helper.h:98
void closePipesSafely(const char *name)
Definition: helper.cc:75
void helperShutdown(helper *hlp)
Definition: helper.cc:616
static helper_stateful_server * StatefulGetFirstAvailable(const statefulhelper *hlp)
Definition: helper.cc:1275
struct timeval answer_time
Definition: helper.h:182
#define REDIRECT_AV_FACTOR
Definition: defines.h:85
char * QuoteMimeBlob(const char *header)
Definition: Quoting.cc:43
int ipc_type
Definition: helper.h:103
char * key
Definition: wordlist.h:33
#define xisspace(x)
Definition: xis.h:17
bool isEmpty() const
Definition: SBuf.h:422
#define DBG_CRITICAL
Definition: Debug.h:44
char * p
Definition: membanger.c:43
int conn
the current server connection FD
Definition: Transport.cc:26
time_t last_queue_warn
Definition: helper.h:107
static void * hIpc
Definition: IcmpSquid.cc:34
statefulhelper * parent
Definition: helper.h:250
Ip::Address addr
Definition: helper.h:172
struct timeval current_time
Definition: stub_time.cc:15
static void helperStatefulKickQueue(statefulhelper *hlp)
Definition: helper.cc:1435
A const & max(A const &lhs, A const &rhs)
Helper::Request request
Definition: helper.h:40
size_t roffset
Definition: helper.h:179
static void helperKickQueue(helper *hlp)
Definition: helper.cc:1425
uint64_t timedout
Definition: helper.h:201
time_t squid_curtime
Definition: stub_time.cc:17
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Definition: helper.cc:1495
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
helper * parent
Definition: helper.h:219
InstanceIdDefinitions(HelperServerBase,"Hlpr")
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: old_api.cc:320
bool overloaded() const
Definition: helper.cc:410
static void helperStatefulServerDone(helper_stateful_server *srv)
Definition: helper.cc:1445
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
Helper::ResultCode result
The helper response 'result' field.
Definition: Reply.h:58
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:105
void fatalf(const char *fmt,...)
Definition: fatal.cc:79
void helperSubmit(helper *hlp, const char *buf, HLPCB *callback, void *data)
Definition: helper.cc:397
Helper::Xaction * popRequest(int requestId)
Definition: helper.cc:848
dlink_node link
Definition: helper.h:184
int shutting_down
Definition: testAddress.cc:36
void HLPCB(void *, const Helper::Reply &)
Definition: forward.h:27
int tvSubMsec(struct timeval, struct timeval)
Definition: stub_time.cc:20
Comm::ConnectionPointer conn
Definition: CommCalls.h:85
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:24
void const char HLPCB void * data
Definition: stub_helper.cc:16
size_type length() const
Returns the number of bytes stored in SBuf.
Definition: SBuf.h:405
static void Enqueue(helper *hlp, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
Definition: helper.cc:1159
bool retryTimedOut
Whether the timed-out requests must be retried.
Definition: helper.h:110
char * rbuf
Definition: helper.h:177
unsigned int n_max
Definition: ChildConfig.h:48
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
#define cbdataReference(var)
Definition: cbdata.h:341
time_t overloadStart
when the helper became overloaded (zero if it is not)
Definition: helper.h:106
#define DBG_IMPORTANT
Definition: Debug.h:45
Comm::ConnectionPointer writePipe
Definition: helper.h:174
struct HelperServerBase::_helper_flags flags
const char * id_name
Definition: helper.h:101
#define FD_DESC_SZ
Definition: defines.h:46
void finalize()
Definition: Reply.cc:39
int timedout
Definition: helper.h:118
static IOCB helperStatefulHandleRead
Definition: helper.cc:44
uint64_t nextRequestId
Definition: helper.h:211
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
Definition: Read.h:57
Value value
instance identifier
Definition: InstanceId.h:42
unsigned int n_startup
Definition: ChildConfig.h:57
time_t last_restart
Definition: helper.h:108
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
Definition: helper.cc:1188
CbcPointer< helper_stateful_server > whichServer
for stateful replies the responding helper 'server' needs to be preserved across callbacks ...
Definition: Reply.h:64
Helper::Xaction * replyXaction
Definition: helper.h:225
int commSetNonBlocking(int fd)
Definition: comm.cc:1076
bool accumulate(const char *buf, size_t len)
Definition: Reply.cc:26
void helperStatefulShutdown(statefulhelper *hlp)
Definition: helper.cc:653
Helper::Xaction * nextRequest()
Definition: helper.cc:1217
unsigned int n_running
Definition: ChildConfig.h:80
static helper_server * GetFirstAvailable(const helper *hlp)
Definition: helper.cc:1229
void helperStatefulOpenServers(statefulhelper *hlp)
Definition: helper.cc:256
void helperOpenServers(helper *hlp)
Definition: helper.cc:125
static IOCB helperHandleRead
Definition: helper.cc:43
int unsigned int const char *desc STUB void int len
Definition: stub_fd.cc:20
char * content()
start of the added data
Definition: MemBuf.h:41
SBuf onTimedOutResponse
The response to use when the helper response timed out.
Definition: helper.h:112
int requests
Definition: helper.h:116
void const char * buf
Definition: stub_helper.cc:16
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:147
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
Definition: CommCalls.h:36
unsigned int droppedRequests
requests not sent during helper overload
Definition: helper.h:105
void initStats()
Definition: helper.cc:65
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
Flag
Definition: Flag.h:15
void memFreeBuf(size_t size, void *)
Definition: old_api.cc:359
uint64_t uses
Definition: helper.h:197
static pid_t pid
Definition: IcmpSquid.cc:35
void clean()
Definition: MemBuf.cc:122
void helperStatefulReleaseServer(helper_stateful_server *srv)
Definition: helper.cc:539
SubmissionErrorHandlingAction onPersistentOverload
how to handle a new request for helper that was overloaded for too long
Definition: ChildConfig.h:99
time_t getCurrentTime(void)
Get current time.
#define PRIu64
Definition: types.h:120
void submit(const char *buf, HLPCB *callback, void *data, helper_stateful_server *lastserver)
Definition: helper.cc:507
int isNull() const
Definition: MemBuf.cc:157
Comm::ConnectionPointer readPipe
Definition: helper.h:173
Helper::Reply reply
Definition: helper.h:41
#define xmalloc
~helper()
Definition: helper.cc:699
void closeWritePipeSafely(const char *name)
Definition: helper.cc:101
bool queueFull() const
whether queuing an additional request would overload the helper
Definition: helper.cc:405
Ip::Address addr
Definition: helper.h:104
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:318
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
Definition: helper.cc:1378
uint64_t Id
Definition: Request.h:45
HLPCB * callback
Definition: Request.h:40
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
char * buf
Definition: Request.h:39
uint64_t releases
Definition: helper.h:200
const size_t ReadBufSize(32 *1024)
Helpers input buffer size.
time_t timeout
Requests timeout.
Definition: helper.h:109
#define MAX_RETRIES
The maximum allowed request retries.
Definition: helper.cc:38
Definition: MemBuf.h:23
bool trySubmit(const char *buf, HLPCB *callback, void *data, helper_stateful_server *lastserver)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:498
void * hIpc
Definition: helper.h:175
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
Definition: CommCalls.h:342
char eom
The char which marks the end of (response) message, normally '\n'.
Definition: helper.h:113
bool ignoreToEom
Whether to ignore current message, because it is timed-out or other reason.
Definition: helper.h:228
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:412
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:961
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
Definition: helper.cc:1374
size_t rbuf_sz
Definition: helper.h:178
void checkForTimedOutRequests(bool const retry)
Definition: helper.cc:1456
Requests requests
requests in order of submission/expiration
Definition: helper.h:194
Definition: Debug.h:179
#define HELPER_MAX_ARGS
Definition: helper.cc:35
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:470
#define DBG_DATA
Definition: Debug.h:47
const char * rawContent() const
Definition: SBuf.cc:539
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
static void helperServerFree(helper_server *srv)
Definition: helper.cc:713
unsigned int concurrency
Definition: ChildConfig.h:72
unsigned int queue_size
Definition: ChildConfig.h:91
struct timeval dispatch_time
Definition: Request.h:44
uint64_t replies
Definition: helper.h:198
static void helperDispatch(helper_server *srv, Helper::Xaction *r)
Definition: helper.cc:1331
int commSetConnTimeout(const Comm::ConnectionPointer &conn, int timeout, AsyncCall::Pointer &callback)
Definition: comm.cc:552
int queue_size
Definition: helper.h:119
static void SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
Definition: helper.cc:384
MemBuf * wqueue
Definition: helper.h:216
#define NULL
Definition: types.h:166
MemBuf * writebuf
Definition: helper.h:217
bool willOverload() const
Definition: helper.cc:611
char progname[]
int reconfiguring
static void helperReturnBuffer(helper_server *srv, helper *hlp, char *msg, size_t msgSize, char *msgEnd)
Calls back with a pointer to the buffer with the helper output.
Definition: helper.cc:871
const InstanceId< HelperServerBase > index
Definition: helper.h:170
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:35
uint64_t pending
Definition: helper.h:199
void submitRequest(Helper::Xaction *r)
Definition: helper.cc:370
struct HelperServerBase::@75 stats
int placeholder
Definition: Request.h:43
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
Definition: helper.cc:1305
struct helper::_stats stats
wordlist * next
Definition: wordlist.h:34
int needNew() const
Definition: ChildConfig.cc:59

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors