aiops_win32.cc
/*
 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 43    Windows AIOPS */

#include "squid.h"
#include "DiskIO/DiskThreads/CommIO.h"
#include "DiskThreads.h"
#include "fd.h"
#include "mem/Pool.h"
#include "SquidConfig.h"
#include "Store.h"

#include <cerrno>
#include <csignal>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>

#define RIDICULOUS_LENGTH 4096

enum _squidaio_thread_status {
    _THREAD_STARTING = 0,
    _THREAD_WAITING,
    _THREAD_BUSY,
    _THREAD_FAILED,
    _THREAD_DONE
};
typedef enum _squidaio_thread_status squidaio_thread_status;

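/*
 * A single queued I/O request.  Requests are singly linked through 'next'
 * and carry everything a worker thread needs to execute the operation,
 * plus the squidaio_result_t that the caller later polls for completion.
 */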
typedef struct squidaio_request_t {

    struct squidaio_request_t *next;
    squidaio_request_type request_type;
    int cancelled;
    char *path;
    int oflag;
    mode_t mode;
    int fd;
    char *bufferp;
    char *tmpbufp;
    size_t buflen;
    off_t offset;
    int whence;
    int ret;
    int err;

    struct stat *tmpstatp;

    struct stat *statp;
    squidaio_result_t *resultp;
} squidaio_request_t;

typedef struct squidaio_request_queue_t {
    HANDLE mutex;
    HANDLE cond;            /* See Event objects */
    squidaio_request_t *volatile head;
    squidaio_request_t *volatile *volatile tailp;
    unsigned long requests;
    unsigned long blocked;  /* main failed to lock the queue */
} squidaio_request_queue_t;

typedef struct squidaio_thread_t squidaio_thread_t;

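/*
 * Per worker thread state: the native thread handle and ID, the thread's
 * current status, the request it is servicing, and a request counter.
 */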
struct squidaio_thread_t {
    squidaio_thread_t *next;
    HANDLE thread;
    DWORD dwThreadId;               /* thread ID */
    squidaio_thread_status status;

    struct squidaio_request_t *current_req;
    unsigned long requests;
    int volatile exit;
};

static void squidaio_queue_request(squidaio_request_t *);
static void squidaio_cleanup_request(squidaio_request_t *);
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam);
static void squidaio_do_open(squidaio_request_t *);
static void squidaio_do_read(squidaio_request_t *);
static void squidaio_do_write(squidaio_request_t *);
static void squidaio_do_close(squidaio_request_t *);
static void squidaio_do_stat(squidaio_request_t *);
static void squidaio_do_unlink(squidaio_request_t *);
#if AIO_OPENDIR
static void *squidaio_do_opendir(squidaio_request_t *);
#endif
static void squidaio_debug(squidaio_request_t *);
static void squidaio_poll_queues(void);

static squidaio_thread_t *threads = NULL;
static int squidaio_initialised = 0;

#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
#define AIO_SMALL_BUFS  AIO_LARGE_BUFS >> 2
#define AIO_TINY_BUFS   AIO_LARGE_BUFS >> 3
#define AIO_MICRO_BUFS  128

static MemAllocator *squidaio_large_bufs = NULL;    /* 16K */
static MemAllocator *squidaio_medium_bufs = NULL;   /* 8K */
static MemAllocator *squidaio_small_bufs = NULL;    /* 4K */
static MemAllocator *squidaio_tiny_bufs = NULL;     /* 2K */
static MemAllocator *squidaio_micro_bufs = NULL;    /* 128 bytes */

static int request_queue_len = 0;
static MemAllocator *squidaio_request_pool = NULL;
static MemAllocator *squidaio_thread_pool = NULL;
static squidaio_request_queue_t request_queue;

static struct {
    squidaio_request_t *head, **tailp;
}

request_queue2 = {

    NULL, &request_queue2.head
};
static squidaio_request_queue_t done_queue;

static struct {
    squidaio_request_t *head, **tailp;
}

done_requests = {

    NULL, &done_requests.head
};

static HANDLE main_thread;

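/*
 * Map an allocation size onto one of the fixed-size buffer pools above.
 * Sizes larger than AIO_LARGE_BUFS fall back to plain xmalloc()/xfree().
 */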
static MemAllocator *
squidaio_get_pool(int size)
{
    if (size <= AIO_LARGE_BUFS) {
        if (size <= AIO_MICRO_BUFS)
            return squidaio_micro_bufs;
        else if (size <= AIO_TINY_BUFS)
            return squidaio_tiny_bufs;
        else if (size <= AIO_SMALL_BUFS)
            return squidaio_small_bufs;
        else if (size <= AIO_MEDIUM_BUFS)
            return squidaio_medium_bufs;
        else
            return squidaio_large_bufs;
    }

    return NULL;
}

void *
squidaio_xmalloc(int size)
{
    void *p;
    MemAllocator *pool;

    if ((pool = squidaio_get_pool(size)) != NULL) {
        p = pool->alloc();
    } else
        p = xmalloc(size);

    return p;
}

static char *
squidaio_xstrdup(const char *str)
{
    char *p;
    int len = strlen(str) + 1;

    p = (char *)squidaio_xmalloc(len);
    strncpy(p, str, len);

    return p;
}

void
squidaio_xfree(void *p, int size)
{
    MemAllocator *pool;

    if ((pool = squidaio_get_pool(size)) != NULL) {
        pool->freeOne(p);
    } else
        xfree(p);
}

static void
squidaio_xstrfree(char *str)
{
    MemAllocator *pool;
    int len = strlen(str) + 1;

    if ((pool = squidaio_get_pool(len)) != NULL) {
        pool->freeOne(str);
    } else
        xfree(str);
}

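/*
 * Bring the subsystem up: duplicate a handle to the main thread, create the
 * mutexes and event objects guarding the request and done queues, set up
 * the CommIO completion notifications, spawn NUMTHREADS worker threads and
 * create the request and buffer memory pools.
 */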
void
squidaio_init(void)
{
    int i;
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
        return;

    if (!DuplicateHandle(GetCurrentProcess(),   /* pseudo handle, don't close */
                         GetCurrentThread(),    /* pseudo handle to copy */
                         GetCurrentProcess(),   /* pseudo handle, don't close */
                         &main_thread,
                         0,                     /* required access */
                         FALSE,                 /* child processes don't inherit the handle */
                         DUPLICATE_SAME_ACCESS)) {
        fatal("Couldn't get current thread handle");
    }

    /* Initialize request queue */
    if ((request_queue.mutex = CreateMutex(NULL,    /* no inheritance */
                                           FALSE,   /* start unowned (as per mutex_init) */
                                           NULL)    /* no name */
        ) == NULL) {
        fatal("Failed to create mutex");
    }

    if ((request_queue.cond = CreateEvent(NULL,     /* no inheritance */
                                          FALSE,    /* auto-reset signalling, closest to pthreads */
                                          FALSE,    /* start non-signaled */
                                          NULL)     /* no name */
        ) == NULL) {
        fatal("Failed to create condition variable");
    }

    request_queue.head = NULL;
    request_queue.tailp = &request_queue.head;
    request_queue.requests = 0;
    request_queue.blocked = 0;

    /* Initialize done queue */
    if ((done_queue.mutex = CreateMutex(NULL,   /* no inheritance */
                                        FALSE,  /* start unowned (as per mutex_init) */
                                        NULL)   /* no name */
        ) == NULL) {
        fatal("Failed to create mutex");
    }

    if ((done_queue.cond = CreateEvent(NULL,    /* no inheritance */
                                       TRUE,    /* manual reset, closest to pthreads */
                                       FALSE,   /* start non-signaled */
                                       NULL)    /* no name */
        ) == NULL) {
        fatal("Failed to create condition variable");
    }

    done_queue.head = NULL;
    done_queue.tailp = &done_queue.head;
    done_queue.requests = 0;
    done_queue.blocked = 0;

    // Initialize the thread I/O pipes before creating any threads
    // see bug 3189 comment 5 about race conditions.
    CommIO::Initialize();

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    assert(NUMTHREADS > 0);

    for (i = 0; i < NUMTHREADS; ++i) {
        threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
        threadp->status = _THREAD_STARTING;
        threadp->current_req = NULL;
        threadp->requests = 0;
        threadp->next = threads;
        threads = threadp;

        if ((threadp->thread = CreateThread(NULL,                   /* no security attributes */
                                            0,                      /* use default stack size */
                                            squidaio_thread_loop,   /* thread function */
                                            threadp,                /* argument to thread function */
                                            0,                      /* use default creation flags */
                                            &(threadp->dwThreadId)) /* returns the thread identifier */
            ) == NULL) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }

        /* Set the new thread priority above the parent process */
        SetThreadPriority(threadp->thread, THREAD_PRIORITY_ABOVE_NORMAL);
    }

    /* Create request and buffer pools */
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);

    squidaio_initialised = 1;
}

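/*
 * Drain the pending request queue, ask every worker thread to exit, wake
 * them up and wait briefly for them to finish, then release the thread
 * handles and the CommIO notification resources.
 */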
void
squidaio_shutdown(void)
{
    squidaio_thread_t *threadp;
    int i;
    HANDLE *hthreads;

    if (!squidaio_initialised)
        return;

    /* This is the same as in squidaio_sync */
    do {
        squidaio_poll_queues();
    } while (request_queue_len > 0);

    hthreads = (HANDLE *) xcalloc(NUMTHREADS, sizeof(HANDLE));

    threadp = threads;

    for (i = 0; i < NUMTHREADS; ++i) {
        threadp->exit = 1;
        hthreads[i] = threadp->thread;
        threadp = threadp->next;
    }

    ReleaseMutex(request_queue.mutex);
    ResetEvent(request_queue.cond);
    ReleaseMutex(done_queue.mutex);
    ResetEvent(done_queue.cond);
    Sleep(0);

    WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);

    for (i = 0; i < NUMTHREADS; ++i) {
        CloseHandle(hthreads[i]);
    }

    CloseHandle(main_thread);
    CommIO::NotifyIOClose();

    squidaio_initialised = 0;
    xfree(hthreads);
}

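/*
 * Worker thread body.  Each thread pops requests off request_queue (guarded
 * by a mutex and signalled through a duplicated Event handle, since Win32
 * events do not atomically release the mutex the way POSIX condition
 * variables do), runs the matching squidaio_do_*() primitive, and appends
 * the finished request to done_queue for the main thread to collect.
 */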
static DWORD WINAPI
squidaio_thread_loop(LPVOID lpParam)
{
    squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
    squidaio_request_t *request;
    HANDLE cond; /* local copy of the event handle because win32 event handles
                  * don't atomically release the mutex as cond variables do. */

    /* lock the thread info */
    if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
        fatal("Can't get ownership of mutex\n");
    }

    /* duplicate the handle */
    if (!DuplicateHandle(GetCurrentProcess(),   /* pseudo handle, don't close */
                         request_queue.cond,    /* handle to copy */
                         GetCurrentProcess(),   /* pseudo handle, don't close */
                         &cond,
                         0,                     /* required access */
                         FALSE,                 /* child processes don't inherit the handle */
                         DUPLICATE_SAME_ACCESS))
        fatal("Can't duplicate mutex handle\n");

    if (!ReleaseMutex(request_queue.mutex)) {
        CloseHandle(cond);
        fatal("Can't release mutex\n");
    }

    Sleep(0);

    while (1) {
        DWORD rv;
        threadp->current_req = request = NULL;
        request = NULL;
        /* Get a request to process */
        threadp->status = _THREAD_WAITING;

        if (threadp->exit) {
            CloseHandle(request_queue.mutex);
            CloseHandle(cond);
            return 0;
        }

        rv = WaitForSingleObject(request_queue.mutex, INFINITE);

        if (rv == WAIT_FAILED) {
            CloseHandle(cond);
            return 1;
        }

        while (!request_queue.head) {
            if (!ReleaseMutex(request_queue.mutex)) {
                CloseHandle(cond);
                threadp->status = _THREAD_FAILED;
                return 1;
            }

            Sleep(0);
            rv = WaitForSingleObject(cond, INFINITE);

            if (rv == WAIT_FAILED) {
                CloseHandle(cond);
                return 1;
            }

            rv = WaitForSingleObject(request_queue.mutex, INFINITE);

            if (rv == WAIT_FAILED) {
                CloseHandle(cond);
                return 1;
            }
        }

        request = request_queue.head;

        if (request)
            request_queue.head = request->next;

        if (!request_queue.head)
            request_queue.tailp = &request_queue.head;

        if (!ReleaseMutex(request_queue.mutex)) {
            CloseHandle(cond);
            return 1;
        }

        Sleep(0);

        /* process the request */
        threadp->status = _THREAD_BUSY;

        request->next = NULL;

        threadp->current_req = request;

        errno = 0;

        if (!request->cancelled) {
            switch (request->request_type) {

            case _AIO_OP_OPEN:
                squidaio_do_open(request);
                break;

            case _AIO_OP_READ:
                squidaio_do_read(request);
                break;

            case _AIO_OP_WRITE:
                squidaio_do_write(request);
                break;

            case _AIO_OP_CLOSE:
                squidaio_do_close(request);
                break;

            case _AIO_OP_UNLINK:
                squidaio_do_unlink(request);
                break;

#if AIO_OPENDIR         /* Opendir not implemented yet */

            case _AIO_OP_OPENDIR:
                squidaio_do_opendir(request);
                break;
#endif

            case _AIO_OP_STAT:
                squidaio_do_stat(request);
                break;

            default:
                request->ret = -1;
                request->err = EINVAL;
                break;
            }
        } else {    /* cancelled */
            request->ret = -1;
            request->err = EINTR;
        }

        threadp->status = _THREAD_DONE;
        /* put the request in the done queue */
        rv = WaitForSingleObject(done_queue.mutex, INFINITE);

        if (rv == WAIT_FAILED) {
            CloseHandle(cond);
            return 1;
        }

        *done_queue.tailp = request;
        done_queue.tailp = &request->next;

        if (!ReleaseMutex(done_queue.mutex)) {
            CloseHandle(cond);
            return 1;
        }

        CommIO::NotifyIOCompleted();
        Sleep(0);
        ++ threadp->requests;
    } /* while forever */

    CloseHandle(cond);

    return 0;
} /* squidaio_thread_loop */

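/*
 * Queue a request for the worker threads.  The queue mutex is only tried,
 * never waited on, so the main thread cannot block here: if the lock is
 * busy the request is parked on request_queue2 and merged in later.  Also
 * emits the queue-congestion and disk I/O overload warnings.
 */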
static void
squidaio_queue_request(squidaio_request_t * request)
{
    static int high_start = 0;
    debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len += 1;
    request->resultp->_data = request;
    /* Play some tricks with the request_queue2 queue */
    request->next = NULL;

    if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
        if (request_queue2.head) {
            /* Grab blocked requests */
            *request_queue.tailp = request_queue2.head;
            request_queue.tailp = request_queue2.tailp;
        }

        /* Enqueue request */
        *request_queue.tailp = request;
        request_queue.tailp = &request->next;

        if (!SetEvent(request_queue.cond))
            fatal("Couldn't push queue");

        if (!ReleaseMutex(request_queue.mutex)) {
            /* unexpected error */
            fatal("Couldn't push queue");
        }

        Sleep(0);

        if (request_queue2.head) {
            /* Clear queue of blocked requests */
            request_queue2.head = NULL;
            request_queue2.tailp = &request_queue2.head;
        }
    } else {
        /* Oops, the request queue is blocked, use request_queue2 */
        *request_queue2.tailp = request;
        request_queue2.tailp = &request->next;
    }

    if (request_queue2.head) {
        static uint64_t filter = 0;
        static uint64_t filter_limit = 8196;

        if (++filter >= filter_limit) {
            filter_limit += filter;
            filter = 0;
            debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
        }
    }

    /* Warn if out of threads */
    if (request_queue_len > MAGIC1) {
        static int last_warn = 0;
        static int queue_high, queue_low;

        if (high_start == 0) {
            high_start = (int)squid_curtime;
            queue_high = request_queue_len;
            queue_low = request_queue_len;
        }

        if (request_queue_len > queue_high)
            queue_high = request_queue_len;

        if (request_queue_len < queue_low)
            queue_low = request_queue_len;

        if (squid_curtime >= (last_warn + 15) &&
                squid_curtime >= (high_start + 5)) {
            debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");

            if (squid_curtime >= (high_start + 15))
                debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
                       request_queue_len << ", high=" << queue_high <<
                       ", low=" << queue_low << ", duration=" <<
                       (long int) (squid_curtime - high_start));

            last_warn = (int)squid_curtime;
        }
    } else {
        high_start = 0;
    }

    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
        squidaio_sync();
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
    }
} /* squidaio_queue_request */

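/*
 * Tear down a finished (or cancelled) request: copy results back to the
 * caller where appropriate, free temporary buffers and strings, and return
 * the request object to its pool.
 */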
static void
squidaio_cleanup_request(squidaio_request_t * requestp)
{
    squidaio_result_t *resultp = requestp->resultp;
    int cancelled = requestp->cancelled;

    /* Free allocated structures and copy data back to user space if the
     * request hasn't been cancelled */

    switch (requestp->request_type) {

    case _AIO_OP_STAT:

        if (!cancelled && requestp->ret == 0)
            memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));

        squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_OPEN:
        if (cancelled && requestp->ret >= 0)
            /* The open() was cancelled but completed */
            close(requestp->ret);

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_CLOSE:
        if (cancelled && requestp->ret < 0)
            /* The close() was cancelled and never got executed */
            close(requestp->fd);

        break;

    case _AIO_OP_UNLINK:

    case _AIO_OP_OPENDIR:
        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_READ:
        break;

    case _AIO_OP_WRITE:
        break;

    default:
        break;
    }

    if (resultp != NULL && !cancelled) {
        resultp->aio_return = requestp->ret;
        resultp->aio_errno = requestp->err;
    }

    squidaio_request_pool->freeOne(requestp);
} /* squidaio_cleanup_request */

int
squidaio_cancel(squidaio_result_t * resultp)
{
    squidaio_request_t *request = (squidaio_request_t *)resultp->_data;

    if (request && request->resultp == resultp) {
        debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
        request->cancelled = 1;
        request->resultp = NULL;
        resultp->_data = NULL;
        resultp->result_type = _AIO_OP_NONE;
        return 0;
    }

    return 1;
} /* squidaio_cancel */

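/*
 * Public request-submission wrappers.  Each one takes a request from
 * squidaio_request_pool, records the parameters and the caller's
 * squidaio_result_t, tags both with the operation type and hands the
 * request to squidaio_queue_request().  Completion is reported later
 * through squidaio_poll_done().
 */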
int
squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->path = (char *) squidaio_xstrdup(path);
    requestp->oflag = oflag;
    requestp->mode = mode;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_OPEN;
    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_OPEN;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_open(squidaio_request_t * requestp)
{
    requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
    requestp->err = errno;
}

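/*
 * Reads seek with lseek() and then use the Win32 ReadFile() call on the
 * HANDLE behind the CRT descriptor (writes use WriteFile() the same way);
 * Win32 error codes are mapped back onto errno via WIN32_maperror().
 */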
int
squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->fd = fd;
    requestp->bufferp = bufp;
    requestp->buflen = bufs;
    requestp->offset = offset;
    requestp->whence = whence;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_READ;
    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_READ;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_read(squidaio_request_t * requestp)
{
    lseek(requestp->fd, requestp->offset, requestp->whence);

    if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
                  requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
        WIN32_maperror(GetLastError());
        requestp->ret = -1;
    }

    requestp->err = errno;
}

int
squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->fd = fd;
    requestp->bufferp = bufp;
    requestp->buflen = bufs;
    requestp->offset = offset;
    requestp->whence = whence;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_WRITE;
    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_WRITE;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_write(squidaio_request_t * requestp)
{
    if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
                   requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
        WIN32_maperror(GetLastError());
        requestp->ret = -1;
    }

    requestp->err = errno;
}

int
squidaio_close(int fd, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->fd = fd;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_CLOSE;
    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_CLOSE;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_close(squidaio_request_t * requestp)
{
    if ((requestp->ret = close(requestp->fd)) < 0) {
        debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
        close(requestp->fd);
    }

    requestp->err = errno;
}

int
squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->path = (char *) squidaio_xstrdup(path);
    requestp->statp = sb;
    requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_STAT;
    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_STAT;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_stat(squidaio_request_t * requestp)
{
    requestp->ret = stat(requestp->path, requestp->tmpstatp);
    requestp->err = errno;
}

int
squidaio_unlink(const char *path, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->path = squidaio_xstrdup(path);
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_UNLINK;
    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_UNLINK;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_unlink(squidaio_request_t * requestp)
{
    requestp->ret = unlink(requestp->path);
    requestp->err = errno;
}

#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;
    int len;

    requestp = squidaio_request_pool->alloc();

    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}

static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}

#endif

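/*
 * Non-blocking housekeeping run from the main thread: merge any requests
 * parked on request_queue2 into the live request queue, and move finished
 * requests from done_queue onto done_requests where squidaio_poll_done()
 * can hand them back to callers.
 */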
static void
squidaio_poll_queues(void)
{
    /* kick "overflow" request queue */
    if (request_queue2.head &&
            (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0)) {
        /* Grab blocked requests */
        *request_queue.tailp = request_queue2.head;
        request_queue.tailp = request_queue2.tailp;

        if (!SetEvent(request_queue.cond))
            fatal("couldn't push queue\n");

        if (!ReleaseMutex(request_queue.mutex)) {
            /* unexpected error */
        }

        Sleep(0);
        request_queue2.head = NULL;
        request_queue2.tailp = &request_queue2.head;
    }

    /* poll done queue */
    if (done_queue.head &&
            (WaitForSingleObject(done_queue.mutex, 0) == WAIT_OBJECT_0)) {

        struct squidaio_request_t *requests = done_queue.head;
        done_queue.head = NULL;
        done_queue.tailp = &done_queue.head;

        if (!ReleaseMutex(done_queue.mutex)) {
            /* unexpected error */
        }

        Sleep(0);
        *done_requests.tailp = requests;
        request_queue_len -= 1;

        while (requests->next) {
            requests = requests->next;
            request_queue_len -= 1;
        }

        done_requests.tailp = &requests->next;
    }
}
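
/*
 * Return the next completed operation's squidaio_result_t, or NULL when
 * nothing has finished.  Cancelled requests are cleaned up and skipped via
 * the AIO_REPOLL loop; the queues are polled at most once per call.
 */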
squidaio_result_t *
squidaio_poll_done(void)
{
    squidaio_request_t *request;
    squidaio_result_t *resultp;
    int cancelled;
    int polled = 0;

AIO_REPOLL:
    request = done_requests.head;

    if (request == NULL && !polled) {
        CommIO::ResetNotifications();
        squidaio_poll_queues();
        polled = 1;
        request = done_requests.head;
    }

    if (!request) {
        return NULL;
    }

    debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
    done_requests.head = request->next;

    if (!done_requests.head)
        done_requests.tailp = &done_requests.head;

    resultp = request->resultp;

    cancelled = request->cancelled;

    squidaio_debug(request);

    debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);

    squidaio_cleanup_request(request);

    if (cancelled)
        goto AIO_REPOLL;

    return resultp;
} /* squidaio_poll_done */

int
squidaio_operations_pending(void)
{
    return request_queue_len + (done_requests.head ? 1 : 0);
}

int
squidaio_sync(void)
{
    /* XXX This might take a while if the queue is large.. */
    do {
        squidaio_poll_queues();
    } while (request_queue_len > 0);

    return squidaio_operations_pending();
}

int
squidaio_get_queue_len(void)
{
    return request_queue_len;
}

static void
squidaio_debug(squidaio_request_t * request)
{
    switch (request->request_type) {

    case _AIO_OP_OPEN:
        debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
        break;

    case _AIO_OP_READ:
        debugs(43, 5, "READ on fd: " << request->fd);
        break;

    case _AIO_OP_WRITE:
        debugs(43, 5, "WRITE on fd: " << request->fd);
        break;

    case _AIO_OP_CLOSE:
        debugs(43, 5, "CLOSE of fd: " << request->fd);
        break;

    case _AIO_OP_UNLINK:
        debugs(43, 5, "UNLINK of " << request->path);
        break;

    default:
        break;
    }
}

void
squidaio_stats(StoreEntry * sentry)
{
    squidaio_thread_t *threadp;
    int i;

    if (!squidaio_initialised)
        return;

    storeAppendPrintf(sentry, "\n\nThreads Status:\n");
    storeAppendPrintf(sentry, "#\tID\t# Requests\n");

    threadp = threads;

    for (i = 0; i < NUMTHREADS; ++i) {
        storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
        threadp = threadp->next;
    }
}