aiops.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43 AIOPS */
10 
11 #ifndef _REENTRANT
12 #error "_REENTRANT MUST be defined to build squid async io support."
13 #endif
14 
15 #include "squid.h"
17 #include "DiskThreads.h"
18 #include "SquidConfig.h"
19 #include "Store.h"
20 
21 /*
22  * struct stat and squidaio_xstrdup use explicit pool alloc()/freeOne().
23  * XXX: convert to MEMPROXY_CLASS() API
24  */
25 #include "mem/Allocator.h"
26 #include "mem/Pool.h"
27 
28 #include <cerrno>
29 #include <csignal>
#include <cstring>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <pthread.h>
33 #include <dirent.h>
34 #if HAVE_SCHED_H
35 #include <sched.h>
36 #endif
37 
38 #define RIDICULOUS_LENGTH 4096
39 
46 };
48 
49 typedef struct squidaio_request_t {
50 
53  int cancelled;
54  char *path;
55  int oflag;
57  int fd;
58  char *bufferp;
59  size_t buflen;
60  off_t offset;
61  int whence;
62  int ret;
63  int err;
64 
65  struct stat *tmpstatp;
66 
67  struct stat *statp;
70 
71 typedef struct squidaio_request_queue_t {
72  pthread_mutex_t mutex;
73  pthread_cond_t cond;
75  squidaio_request_t *volatile *volatile tailp;
76  unsigned long requests;
77  unsigned long blocked; /* main failed to lock the queue */
79 
81 
84  pthread_t thread;
86 
88  unsigned long requests;
89 };
90 
93 void *squidaio_thread_loop(void *);
100 #if AIO_OPENDIR
101 static void *squidaio_do_opendir(squidaio_request_t *);
102 #endif
103 static void squidaio_debug(squidaio_request_t *);
104 static void squidaio_poll_queues(void);
105 
106 static squidaio_thread_t *threads = nullptr;
107 static int squidaio_initialised = 0;
108 
109 #define AIO_LARGE_BUFS 16384
110 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
111 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
112 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
113 #define AIO_MICRO_BUFS 128
114 
115 static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
116 static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
117 static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
118 static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
119 static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128K */
120 
121 static size_t request_queue_len = 0;
125 
126 static struct {
128 }
129 
130 request_queue2 = {
131 
132  nullptr, &request_queue2.head
133 };
135 
136 static struct {
138 }
139 
140 done_requests = {
141 
142  nullptr, &done_requests.head
143 };
144 static pthread_attr_t globattr;
145 #if HAVE_SCHED_H
146 
147 static struct sched_param globsched;
148 #endif
149 static pthread_t main_thread;
150 
151 static Mem::Allocator *
153 {
154  if (size <= AIO_LARGE_BUFS) {
155  if (size <= AIO_MICRO_BUFS)
156  return squidaio_micro_bufs;
157  else if (size <= AIO_TINY_BUFS)
158  return squidaio_tiny_bufs;
159  else if (size <= AIO_SMALL_BUFS)
160  return squidaio_small_bufs;
161  else if (size <= AIO_MEDIUM_BUFS)
162  return squidaio_medium_bufs;
163  else
164  return squidaio_large_bufs;
165  }
166 
167  return nullptr;
168 }
169 
170 void *
172 {
173  void *p;
174 
175  if (const auto pool = squidaio_get_pool(size)) {
176  p = pool->alloc();
177  } else
178  p = xmalloc(size);
179 
180  return p;
181 }
182 
183 static char *
184 squidaio_xstrdup(const char *str)
185 {
186  char *p;
187  int len = strlen(str) + 1;
188 
189  p = (char *)squidaio_xmalloc(len);
190  strncpy(p, str, len);
191 
192  return p;
193 }
194 
195 void
196 squidaio_xfree(void *p, int size)
197 {
198  if (const auto pool = squidaio_get_pool(size)) {
199  pool->freeOne(p);
200  } else
201  xfree(p);
202 }
203 
204 static void
206 {
207  int len = strlen(str) + 1;
208 
209  if (const auto pool = squidaio_get_pool(len)) {
210  pool->freeOne(str);
211  } else
212  xfree(str);
213 }
214 
// squidaio_init(): one-time setup of the DiskThreads async-I/O subsystem.
// Configures pthread attributes and scheduling, initialises the request and
// done queues, spawns NUMTHREADS worker threads running
// squidaio_thread_loop(), and creates the request/buffer memory pools.
// NOTE(review): this is a doxygen listing with gaps — the function name line
// and several body lines are missing from this dump (their numbers skip).
215 void
217 {
218  squidaio_thread_t *threadp;
219 
// NOTE(review): listing gap — original line 220 (the condition guarding this
// early return, presumably an "already initialised" check) is missing.
221  return;
222 
223  pthread_attr_init(&globattr);
224 
225 #if HAVE_PTHREAD_ATTR_SETSCOPE
226 
227  pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
228 
229 #endif
230 #if HAVE_SCHED_H
231 
// Main thread gets priority 1; workers (below) get priority 2.
232  globsched.sched_priority = 1;
233 
234 #endif
235 
236  main_thread = pthread_self();
237 
238 #if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
239 
240  pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
241 
242 #endif
243 #if HAVE_SCHED_H
244 
245  globsched.sched_priority = 2;
246 
247 #endif
248 #if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
249 
250  pthread_attr_setschedparam(&globattr, &globsched);
251 
252 #endif
253 
254  /* Give each thread a smaller 256KB stack, should be more than sufficient */
255  pthread_attr_setstacksize(&globattr, 256 * 1024);
256 
257  /* Initialize request queue */
258  if (pthread_mutex_init(&(request_queue.mutex), nullptr))
259  fatal("Failed to create mutex");
260 
261  if (pthread_cond_init(&(request_queue.cond), nullptr))
262  fatal("Failed to create condition variable");
263 
264  request_queue.head = nullptr;
265 
// NOTE(review): listing gap — original lines 266-270 are missing here;
// by symmetry with the done-queue init below they presumably reset
// request_queue.tailp/requests/blocked — TODO confirm against upstream.
267 
269 
271 
272  /* Initialize done queue */
273  if (pthread_mutex_init(&(done_queue.mutex), nullptr))
274  fatal("Failed to create mutex");
275 
276  if (pthread_cond_init(&(done_queue.cond), nullptr))
277  fatal("Failed to create condition variable");
278 
279  done_queue.head = nullptr;
280 
// NOTE(review): listing gap — original line 281 (presumably the
// done_queue.tailp reset) is missing here.
282 
283  done_queue.requests = 0;
284 
285  done_queue.blocked = 0;
286 
287  // Initialize the thread I/O pipes before creating any threads
288  // see bug 3189 comment 5 about race conditions.
// NOTE(review): listing gap — original line 289 (the initialisation call the
// two comments above refer to) is missing from this dump.
290 
291  /* Create threads and get them to sit in their wait loop */
292  squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
293 
294  assert(NUMTHREADS != 0);
295 
296  for (size_t i = 0; i < NUMTHREADS; ++i) {
// NOTE(review): listing gap — original line 297 (allocation of threadp from
// squidaio_thread_pool) is missing; threadp is clearly expected to point at
// a fresh squidaio_thread_t here.
298  threadp->status = _THREAD_STARTING;
299  threadp->current_req = nullptr;
300  threadp->requests = 0;
// Newly created threads are pushed onto the global singly linked list.
301  threadp->next = threads;
302  threads = threadp;
303 
304  if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
305  fprintf(stderr, "Thread creation failed\n");
306  threadp->status = _THREAD_FAILED;
307  continue;
308  }
309  }
310 
311  /* Create request pool */
312  squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
313 
314  squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
315 
316  squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
317 
318  squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
319 
320  squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
321 
322  squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
323 
// NOTE(review): listing gap — original line 324 (presumably setting
// squidaio_initialised) is missing here.
325 }
326 
// squidaio_shutdown(): drain all pending async-I/O requests before exit.
// NOTE(review): doxygen listing gaps — the function name line, the guard
// condition for the early return, the do-loop body (presumably a
// squidaio_poll_queues() call, as in squidaio_sync per the comment below),
// and a final cleanup call are missing from this dump.
327 void
329 {
331  return;
332 
333  /* This is the same as in squidaio_sync */
334  do {
336  } while (request_queue_len > 0);
337 
339 
341 }
342 
// squidaio_thread_loop(): main body of each worker thread. Blocks signals,
// then loops forever: waits on the request-queue condition variable, pops
// one request, dispatches it to the matching blocking squidaio_do_*()
// handler, and appends the finished request to the done queue.
// NOTE(review): doxygen listing — the function name/parameter line
// (original line 344) is missing from this dump.
343 void *
345 {
346  squidaio_thread_t *threadp = (squidaio_thread_t *)ptr;
347  squidaio_request_t *request;
348  sigset_t newSig;
349 
350  /*
351  * Make sure to ignore signals which may possibly get sent to
352  * the parent squid thread. Causes havoc with mutex's and
353  * condition waits otherwise
354  */
355 
356  sigemptyset(&newSig);
357  sigaddset(&newSig, SIGPIPE);
358  sigaddset(&newSig, SIGCHLD);
359 #if defined(_SQUID_LINUX_THREADS_)
360 
361  sigaddset(&newSig, SIGQUIT);
362  sigaddset(&newSig, SIGTRAP);
363 #else
364 
365  sigaddset(&newSig, SIGUSR1);
366  sigaddset(&newSig, SIGUSR2);
367 #endif
368 
369  sigaddset(&newSig, SIGHUP);
370  sigaddset(&newSig, SIGTERM);
371  sigaddset(&newSig, SIGINT);
372  sigaddset(&newSig, SIGALRM);
373  pthread_sigmask(SIG_BLOCK, &newSig, nullptr);
374 
375  while (1) {
376  threadp->current_req = request = nullptr;
377  request = nullptr;
378  /* Get a request to process */
379  threadp->status = _THREAD_WAITING;
380  pthread_mutex_lock(&request_queue.mutex);
381 
// Standard condition-variable idiom: re-check the predicate after wakeup.
382  while (!request_queue.head) {
383  pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
384  }
385 
386  request = request_queue.head;
387 
388  if (request)
389  request_queue.head = request->next;
390 
// NOTE(review): listing gap — original line 392 (the body of this if,
// presumably resetting request_queue.tailp to &request_queue.head when the
// queue empties) is missing from this dump.
391  if (!request_queue.head)
393 
394  pthread_mutex_unlock(&request_queue.mutex);
395 
396  /* process the request */
397  threadp->status = _THREAD_BUSY;
398 
399  request->next = nullptr;
400 
401  threadp->current_req = request;
402 
403  errno = 0;
404 
405  if (!request->cancelled) {
406  switch (request->request_type) {
407 
408  case _AIO_OP_OPEN:
409  squidaio_do_open(request);
410  break;
411 
412  case _AIO_OP_READ:
413  squidaio_do_read(request);
414  break;
415 
416  case _AIO_OP_WRITE:
417  squidaio_do_write(request);
418  break;
419 
420  case _AIO_OP_CLOSE:
421  squidaio_do_close(request);
422  break;
423 
424  case _AIO_OP_UNLINK:
425  squidaio_do_unlink(request);
426  break;
427 
428 #if AIO_OPENDIR /* Opendir not implemented yet */
429 
430  case _AIO_OP_OPENDIR:
431  squidaio_do_opendir(request);
432  break;
433 #endif
434 
435  case _AIO_OP_STAT:
436  squidaio_do_stat(request);
437  break;
438 
439  default:
440  request->ret = -1;
441  request->err = EINVAL;
442  break;
443  }
444  } else { /* cancelled */
// Cancelled requests are not executed; report EINTR instead.
445  request->ret = -1;
446  request->err = EINTR;
447  }
448 
449  threadp->status = _THREAD_DONE;
450  /* put the request in the done queue */
451  pthread_mutex_lock(&done_queue.mutex);
452  *done_queue.tailp = request;
453  done_queue.tailp = &request->next;
454  pthread_mutex_unlock(&done_queue.mutex);
// NOTE(review): listing gap — original line 455 (presumably the "I/O
// completed" notification to the main thread) is missing from this dump.
456  ++ threadp->requests;
457  } /* while forever */
458 
459  return nullptr;
460 } /* squidaio_thread_loop */
461 
// squidaio_queue_request(): hand a prepared request to the worker threads.
// Tries a non-blocking lock on the shared request queue; if the lock is
// contended the request is parked on the lock-free-for-the-main-thread
// overflow queue (request_queue2) and spliced in later. Also emits
// congestion/overload warnings based on queue depth.
// NOTE(review): doxygen listing — the name line (original 463) is missing.
462 static void
464 {
465  static int high_start = 0;
466  debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
467  /* Mark it as not executed (failing result, no error) */
468  request->ret = -1;
469  request->err = 0;
470  /* Internal housekeeping */
471  request_queue_len += 1;
472  request->resultp->_data = request;
473  /* Play some tricks with the request_queue2 queue */
474  request->next = nullptr;
475 
476  if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
477  if (request_queue2.head) {
478  /* Grab blocked requests */
// NOTE(review): listing gap — original lines 479-480 (the splice of
// request_queue2 onto the tail of request_queue) are missing from this dump.
481  }
482 
483  /* Enqueue request */
484  *request_queue.tailp = request;
485 
486  request_queue.tailp = &request->next;
487 
488  pthread_cond_signal(&request_queue.cond);
489 
490  pthread_mutex_unlock(&request_queue.mutex);
491 
492  if (request_queue2.head) {
493  /* Clear queue of blocked requests */
494  request_queue2.head = nullptr;
495  request_queue2.tailp = &request_queue2.head;
496  }
497  } else {
498  /* Oops, the request queue is blocked, use request_queue2 */
499  *request_queue2.tailp = request;
500  request_queue2.tailp = &request->next;
501  }
502 
// Rate-limited congestion warning: the threshold doubles each time it fires.
503  if (request_queue2.head) {
504  static uint64_t filter = 0;
505  static uint64_t filter_limit = 8192;
506 
507  if (++filter >= filter_limit) {
508  filter_limit += filter;
509  filter = 0;
510  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
511  }
512  }
513 
514  /* Warn if out of threads */
515  if (request_queue_len > MAGIC1) {
516  static int last_warn = 0;
517  static size_t queue_high, queue_low;
518 
519  if (high_start == 0) {
520  high_start = squid_curtime;
521  queue_high = request_queue_len;
522  queue_low = request_queue_len;
523  }
524 
525  if (request_queue_len > queue_high)
526  queue_high = request_queue_len;
527 
528  if (request_queue_len < queue_low)
529  queue_low = request_queue_len;
530 
// Warn at most every 15s, and only after 5s of sustained overload.
531  if (squid_curtime >= (last_warn + 15) &&
532  squid_curtime >= (high_start + 5)) {
533  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
534 
535  if (squid_curtime >= (high_start + 15))
536  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
537  request_queue_len << ", high=" << queue_high <<
538  ", low=" << queue_low << ", duration=" <<
539  (long int) (squid_curtime - high_start));
540 
541  last_warn = squid_curtime;
542  }
543  } else {
544  high_start = 0;
545  }
546 
547  /* Warn if seriously overloaded */
// NOTE(review): listing gap — original line 548 (the condition opening this
// block, presumably comparing request_queue_len against RIDICULOUS_LENGTH)
// is missing from this dump.
549  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
550  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
551  squidaio_sync();
552  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
553  }
554 } /* squidaio_queue_request */
555 
// squidaio_cleanup_request(): release per-request resources after a worker
// finished (or the request was cancelled), copy results back to the caller's
// squidaio_result_t when not cancelled, and return the request to its pool.
// Cancellation has per-operation compensation: a cancelled-but-completed
// open() must close the new FD; a cancelled close() must still close the FD.
// NOTE(review): doxygen listing — the name line (original 557) is missing.
556 static void
558 {
559  squidaio_result_t *resultp = requestp->resultp;
560  int cancelled = requestp->cancelled;
561 
562  /* Free allocated structures and copy data back to user space if the */
563  /* request hasn't been cancelled */
564 
565  switch (requestp->request_type) {
566 
567  case _AIO_OP_STAT:
568 
// Copy the worker's private stat buffer into the caller's struct stat.
569  if (!cancelled && requestp->ret == 0)
570  memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
571 
572  squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
573 
574  squidaio_xstrfree(requestp->path);
575 
576  break;
577 
578  case _AIO_OP_OPEN:
579  if (cancelled && requestp->ret >= 0)
580  /* The open() was cancelled but completed */
581  close(requestp->ret);
582 
583  squidaio_xstrfree(requestp->path);
584 
585  break;
586 
587  case _AIO_OP_CLOSE:
588  if (cancelled && requestp->ret < 0)
589  /* The close() was cancelled and never got executed */
590  close(requestp->fd);
591 
592  break;
593 
594  case _AIO_OP_UNLINK:
595 
// fall through: unlink and opendir both own a duplicated path string
596  case _AIO_OP_OPENDIR:
597  squidaio_xstrfree(requestp->path);
598 
599  break;
600 
601  case _AIO_OP_READ:
602  break;
603 
604  case _AIO_OP_WRITE:
605  break;
606 
607  default:
608  break;
609  }
610 
// Only publish results when the caller still holds a live result handle.
611  if (resultp != nullptr && !cancelled) {
612  resultp->aio_return = requestp->ret;
613  resultp->aio_errno = requestp->err;
614  }
615 
616  squidaio_request_pool->freeOne(requestp);
617 } /* squidaio_cleanup_request */
618 
619 int
621 {
622  squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
623 
624  if (request && request->resultp == resultp) {
625  debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
626  request->cancelled = 1;
627  request->resultp = nullptr;
628  resultp->_data = nullptr;
629  resultp->result_type = _AIO_OP_NONE;
630  return 0;
631  }
632 
633  return 1;
634 } /* squidaio_cancel */
635 
// squidaio_open(): queue an asynchronous open(2). Duplicates `path`, fills
// in a request object, and hands it to the worker threads; always returns 0
// (the real open() result arrives later via squidaio_poll_done()).
// NOTE(review): doxygen listing gap — original line 642 (the allocation of
// requestp from squidaio_request_pool) is missing from this dump.
636 int
637 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
638 {
639  squidaio_init();
640  squidaio_request_t *requestp;
641 
643 
644  requestp->path = (char *) squidaio_xstrdup(path);
645 
646  requestp->oflag = oflag;
647 
648  requestp->mode = mode;
649 
650  requestp->resultp = resultp;
651 
652  requestp->request_type = _AIO_OP_OPEN;
653 
654  requestp->cancelled = 0;
655 
656  resultp->result_type = _AIO_OP_OPEN;
657 
658  squidaio_queue_request(requestp);
659 
660  return 0;
661 }
662 
663 static void
665 {
666  requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
667  requestp->err = errno;
668 }
669 
// squidaio_read(): queue an asynchronous seek+read against `fd` into the
// caller-owned buffer `bufp`. Always returns 0; the read(2) result arrives
// later via squidaio_poll_done(). The buffer must stay valid until then.
// NOTE(review): doxygen listing gap — original line 675 (the allocation of
// requestp from squidaio_request_pool) is missing from this dump.
670 int
671 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
672 {
673  squidaio_request_t *requestp;
674 
676 
677  requestp->fd = fd;
678 
679  requestp->bufferp = bufp;
680 
681  requestp->buflen = bufs;
682 
683  requestp->offset = offset;
684 
685  requestp->whence = whence;
686 
687  requestp->resultp = resultp;
688 
689  requestp->request_type = _AIO_OP_READ;
690 
691  requestp->cancelled = 0;
692 
693  resultp->result_type = _AIO_OP_READ;
694 
695  squidaio_queue_request(requestp);
696 
697  return 0;
698 }
699 
700 static void
702 {
703  if (lseek(requestp->fd, requestp->offset, requestp->whence) >= 0)
704  requestp->ret = read(requestp->fd, requestp->bufferp, requestp->buflen);
705  else
706  requestp->ret = -1;
707  requestp->err = errno;
708 }
709 
// squidaio_write(): queue an asynchronous write(2) of `bufs` bytes from the
// caller-owned buffer `bufp`. Always returns 0; the result arrives later via
// squidaio_poll_done(). Note: offset/whence are stored but the worker-side
// handler (squidaio_do_write) does not seek before writing.
// NOTE(review): doxygen listing gap — original line 715 (the allocation of
// requestp from squidaio_request_pool) is missing from this dump.
710 int
711 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
712 {
713  squidaio_request_t *requestp;
714 
716 
717  requestp->fd = fd;
718 
719  requestp->bufferp = bufp;
720 
721  requestp->buflen = bufs;
722 
723  requestp->offset = offset;
724 
725  requestp->whence = whence;
726 
727  requestp->resultp = resultp;
728 
729  requestp->request_type = _AIO_OP_WRITE;
730 
731  requestp->cancelled = 0;
732 
733  resultp->result_type = _AIO_OP_WRITE;
734 
735  squidaio_queue_request(requestp);
736 
737  return 0;
738 }
739 
740 static void
742 {
743  requestp->ret = write(requestp->fd, requestp->bufferp, requestp->buflen);
744  requestp->err = errno;
745 }
746 
// squidaio_close(): queue an asynchronous close(2) of `fd`. Always returns
// 0; the close() result arrives later via squidaio_poll_done().
// NOTE(review): doxygen listing gaps — the function name line (original 748)
// and line 752 (allocation of requestp from squidaio_request_pool) are
// missing from this dump.
747 int
749 {
750  squidaio_request_t *requestp;
751 
753 
754  requestp->fd = fd;
755 
756  requestp->resultp = resultp;
757 
758  requestp->request_type = _AIO_OP_CLOSE;
759 
760  requestp->cancelled = 0;
761 
762  resultp->result_type = _AIO_OP_CLOSE;
763 
764  squidaio_queue_request(requestp);
765 
766  return 0;
767 }
768 
769 static void
771 {
772  requestp->ret = close(requestp->fd);
773  requestp->err = errno;
774 }
775 
// squidaio_stat(): queue an asynchronous stat(2) of `path`. The worker
// fills a private temporary stat buffer; the result is copied into the
// caller's `sb` by squidaio_cleanup_request() only on success and only if
// the request was not cancelled. Always returns 0.
// NOTE(review): doxygen listing gap — original line 783 (the allocation of
// requestp from squidaio_request_pool) is missing from this dump.
776 int
777 
778 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
779 {
780  squidaio_init();
781  squidaio_request_t *requestp;
782 
784 
785  requestp->path = (char *) squidaio_xstrdup(path);
786 
787  requestp->statp = sb;
788 
789  requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
790 
791  requestp->resultp = resultp;
792 
793  requestp->request_type = _AIO_OP_STAT;
794 
795  requestp->cancelled = 0;
796 
797  resultp->result_type = _AIO_OP_STAT;
798 
799  squidaio_queue_request(requestp);
800 
801  return 0;
802 }
803 
804 static void
806 {
807  requestp->ret = stat(requestp->path, requestp->tmpstatp);
808  requestp->err = errno;
809 }
810 
// squidaio_unlink(): queue an asynchronous unlink(2) of `path` (the path is
// duplicated, so the caller's copy may be freed immediately). Always returns
// 0; the unlink() result arrives later via squidaio_poll_done().
// NOTE(review): doxygen listing gap — original line 817 (the allocation of
// requestp from squidaio_request_pool) is missing from this dump.
811 int
812 squidaio_unlink(const char *path, squidaio_result_t * resultp)
813 {
814  squidaio_init();
815  squidaio_request_t *requestp;
816 
818 
819  requestp->path = squidaio_xstrdup(path);
820 
821  requestp->resultp = resultp;
822 
823  requestp->request_type = _AIO_OP_UNLINK;
824 
825  requestp->cancelled = 0;
826 
827  resultp->result_type = _AIO_OP_UNLINK;
828 
829  squidaio_queue_request(requestp);
830 
831  return 0;
832 }
833 
834 static void
836 {
837  requestp->ret = unlink(requestp->path);
838  requestp->err = errno;
839 }
840 
841 #if AIO_OPENDIR
842 /* XXX squidaio_opendir NOT implemented yet.. */
843 
844 int
845 squidaio_opendir(const char *path, squidaio_result_t * resultp)
846 {
847  squidaio_request_t *requestp;
848  int len;
849 
850  requestp = squidaio_request_pool->alloc();
851 
852  resultp->result_type = _AIO_OP_OPENDIR;
853 
854  return -1;
855 }
856 
857 static void
858 squidaio_do_opendir(squidaio_request_t * requestp)
859 {
860  /* NOT IMPLEMENTED */
861 }
862 
863 #endif
864 
// squidaio_poll_queues(): main-thread housekeeping. First tries (without
// blocking) to splice the overflow queue (request_queue2) into the shared
// request queue and wake a worker; then drains the done queue into the
// main-thread-private done_requests list, decrementing request_queue_len
// once per completed request.
// NOTE(review): doxygen listing — the name line (original 866) is missing.
865 static void
867 {
868  /* kick "overflow" request queue */
869 
870  if (request_queue2.head &&
871  pthread_mutex_trylock(&request_queue.mutex) == 0) {
// NOTE(review): listing gap — original lines 872-873 (the splice of
// request_queue2 onto request_queue's tail) are missing from this dump.
874  pthread_cond_signal(&request_queue.cond);
875  pthread_mutex_unlock(&request_queue.mutex);
876  request_queue2.head = nullptr;
877  request_queue2.tailp = &request_queue2.head;
878  }
879 
880  /* poll done queue */
881  if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {
882 
// Detach the whole done list under the lock, then walk it unlocked.
883  struct squidaio_request_t *requests = done_queue.head;
884  done_queue.head = nullptr;
// NOTE(review): listing gap — original line 885 (presumably resetting
// done_queue.tailp to &done_queue.head) is missing from this dump.
886  pthread_mutex_unlock(&done_queue.mutex);
887  *done_requests.tailp = requests;
888  request_queue_len -= 1;
889 
890  while (requests->next) {
891  requests = requests->next;
892  request_queue_len -= 1;
893  }
894 
895  done_requests.tailp = &requests->next;
896  }
897 }
898 
// squidaio_poll_done(): pop one completed, non-cancelled request from the
// main-thread done_requests list and return its result handle (nullptr when
// nothing is ready). Cancelled completions are cleaned up and skipped via
// the AIO_REPOLL loop.
// NOTE(review): doxygen listing gaps — the return-type/name lines (original
// 899-900), the declaration of `resultp` (original 903), and the body of the
// "!polled" branch (original lines 911-912, presumably a squidaio_poll_queues
// call) are missing from this dump.
901 {
902  squidaio_request_t *request;
904  int cancelled;
905  int polled = 0;
906 
907 AIO_REPOLL:
908  request = done_requests.head;
909 
// Empty? Poll the shared queues once, then re-check before giving up.
910  if (request == nullptr && !polled) {
913  polled = 1;
914  request = done_requests.head;
915  }
916 
917  if (!request) {
918  return nullptr;
919  }
920 
921  debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
922  done_requests.head = request->next;
923 
924  if (!done_requests.head)
925  done_requests.tailp = &done_requests.head;
926 
// Capture the handle and cancellation flag before cleanup frees the request.
927  resultp = request->resultp;
928 
929  cancelled = request->cancelled;
930 
931  squidaio_debug(request);
932 
933  debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
934 
935  squidaio_cleanup_request(request);
936 
937  if (cancelled)
938  goto AIO_REPOLL;
939 
940  return resultp;
941 } /* squidaio_poll_done */
942 
943 int
945 {
946  return request_queue_len + (done_requests.head ? 1 : 0);
947 }
948 
// squidaio_sync(): block until the pending request queue is fully drained.
// NOTE(review): doxygen listing gaps — the function name line (original
// 950), the do-loop body (original 955, presumably a squidaio_poll_queues
// call), and the return statement (original 958) are missing from this dump.
949 int
951 {
952  /* XXX This might take a while if the queue is large.. */
953 
954  do {
956  } while (request_queue_len > 0);
957 
959 }
960 
961 int
963 {
964  return request_queue_len;
965 }
966 
967 static void
969 {
970  switch (request->request_type) {
971 
972  case _AIO_OP_OPEN:
973  debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
974  break;
975 
976  case _AIO_OP_READ:
977  debugs(43, 5, "READ on fd: " << request->fd);
978  break;
979 
980  case _AIO_OP_WRITE:
981  debugs(43, 5, "WRITE on fd: " << request->fd);
982  break;
983 
984  case _AIO_OP_CLOSE:
985  debugs(43, 5, "CLOSE of fd: " << request->fd);
986  break;
987 
988  case _AIO_OP_UNLINK:
989  debugs(43, 5, "UNLINK of " << request->path);
990  break;
991 
992  default:
993  break;
994  }
995 }
996 
997 void
999 {
1000  squidaio_thread_t *threadp;
1001 
1002  if (!squidaio_initialised)
1003  return;
1004 
1005  storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1006 
1007  storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1008 
1009  threadp = threads;
1010 
1011  for (size_t i = 0; i < NUMTHREADS; ++i) {
1012  storeAppendPrintf(sentry, "%zu\t0x%lx\t%ld\n", i + 1, (unsigned long)threadp->thread, threadp->requests);
1013  threadp = threadp->next;
1014  }
1015 }
1016 
struct squidaio_request_queue_t squidaio_request_queue_t
void fatal(const char *message)
Definition: fatal.cc:28
static void squidaio_do_unlink(squidaio_request_t *)
Definition: aiops.cc:835
static void squidaio_do_read(squidaio_request_t *)
Definition: aiops.cc:701
static pthread_t main_thread
Definition: aiops.cc:149
#define DBG_CRITICAL
Definition: Stream.h:37
#define AIO_SMALL_BUFS
Definition: aiops.cc:111
#define xmalloc
static void squidaio_cleanup_request(squidaio_request_t *)
Definition: aiops.cc:557
squidaio_result_t * resultp
Definition: aiops.cc:68
struct squidaio_request_t * next
Definition: aiops.cc:51
squidaio_result_t * squidaio_poll_done(void)
Definition: aiops.cc:900
static void squidaio_do_stat(squidaio_request_t *)
Definition: aiops.cc:805
unsigned long requests
Definition: aiops.cc:88
@ _AIO_OP_OPENDIR
Definition: DiskThreads.h:52
void squidaio_stats(StoreEntry *sentry)
Definition: aiops.cc:998
static int squidaio_initialised
Definition: aiops.cc:107
static void squidaio_do_open(squidaio_request_t *)
Definition: aiops.cc:664
@ _THREAD_DONE
Definition: aiops.cc:45
#define NUMTHREADS
Definition: DiskThreads.h:30
static void NotifyIOCompleted()
Definition: CommIO.h:36
int squidaio_cancel(squidaio_result_t *resultp)
Definition: aiops.cc:620
static Mem::Allocator * squidaio_get_pool(int size)
Definition: aiops.cc:152
static char * squidaio_xstrdup(const char *str)
Definition: aiops.cc:184
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
#define RIDICULOUS_LENGTH
Definition: aiops.cc:38
static squidaio_thread_t * threads
Definition: aiops.cc:106
unsigned long requests
Definition: aiops.cc:76
void * alloc()
provide (and reserve) memory suitable for storing one object
Definition: Allocator.h:44
_squidaio_thread_status
Definition: aiops.cc:40
pthread_mutex_t mutex
Definition: aiops.cc:72
static struct @43 done_requests
@ _AIO_OP_STAT
Definition: DiskThreads.h:53
enum _squidaio_request_type result_type
Definition: DiskThreads.h:64
static void ResetNotifications()
Definition: CommIO.cc:70
void * squidaio_xmalloc(int size)
Definition: aiops.cc:171
static Mem::Allocator * squidaio_small_bufs
Definition: aiops.cc:117
struct stat * statp
Definition: aiops.cc:67
#define AIO_MEDIUM_BUFS
Definition: aiops.cc:110
@ _THREAD_BUSY
Definition: aiops.cc:43
#define AIO_MICRO_BUFS
Definition: aiops.cc:113
squidaio_request_type request_type
Definition: aiops.cc:52
pthread_t thread
Definition: aiops.cc:84
static void squidaio_xstrfree(char *str)
Definition: aiops.cc:205
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
Definition: aiops.cc:778
#define AIO_LARGE_BUFS
Definition: aiops.cc:109
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
Definition: aiops.cc:637
static size_t request_queue_len
Definition: aiops.cc:121
@ _AIO_OP_READ
Definition: DiskThreads.h:48
struct squidaio_request_t squidaio_request_t
static void NotifyIOClose()
Definition: CommIO.cc:39
void squidaio_init(void)
Definition: aiops.cc:216
void squidaio_xfree(void *p, int size)
Definition: aiops.cc:196
static void squidaio_poll_queues(void)
Definition: aiops.cc:866
int size
Definition: ModDevPoll.cc:69
void freeOne(void *obj)
return memory reserved by alloc()
Definition: Allocator.h:51
static Mem::Allocator * squidaio_request_pool
Definition: aiops.cc:122
static struct @42 request_queue2
static void squidaio_do_write(squidaio_request_t *)
Definition: aiops.cc:741
static Mem::Allocator * squidaio_tiny_bufs
Definition: aiops.cc:118
#define MAGIC1
Definition: DiskThreads.h:34
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops.cc:711
@ _THREAD_FAILED
Definition: aiops.cc:44
squidaio_request_t ** tailp
Definition: aiops.cc:127
#define memPoolCreate
Creates a named MemPool of elements with the given size.
Definition: Pool.h:123
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops.cc:671
static squidaio_request_queue_t done_queue
Definition: aiops.cc:134
squidaio_thread_status status
Definition: aiops.cc:85
void * squidaio_thread_loop(void *)
Definition: aiops.cc:344
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops.cc:47
#define assert(EX)
Definition: assert.h:17
int squidaio_close(int fd, squidaio_result_t *resultp)
Definition: aiops.cc:748
@ _AIO_OP_CLOSE
Definition: DiskThreads.h:50
static Mem::Allocator * squidaio_medium_bufs
Definition: aiops.cc:116
@ _THREAD_STARTING
Definition: aiops.cc:41
time_t squid_curtime
Definition: stub_libtime.cc:20
#define xfree
enum _squidaio_request_type squidaio_request_type
Definition: DiskThreads.h:55
static pthread_attr_t globattr
Definition: aiops.cc:144
static squidaio_request_queue_t request_queue
Definition: aiops.cc:124
int squidaio_sync(void)
Definition: aiops.cc:950
static void squidaio_queue_request(squidaio_request_t *)
Definition: aiops.cc:463
squidaio_thread_t * next
Definition: aiops.cc:83
squidaio_request_t *volatile head
Definition: aiops.cc:74
size_t buflen
Definition: aiops.cc:59
squidaio_request_t * head
Definition: aiops.cc:127
static void squidaio_debug(squidaio_request_t *)
Definition: aiops.cc:968
static void Initialize()
Definition: CommIO.cc:20
static Mem::Allocator * squidaio_micro_bufs
Definition: aiops.cc:119
squidaio_request_t *volatile *volatile tailp
Definition: aiops.cc:75
unsigned short mode_t
Definition: types.h:129
#define AIO_TINY_BUFS
Definition: aiops.cc:112
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
Definition: aiops.cc:812
#define DBG_IMPORTANT
Definition: Stream.h:38
int squidaio_opendir(const char *, squidaio_result_t *)
@ _AIO_OP_OPEN
Definition: DiskThreads.h:47
static Mem::Allocator * squidaio_large_bufs
Definition: aiops.cc:115
struct squidaio_request_t * current_req
Definition: aiops.cc:87
pthread_cond_t cond
Definition: aiops.cc:73
unsigned long blocked
Definition: aiops.cc:77
int squidaio_operations_pending(void)
Definition: aiops.cc:944
char * bufferp
Definition: aiops.cc:58
static Mem::Allocator * squidaio_thread_pool
Definition: aiops.cc:123
int squidaio_get_queue_len(void)
Definition: aiops.cc:962
@ _AIO_OP_WRITE
Definition: DiskThreads.h:49
@ _AIO_OP_UNLINK
Definition: DiskThreads.h:51
@ _AIO_OP_NONE
Definition: DiskThreads.h:46
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void squidaio_shutdown(void)
Definition: aiops.cc:328
static void squidaio_do_close(squidaio_request_t *)
Definition: aiops.cc:770
@ _THREAD_WAITING
Definition: aiops.cc:42
struct stat * tmpstatp
Definition: aiops.cc:65

 

Introduction

Documentation

Support

Miscellaneous