aiops.cc
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43 AIOPS */
10 
11 #ifndef _REENTRANT
12 #error "_REENTRANT MUST be defined to build squid async io support."
13 #endif
14 
15 #include "squid.h"
16 #include "DiskIO/DiskThreads/CommIO.h"
17 #include "DiskThreads.h"
18 #include "SquidConfig.h"
19 #include "SquidTime.h"
20 #include "Store.h"
21 
22 /*
23  * struct stat and squidaio_xstrdup use explicit pool alloc()/freeOne().
24  * XXX: convert to MEMPROXY_CLASS() API
25  */
26 #include "mem/Pool.h"
27 
28 #include <cerrno>
29 #include <csignal>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <pthread.h>
33 #include <dirent.h>
34 #if HAVE_SCHED_H
35 #include <sched.h>
36 #endif
37 
38 #define RIDICULOUS_LENGTH 4096
39 
40 enum _squidaio_thread_status {
41  _THREAD_STARTING = 0,
42  _THREAD_WAITING,
43  _THREAD_BUSY,
44  _THREAD_FAILED,
45  _THREAD_DONE
46 };
47 typedef enum _squidaio_thread_status squidaio_thread_status;
48 
49 typedef struct squidaio_request_t {
50 
51  struct squidaio_request_t *next;
52  squidaio_request_type request_type;
53  int cancelled;
54  char *path;
55  int oflag;
56  mode_t mode;
57  int fd;
58  char *bufferp;
59  size_t buflen;
60  off_t offset;
61  int whence;
62  int ret;
63  int err;
64 
65  struct stat *tmpstatp;
66 
67  struct stat *statp;
68  squidaio_result_t *resultp;
69 } squidaio_request_t;
70 
71 typedef struct squidaio_request_queue_t {
72  pthread_mutex_t mutex;
73  pthread_cond_t cond;
74  squidaio_request_t *volatile head;
75  squidaio_request_t *volatile *volatile tailp;
76  unsigned long requests;
77  unsigned long blocked; /* main failed to lock the queue */
78 } squidaio_request_queue_t;
79 
80 typedef struct squidaio_thread_t squidaio_thread_t;
81 
82 struct squidaio_thread_t {
83  squidaio_thread_t *next;
84  pthread_t thread;
85  squidaio_thread_status status;
86 
87  struct squidaio_request_t *current_req;
88  unsigned long requests;
89 };
90 
91 static void squidaio_queue_request(squidaio_request_t *);
92 static void squidaio_cleanup_request(squidaio_request_t *);
93 void *squidaio_thread_loop(void *);
94 static void squidaio_do_open(squidaio_request_t *);
95 static void squidaio_do_read(squidaio_request_t *);
96 static void squidaio_do_write(squidaio_request_t *);
97 static void squidaio_do_close(squidaio_request_t *);
98 static void squidaio_do_stat(squidaio_request_t *);
99 static void squidaio_do_unlink(squidaio_request_t *);
100 #if AIO_OPENDIR
101 static void *squidaio_do_opendir(squidaio_request_t *);
102 #endif
103 static void squidaio_debug(squidaio_request_t *);
104 static void squidaio_poll_queues(void);
105 
106 static squidaio_thread_t *threads = NULL;
107 static int squidaio_initialised = 0;
108 
109 #define AIO_LARGE_BUFS 16384
110 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
111 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
112 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
113 #define AIO_MICRO_BUFS 128
114 
115 static MemAllocator *squidaio_large_bufs = NULL; /* 16K */
116 static MemAllocator *squidaio_medium_bufs = NULL; /* 8K */
117 static MemAllocator *squidaio_small_bufs = NULL; /* 4K */
118 static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
119 static MemAllocator *squidaio_micro_bufs = NULL; /* 128 */
120 
121 static int request_queue_len = 0;
122 static MemAllocator *squidaio_request_pool = NULL;
123 static MemAllocator *squidaio_thread_pool = NULL;
124 static squidaio_request_queue_t request_queue;
125 
126 static struct {
127  squidaio_request_t *head, **tailp;
128 }
129 
130 request_queue2 = {
131 
132  NULL, &request_queue2.head
133 };
134 static squidaio_request_queue_t done_queue;
135 
136 static struct {
137  squidaio_request_t *head, **tailp;
138 }
139 
140 done_requests = {
141 
142  NULL, &done_requests.head
143 };
144 static pthread_attr_t globattr;
145 #if HAVE_SCHED_H
146 
147 static struct sched_param globsched;
148 #endif
149 static pthread_t main_thread;
150 
151 static MemAllocator *
152 squidaio_get_pool(int size)
153 {
154  if (size <= AIO_LARGE_BUFS) {
155  if (size <= AIO_MICRO_BUFS)
156  return squidaio_micro_bufs;
157  else if (size <= AIO_TINY_BUFS)
158  return squidaio_tiny_bufs;
159  else if (size <= AIO_SMALL_BUFS)
160  return squidaio_small_bufs;
161  else if (size <= AIO_MEDIUM_BUFS)
162  return squidaio_medium_bufs;
163  else
164  return squidaio_large_bufs;
165  }
166 
167  return NULL;
168 }
169 
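A quick sketch of how the thresholds above resolve in practice (the sizes here are chosen purely for illustration):

    // Illustrative only: which pool squidaio_get_pool() hands back for a given size.
    MemAllocator *micro = squidaio_get_pool(100);    // <= AIO_MICRO_BUFS (128)   -> squidaio_micro_bufs
    MemAllocator *small = squidaio_get_pool(3000);   // <= AIO_SMALL_BUFS (4096)  -> squidaio_small_bufs
    MemAllocator *none  = squidaio_get_pool(32768);  // >  AIO_LARGE_BUFS (16384) -> NULL; squidaio_xmalloc() falls back to xmalloc()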
170 void *
171 squidaio_xmalloc(int size)
172 {
173  void *p;
174  MemAllocator *pool;
175 
176  if ((pool = squidaio_get_pool(size)) != NULL) {
177  p = pool->alloc();
178  } else
179  p = xmalloc(size);
180 
181  return p;
182 }
183 
184 static char *
185 squidaio_xstrdup(const char *str)
186 {
187  char *p;
188  int len = strlen(str) + 1;
189 
190  p = (char *)squidaio_xmalloc(len);
191  strncpy(p, str, len);
192 
193  return p;
194 }
195 
196 void
197 squidaio_xfree(void *p, int size)
198 {
199  MemAllocator *pool;
200 
201  if ((pool = squidaio_get_pool(size)) != NULL) {
202  pool->freeOne(p);
203  } else
204  xfree(p);
205 }
206 
207 static void
208 squidaio_xstrfree(char *str)
209 {
210  MemAllocator *pool;
211  int len = strlen(str) + 1;
212 
213  if ((pool = squidaio_get_pool(len)) != NULL) {
214  pool->freeOne(str);
215  } else
216  xfree(str);
217 }
218 
219 void
220 squidaio_init(void)
221 {
222  int i;
223  squidaio_thread_t *threadp;
224 
225  if (squidaio_initialised)
226  return;
227 
228  pthread_attr_init(&globattr);
229 
230 #if HAVE_PTHREAD_ATTR_SETSCOPE
231 
232  pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
233 
234 #endif
235 #if HAVE_SCHED_H
236 
237  globsched.sched_priority = 1;
238 
239 #endif
240 
241  main_thread = pthread_self();
242 
243 #if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
244 
245  pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
246 
247 #endif
248 #if HAVE_SCHED_H
249 
250  globsched.sched_priority = 2;
251 
252 #endif
253 #if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
254 
255  pthread_attr_setschedparam(&globattr, &globsched);
256 
257 #endif
258 
259  /* Give each thread a smaller 256KB stack, should be more than sufficient */
260  pthread_attr_setstacksize(&globattr, 256 * 1024);
261 
262  /* Initialize request queue */
263  if (pthread_mutex_init(&(request_queue.mutex), NULL))
264  fatal("Failed to create mutex");
265 
266  if (pthread_cond_init(&(request_queue.cond), NULL))
267  fatal("Failed to create condition variable");
268 
269  request_queue.head = NULL;
270 
271  request_queue.tailp = &request_queue.head;
272 
273  request_queue.requests = 0;
274 
275  request_queue.blocked = 0;
276 
277  /* Initialize done queue */
278  if (pthread_mutex_init(&(done_queue.mutex), NULL))
279  fatal("Failed to create mutex");
280 
281  if (pthread_cond_init(&(done_queue.cond), NULL))
282  fatal("Failed to create condition variable");
283 
284  done_queue.head = NULL;
285 
286  done_queue.tailp = &done_queue.head;
287 
288  done_queue.requests = 0;
289 
290  done_queue.blocked = 0;
291 
292  // Initialize the thread I/O pipes before creating any threads
293  // see bug 3189 comment 5 about race conditions.
294  CommIO::Initialize();
295 
296  /* Create threads and get them to sit in their wait loop */
297  squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
298 
299  assert(NUMTHREADS != 0);
300 
301  for (i = 0; i < NUMTHREADS; ++i) {
302  threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
303  threadp->status = _THREAD_STARTING;
304  threadp->current_req = NULL;
305  threadp->requests = 0;
306  threadp->next = threads;
307  threads = threadp;
308 
309  if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
310  fprintf(stderr, "Thread creation failed\n");
311  threadp->status = _THREAD_FAILED;
312  continue;
313  }
314  }
315 
316  /* Create request pool */
317  squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
318 
319  squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
320 
321  squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
322 
323  squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
324 
325  squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
326 
327  squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
328 
329  squidaio_initialised = 1;
330 }
331 
332 void
333 squidaio_shutdown(void)
334 {
335  if (!squidaio_initialised)
336  return;
337 
338  /* This is the same as in squidaio_sync */
339  do {
340  squidaio_poll_queues();
341  } while (request_queue_len > 0);
342 
343  CommIO::NotifyIOClose();
344 
345  squidaio_initialised = 0;
346 }
347 
348 void *
349 squidaio_thread_loop(void *ptr)
350 {
351  squidaio_thread_t *threadp = (squidaio_thread_t *)ptr;
352  squidaio_request_t *request;
353  sigset_t newSig;
354 
355  /*
356  * Make sure to ignore signals which may possibly get sent to
357  * the parent squid thread. Causes havoc with mutexes and
358  * condition waits otherwise
359  */
360 
361  sigemptyset(&newSig);
362  sigaddset(&newSig, SIGPIPE);
363  sigaddset(&newSig, SIGCHLD);
364 #if defined(_SQUID_LINUX_THREADS_)
365 
366  sigaddset(&newSig, SIGQUIT);
367  sigaddset(&newSig, SIGTRAP);
368 #else
369 
370  sigaddset(&newSig, SIGUSR1);
371  sigaddset(&newSig, SIGUSR2);
372 #endif
373 
374  sigaddset(&newSig, SIGHUP);
375  sigaddset(&newSig, SIGTERM);
376  sigaddset(&newSig, SIGINT);
377  sigaddset(&newSig, SIGALRM);
378  pthread_sigmask(SIG_BLOCK, &newSig, NULL);
379 
380  while (1) {
381  threadp->current_req = request = NULL;
382  request = NULL;
383  /* Get a request to process */
384  threadp->status = _THREAD_WAITING;
385  pthread_mutex_lock(&request_queue.mutex);
386 
387  while (!request_queue.head) {
388  pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
389  }
390 
391  request = request_queue.head;
392 
393  if (request)
394  request_queue.head = request->next;
395 
396  if (!request_queue.head)
397  request_queue.tailp = &request_queue.head;
398 
399  pthread_mutex_unlock(&request_queue.mutex);
400 
401  /* process the request */
402  threadp->status = _THREAD_BUSY;
403 
404  request->next = NULL;
405 
406  threadp->current_req = request;
407 
408  errno = 0;
409 
410  if (!request->cancelled) {
411  switch (request->request_type) {
412 
413  case _AIO_OP_OPEN:
414  squidaio_do_open(request);
415  break;
416 
417  case _AIO_OP_READ:
418  squidaio_do_read(request);
419  break;
420 
421  case _AIO_OP_WRITE:
422  squidaio_do_write(request);
423  break;
424 
425  case _AIO_OP_CLOSE:
426  squidaio_do_close(request);
427  break;
428 
429  case _AIO_OP_UNLINK:
430  squidaio_do_unlink(request);
431  break;
432 
433 #if AIO_OPENDIR /* Opendir not implemented yet */
434 
435  case _AIO_OP_OPENDIR:
436  squidaio_do_opendir(request);
437  break;
438 #endif
439 
440  case _AIO_OP_STAT:
441  squidaio_do_stat(request);
442  break;
443 
444  default:
445  request->ret = -1;
446  request->err = EINVAL;
447  break;
448  }
449  } else { /* cancelled */
450  request->ret = -1;
451  request->err = EINTR;
452  }
453 
454  threadp->status = _THREAD_DONE;
455  /* put the request in the done queue */
456  pthread_mutex_lock(&done_queue.mutex);
457  *done_queue.tailp = request;
458  done_queue.tailp = &request->next;
459  pthread_mutex_unlock(&done_queue.mutex);
460  CommIO::NotifyIOCompleted();
461  ++ threadp->requests;
462  } /* while forever */
463 
464  return NULL;
465 } /* squidaio_thread_loop */
466 
467 static void
468 squidaio_queue_request(squidaio_request_t * request)
469 {
470  static int high_start = 0;
471  debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
472  /* Mark it as not executed (failing result, no error) */
473  request->ret = -1;
474  request->err = 0;
475  /* Internal housekeeping */
476  request_queue_len += 1;
477  request->resultp->_data = request;
478  /* Play some tricks with the request_queue2 queue */
479  request->next = NULL;
480 
481  if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
482  if (request_queue2.head) {
483  /* Grab blocked requests */
484  *request_queue.tailp = request_queue2.head;
485  request_queue.tailp = request_queue2.tailp;
486  }
487 
488  /* Enqueue request */
489  *request_queue.tailp = request;
490 
491  request_queue.tailp = &request->next;
492 
493  pthread_cond_signal(&request_queue.cond);
494 
495  pthread_mutex_unlock(&request_queue.mutex);
496 
497  if (request_queue2.head) {
498  /* Clear queue of blocked requests */
499  request_queue2.head = NULL;
500  request_queue2.tailp = &request_queue2.head;
501  }
502  } else {
503  /* Oops, the request queue is blocked, use request_queue2 */
504  *request_queue2.tailp = request;
505  request_queue2.tailp = &request->next;
506  }
507 
508  if (request_queue2.head) {
509  static uint64_t filter = 0;
510  static uint64_t filter_limit = 8192;
511 
512  if (++filter >= filter_limit) {
513  filter_limit += filter;
514  filter = 0;
515  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion (growing to " << filter_limit << ")");
516  }
517  }
518 
519  /* Warn if out of threads */
520  if (request_queue_len > MAGIC1) {
521  static int last_warn = 0;
522  static int queue_high, queue_low;
523 
524  if (high_start == 0) {
525  high_start = squid_curtime;
526  queue_high = request_queue_len;
527  queue_low = request_queue_len;
528  }
529 
530  if (request_queue_len > queue_high)
531  queue_high = request_queue_len;
532 
533  if (request_queue_len < queue_low)
534  queue_low = request_queue_len;
535 
536  if (squid_curtime >= (last_warn + 15) &&
537  squid_curtime >= (high_start + 5)) {
538  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");
539 
540  if (squid_curtime >= (high_start + 15))
541  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
542  request_queue_len << ", high=" << queue_high <<
543  ", low=" << queue_low << ", duration=" <<
544  (long int) (squid_curtime - high_start));
545 
546  last_warn = squid_curtime;
547  }
548  } else {
549  high_start = 0;
550  }
551 
552  /* Warn if seriously overloaded */
553  if (request_queue_len > RIDICULOUS_LENGTH) {
554  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
555  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
556  squidaio_sync();
557  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
558  }
559 } /* squidaio_queue_request */
560 
561 static void
562 squidaio_cleanup_request(squidaio_request_t * requestp)
563 {
564  squidaio_result_t *resultp = requestp->resultp;
565  int cancelled = requestp->cancelled;
566 
567  /* Free allocated structures and copy data back to user space if the */
568  /* request hasn't been cancelled */
569 
570  switch (requestp->request_type) {
571 
572  case _AIO_OP_STAT:
573 
574  if (!cancelled && requestp->ret == 0)
575  memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
576 
577  squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
578 
579  squidaio_xstrfree(requestp->path);
580 
581  break;
582 
583  case _AIO_OP_OPEN:
584  if (cancelled && requestp->ret >= 0)
585  /* The open() was cancelled but completed */
586  close(requestp->ret);
587 
588  squidaio_xstrfree(requestp->path);
589 
590  break;
591 
592  case _AIO_OP_CLOSE:
593  if (cancelled && requestp->ret < 0)
594  /* The close() was cancelled and never got executed */
595  close(requestp->fd);
596 
597  break;
598 
599  case _AIO_OP_UNLINK:
600 
601  case _AIO_OP_OPENDIR:
602  squidaio_xstrfree(requestp->path);
603 
604  break;
605 
606  case _AIO_OP_READ:
607  break;
608 
609  case _AIO_OP_WRITE:
610  break;
611 
612  default:
613  break;
614  }
615 
616  if (resultp != NULL && !cancelled) {
617  resultp->aio_return = requestp->ret;
618  resultp->aio_errno = requestp->err;
619  }
620 
621  squidaio_request_pool->freeOne(requestp);
622 } /* squidaio_cleanup_request */
623 
624 int
625 squidaio_cancel(squidaio_result_t * resultp)
626 {
627  squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
628 
629  if (request && request->resultp == resultp) {
630  debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
631  request->cancelled = 1;
632  request->resultp = NULL;
633  resultp->_data = NULL;
634  resultp->result_type = _AIO_OP_NONE;
635  return 0;
636  }
637 
638  return 1;
639 } /* squidaio_cancel */
640 
641 int
642 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
643 {
644  squidaio_init();
645  squidaio_request_t *requestp;
646 
647  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
648 
649  requestp->path = (char *) squidaio_xstrdup(path);
650 
651  requestp->oflag = oflag;
652 
653  requestp->mode = mode;
654 
655  requestp->resultp = resultp;
656 
657  requestp->request_type = _AIO_OP_OPEN;
658 
659  requestp->cancelled = 0;
660 
661  resultp->result_type = _AIO_OP_OPEN;
662 
663  squidaio_queue_request(requestp);
664 
665  return 0;
666 }
667 
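A hedged caller-side sketch (the result variable and the path are illustrative, not taken from this file): squidaio_open() only queues the request; the descriptor arrives later in aio_return once squidaio_poll_done() hands the completed result back.

    // Illustrative: submit an asynchronous open, then pick the completion up later.
    static squidaio_result_t open_result;            // hypothetical caller-owned result slot
    squidaio_open("/cache/00/0000A3", O_RDONLY, 0644, &open_result);

    // ... later, from the main thread ...
    squidaio_result_t *done = squidaio_poll_done();
    if (done == &open_result && done->result_type == _AIO_OP_OPEN) {
        if (done->aio_return >= 0) {
            int fd = done->aio_return;                // the opened descriptor
            // ... use fd ...
        } else {
            // open failed; the worker's errno is preserved in done->aio_errno
        }
    }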
668 static void
669 squidaio_do_open(squidaio_request_t * requestp)
670 {
671  requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
672  requestp->err = errno;
673 }
674 
675 int
676 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
677 {
678  squidaio_request_t *requestp;
679 
680  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
681 
682  requestp->fd = fd;
683 
684  requestp->bufferp = bufp;
685 
686  requestp->buflen = bufs;
687 
688  requestp->offset = offset;
689 
690  requestp->whence = whence;
691 
692  requestp->resultp = resultp;
693 
694  requestp->request_type = _AIO_OP_READ;
695 
696  requestp->cancelled = 0;
697 
698  resultp->result_type = _AIO_OP_READ;
699 
700  squidaio_queue_request(requestp);
701 
702  return 0;
703 }
704 
705 static void
706 squidaio_do_read(squidaio_request_t * requestp)
707 {
708  if (lseek(requestp->fd, requestp->offset, requestp->whence) >= 0)
709  requestp->ret = read(requestp->fd, requestp->bufferp, requestp->buflen);
710  else
711  requestp->ret = -1;
712  requestp->err = errno;
713 }
714 
715 int
716 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
717 {
718  squidaio_request_t *requestp;
719 
720  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
721 
722  requestp->fd = fd;
723 
724  requestp->bufferp = bufp;
725 
726  requestp->buflen = bufs;
727 
728  requestp->offset = offset;
729 
730  requestp->whence = whence;
731 
732  requestp->resultp = resultp;
733 
734  requestp->request_type = _AIO_OP_WRITE;
735 
736  requestp->cancelled = 0;
737 
738  resultp->result_type = _AIO_OP_WRITE;
739 
740  squidaio_queue_request(requestp);
741 
742  return 0;
743 }
744 
745 static void
746 squidaio_do_write(squidaio_request_t * requestp)
747 {
748  requestp->ret = write(requestp->fd, requestp->bufferp, requestp->buflen);
749  requestp->err = errno;
750 }
751 
752 int
753 squidaio_close(int fd, squidaio_result_t * resultp)
754 {
755  squidaio_request_t *requestp;
756 
757  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
758 
759  requestp->fd = fd;
760 
761  requestp->resultp = resultp;
762 
763  requestp->request_type = _AIO_OP_CLOSE;
764 
765  requestp->cancelled = 0;
766 
767  resultp->result_type = _AIO_OP_CLOSE;
768 
769  squidaio_queue_request(requestp);
770 
771  return 0;
772 }
773 
774 static void
775 squidaio_do_close(squidaio_request_t * requestp)
776 {
777  requestp->ret = close(requestp->fd);
778  requestp->err = errno;
779 }
780 
781 int
782 
783 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
784 {
785  squidaio_init();
786  squidaio_request_t *requestp;
787 
788  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
789 
790  requestp->path = (char *) squidaio_xstrdup(path);
791 
792  requestp->statp = sb;
793 
794  requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
795 
796  requestp->resultp = resultp;
797 
798  requestp->request_type = _AIO_OP_STAT;
799 
800  requestp->cancelled = 0;
801 
802  resultp->result_type = _AIO_OP_STAT;
803 
804  squidaio_queue_request(requestp);
805 
806  return 0;
807 }
808 
809 static void
810 squidaio_do_stat(squidaio_request_t * requestp)
811 {
812  requestp->ret = stat(requestp->path, requestp->tmpstatp);
813  requestp->err = errno;
814 }
815 
816 int
817 squidaio_unlink(const char *path, squidaio_result_t * resultp)
818 {
819  squidaio_init();
820  squidaio_request_t *requestp;
821 
822  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
823 
824  requestp->path = squidaio_xstrdup(path);
825 
826  requestp->resultp = resultp;
827 
828  requestp->request_type = _AIO_OP_UNLINK;
829 
830  requestp->cancelled = 0;
831 
832  resultp->result_type = _AIO_OP_UNLINK;
833 
834  squidaio_queue_request(requestp);
835 
836  return 0;
837 }
838 
839 static void
840 squidaio_do_unlink(squidaio_request_t * requestp)
841 {
842  requestp->ret = unlink(requestp->path);
843  requestp->err = errno;
844 }
845 
846 #if AIO_OPENDIR
847 /* XXX squidaio_opendir NOT implemented yet.. */
848 
849 int
850 squidaio_opendir(const char *path, squidaio_result_t * resultp)
851 {
852  squidaio_request_t *requestp;
853  int len;
854 
855  requestp = squidaio_request_pool->alloc();
856 
857  resultp->result_type = _AIO_OP_OPENDIR;
858 
859  return -1;
860 }
861 
862 static void
863 squidaio_do_opendir(squidaio_request_t * requestp)
864 {
865  /* NOT IMPLEMENTED */
866 }
867 
868 #endif
869 
870 static void
871 squidaio_poll_queues(void)
872 {
873  /* kick "overflow" request queue */
874 
875  if (request_queue2.head &&
876  pthread_mutex_trylock(&request_queue.mutex) == 0) {
877  *request_queue.tailp = request_queue2.head;
878  request_queue.tailp = request_queue2.tailp;
879  pthread_cond_signal(&request_queue.cond);
880  pthread_mutex_unlock(&request_queue.mutex);
881  request_queue2.head = NULL;
882  request_queue2.tailp = &request_queue2.head;
883  }
884 
885  /* poll done queue */
886  if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {
887 
888  struct squidaio_request_t *requests = done_queue.head;
889  done_queue.head = NULL;
890  done_queue.tailp = &done_queue.head;
891  pthread_mutex_unlock(&done_queue.mutex);
892  *done_requests.tailp = requests;
893  request_queue_len -= 1;
894 
895  while (requests->next) {
896  requests = requests->next;
897  request_queue_len -= 1;
898  }
899 
900  done_requests.tailp = &requests->next;
901  }
902 }
903 
904 squidaio_result_t *
905 squidaio_poll_done(void)
906 {
907  squidaio_request_t *request;
908  squidaio_result_t *resultp;
909  int cancelled;
910  int polled = 0;
911 
912 AIO_REPOLL:
913  request = done_requests.head;
914 
915  if (request == NULL && !polled) {
916  CommIO::ResetNotifications();
917  squidaio_poll_queues();
918  polled = 1;
919  request = done_requests.head;
920  }
921 
922  if (!request) {
923  return NULL;
924  }
925 
926  debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
927  done_requests.head = request->next;
928 
929  if (!done_requests.head)
930  done_requests.tailp = &done_requests.head;
931 
932  resultp = request->resultp;
933 
934  cancelled = request->cancelled;
935 
936  squidaio_debug(request);
937 
938  debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
939 
940  squidaio_cleanup_request(request);
941 
942  if (cancelled)
943  goto AIO_REPOLL;
944 
945  return resultp;
946 } /* squidaio_poll_done */
947 
948 int
949 squidaio_operations_pending(void)
950 {
951  return request_queue_len + (done_requests.head ? 1 : 0);
952 }
953 
954 int
955 squidaio_sync(void)
956 {
957  /* XXX This might take a while if the queue is large.. */
958 
959  do {
960  squidaio_poll_queues();
961  } while (request_queue_len > 0);
962 
963  return squidaio_operations_pending();
964 }
965 
966 int
967 squidaio_get_queue_len(void)
968 {
969  return request_queue_len;
970 }
971 
972 static void
973 squidaio_debug(squidaio_request_t * request)
974 {
975  switch (request->request_type) {
976 
977  case _AIO_OP_OPEN:
978  debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
979  break;
980 
981  case _AIO_OP_READ:
982  debugs(43, 5, "READ on fd: " << request->fd);
983  break;
984 
985  case _AIO_OP_WRITE:
986  debugs(43, 5, "WRITE on fd: " << request->fd);
987  break;
988 
989  case _AIO_OP_CLOSE:
990  debugs(43, 5, "CLOSE of fd: " << request->fd);
991  break;
992 
993  case _AIO_OP_UNLINK:
994  debugs(43, 5, "UNLINK of " << request->path);
995  break;
996 
997  default:
998  break;
999  }
1000 }
1001 
1002 void
1003 squidaio_stats(StoreEntry * sentry)
1004 {
1005  squidaio_thread_t *threadp;
1006  int i;
1007 
1008  if (!squidaio_initialised)
1009  return;
1010 
1011  storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1012 
1013  storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1014 
1015  threadp = threads;
1016 
1017  for (i = 0; i < NUMTHREADS; ++i) {
1018  storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, (unsigned long)threadp->thread, threadp->requests);
1019  threadp = threadp->next;
1020  }
1021 }
1022 
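Taken together, the calls above form a simple submit-then-poll API. A minimal sketch of how a main-loop consumer might drain completions (the loop itself is an assumption, not code from this file):

    // Illustrative drain loop: collect finished requests and dispatch on their type.
    for (squidaio_result_t *res = squidaio_poll_done(); res != NULL; res = squidaio_poll_done()) {
        switch (res->result_type) {
        case _AIO_OP_READ:
        case _AIO_OP_WRITE:
            // res->aio_return is the byte count, or -1 with the errno in res->aio_errno
            break;
        case _AIO_OP_OPEN:
            // res->aio_return is the new file descriptor
            break;
        default:
            break;
        }
    }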