aiops_win32.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43 Windows AIOPS */
10 
11 #include "squid.h"
12 #include "compat/win32_maperror.h"
14 #include "DiskThreads.h"
15 #include "fd.h"
16 #include "mem/Allocator.h"
17 #include "mem/Pool.h"
18 #include "SquidConfig.h"
19 #include "Store.h"
20 
21 #include <cerrno>
22 #include <csignal>
23 #include <sys/stat.h>
24 #include <fcntl.h>
25 #include <dirent.h>
26 
27 #define RIDICULOUS_LENGTH 4096
28 
35 };
37 
38 typedef struct squidaio_request_t {
39 
40  struct squidaio_request_t *next;
42  int cancelled;
43  char *path;
44  int oflag;
45  mode_t mode;
46  int fd;
47  char *bufferp;
48  char *tmpbufp;
49  size_t buflen;
50  off_t offset;
51  int whence;
52  int ret;
53  int err;
54 
55  struct stat *tmpstatp;
56 
57  struct stat *statp;
60 
61 typedef struct squidaio_request_queue_t {
62  HANDLE mutex;
63  HANDLE cond; /* See Event objects */
64  squidaio_request_t *volatile head;
65  squidaio_request_t *volatile *volatile tailp;
66  unsigned long requests;
67  unsigned long blocked; /* main failed to lock the queue */
69 
71 
72 struct squidaio_thread_t {
74  HANDLE thread;
75  DWORD dwThreadId; /* thread ID */
77 
79  unsigned long requests;
80  int volatile exit;
81 };
82 
85 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
92 #if AIO_OPENDIR
93 static void *squidaio_do_opendir(squidaio_request_t *);
94 #endif
95 static void squidaio_debug(squidaio_request_t *);
96 static void squidaio_poll_queues(void);
97 
98 static squidaio_thread_t *threads = nullptr;
99 static int squidaio_initialised = 0;
100 
101 #define AIO_LARGE_BUFS 16384
102 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
103 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
104 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
105 #define AIO_MICRO_BUFS 128
106 
107 static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
108 static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
109 static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
110 static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
111 static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128K */
112 
113 static size_t request_queue_len = 0;
117 
118 static struct {
120 }
121 
122 request_queue2 = {
123 
124  nullptr, &request_queue2.head
125 };
127 
128 static struct {
130 }
131 
132 done_requests = {
133 
134  nullptr, &done_requests.head
135 };
136 
137 static HANDLE main_thread;
138 
139 static Mem::Allocator *
141 {
142  if (size <= AIO_LARGE_BUFS) {
143  if (size <= AIO_MICRO_BUFS)
144  return squidaio_micro_bufs;
145  else if (size <= AIO_TINY_BUFS)
146  return squidaio_tiny_bufs;
147  else if (size <= AIO_SMALL_BUFS)
148  return squidaio_small_bufs;
149  else if (size <= AIO_MEDIUM_BUFS)
150  return squidaio_medium_bufs;
151  else
152  return squidaio_large_bufs;
153  }
154 
155  return nullptr;
156 }
157 
158 void *
160 {
161  void *p;
162  if (const auto pool = squidaio_get_pool(size)) {
163  p = pool->alloc();
164  } else
165  p = xmalloc(size);
166 
167  return p;
168 }
169 
170 static char *
171 squidaio_xstrdup(const char *str)
172 {
173  char *p;
174  int len = strlen(str) + 1;
175 
176  p = (char *)squidaio_xmalloc(len);
177  strncpy(p, str, len);
178 
179  return p;
180 }
181 
182 void
183 squidaio_xfree(void *p, int size)
184 {
185  if (const auto pool = squidaio_get_pool(size)) {
186  pool->freeOne(p);
187  } else
188  xfree(p);
189 }
190 
191 static void
193 {
194  int len = strlen(str) + 1;
195 
196  if (const auto pool = squidaio_get_pool(len)) {
197  pool->freeOne(str);
198  } else
199  xfree(str);
200 }
201 
202 void
204 {
205  squidaio_thread_t *threadp;
206 
208  return;
209 
210  if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
211  GetCurrentThread(), /* pseudo handle to copy */
212  GetCurrentProcess(), /* pseudo handle, don't close */
213  &main_thread,
214  0, /* required access */
215  FALSE, /* child process's don't inherit the handle */
216  DUPLICATE_SAME_ACCESS)) {
217  /* spit errors */
218  fatal("Couldn't get current thread handle");
219  }
220 
221  /* Initialize request queue */
222  if ((request_queue.mutex = CreateMutex(nullptr, /* no inheritance */
223  FALSE, /* start unowned (as per mutex_init) */
224  nullptr) /* no name */
225  ) == NULL) {
226  fatal("Failed to create mutex");
227  }
228 
229  if ((request_queue.cond = CreateEvent(nullptr, /* no inheritance */
230  FALSE, /* auto signal reset - which I think is pthreads like ? */
231  FALSE, /* start non signaled */
232  nullptr) /* no name */
233  ) == NULL) {
234  fatal("Failed to create condition variable");
235  }
236 
237  request_queue.head = nullptr;
238 
240 
242 
244 
245  /* Initialize done queue */
246 
247  if ((done_queue.mutex = CreateMutex(nullptr, /* no inheritance */
248  FALSE, /* start unowned (as per mutex_init) */
249  nullptr) /* no name */
250  ) == NULL) {
251  fatal("Failed to create mutex");
252  }
253 
254  if ((done_queue.cond = CreateEvent(nullptr, /* no inheritance */
255  TRUE, /* manually signaled - which I think is pthreads like ? */
256  FALSE, /* start non signaled */
257  nullptr) /* no name */
258  ) == NULL) {
259  fatal("Failed to create condition variable");
260  }
261 
262  done_queue.head = nullptr;
263 
265 
266  done_queue.requests = 0;
267 
268  done_queue.blocked = 0;
269 
270  // Initialize the thread I/O pipes before creating any threads
271  // see bug 3189 comment 5 about race conditions.
273 
274  /* Create threads and get them to sit in their wait loop */
275  squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
276 
277  assert(NUMTHREADS > 0);
278 
279  for (size_t i = 0; i < NUMTHREADS; ++i) {
281  threadp->status = _THREAD_STARTING;
282  threadp->current_req = nullptr;
283  threadp->requests = 0;
284  threadp->next = threads;
285  threads = threadp;
286 
287  if ((threadp->thread = CreateThread(nullptr, /* no security attributes */
288  0, /* use default stack size */
289  squidaio_thread_loop, /* thread function */
290  threadp, /* argument to thread function */
291  0, /* use default creation flags */
292  &(threadp->dwThreadId)) /* returns the thread identifier */
293  ) == NULL) {
294  fprintf(stderr, "Thread creation failed\n");
295  threadp->status = _THREAD_FAILED;
296  continue;
297  }
298 
299  /* Set the new thread priority above parent process */
300  SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
301  }
302 
303  /* Create request pool */
304  squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
305 
306  squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
307 
308  squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
309 
310  squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
311 
312  squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
313 
314  squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
315 
317 }
318 
319 void
321 {
322  squidaio_thread_t *threadp;
323  HANDLE * hthreads;
324 
326  return;
327 
328  /* This is the same as in squidaio_sync */
329  do {
331  } while (request_queue_len > 0);
332 
333  hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
334 
335  threadp = threads;
336 
337  for (size_t i = 0; i < NUMTHREADS; ++i) {
338  threadp->exit = 1;
339  hthreads[i] = threadp->thread;
340  threadp = threadp->next;
341  }
342 
343  ReleaseMutex(request_queue.mutex);
344  ResetEvent(request_queue.cond);
345  ReleaseMutex(done_queue.mutex);
346  ResetEvent(done_queue.cond);
347  Sleep(0);
348 
349  WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
350 
351  for (size_t i = 0; i < NUMTHREADS; ++i) {
352  CloseHandle(hthreads[i]);
353  }
354 
355  CloseHandle(main_thread);
357 
359  xfree(hthreads);
360 }
361 
362 static DWORD WINAPI
363 squidaio_thread_loop(LPVOID lpParam)
364 {
365  squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
366  squidaio_request_t *request;
367  HANDLE cond; /* local copy of the event queue because win32 event handles
368  * don't atomically release the mutex as cond variables do. */
369 
370  /* lock the thread info */
371 
372  if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
373  fatal("Can't get ownership of mutex\n");
374  }
375 
376  /* duplicate the handle */
377  if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
378  request_queue.cond, /* handle to copy */
379  GetCurrentProcess(), /* pseudo handle, don't close */
380  &cond,
381  0, /* required access */
382  FALSE, /* child process's don't inherit the handle */
383  DUPLICATE_SAME_ACCESS))
384  fatal("Can't duplicate mutex handle\n");
385 
386  if (!ReleaseMutex(request_queue.mutex)) {
387  CloseHandle(cond);
388  fatal("Can't release mutex\n");
389  }
390 
391  Sleep(0);
392 
393  while (1) {
394  DWORD rv;
395  threadp->current_req = request = nullptr;
396  request = nullptr;
397  /* Get a request to process */
398  threadp->status = _THREAD_WAITING;
399 
400  if (threadp->exit) {
401  CloseHandle(request_queue.mutex);
402  CloseHandle(cond);
403  return 0;
404  }
405 
406  rv = WaitForSingleObject(request_queue.mutex, INFINITE);
407 
408  if (rv == WAIT_FAILED) {
409  CloseHandle(cond);
410  return 1;
411  }
412 
413  while (!request_queue.head) {
414  if (!ReleaseMutex(request_queue.mutex)) {
415  CloseHandle(cond);
416  threadp->status = _THREAD_FAILED;
417  return 1;
418  }
419 
420  Sleep(0);
421  rv = WaitForSingleObject(cond, INFINITE);
422 
423  if (rv == WAIT_FAILED) {
424  CloseHandle(cond);
425  return 1;
426  }
427 
428  rv = WaitForSingleObject(request_queue.mutex, INFINITE);
429 
430  if (rv == WAIT_FAILED) {
431  CloseHandle(cond);
432  return 1;
433  }
434  }
435 
436  request = request_queue.head;
437 
438  if (request)
439  request_queue.head = request->next;
440 
441  if (!request_queue.head)
443 
444  if (!ReleaseMutex(request_queue.mutex)) {
445  CloseHandle(cond);
446  return 1;
447  }
448 
449  Sleep(0);
450 
451  /* process the request */
452  threadp->status = _THREAD_BUSY;
453 
454  request->next = nullptr;
455 
456  threadp->current_req = request;
457 
458  errno = 0;
459 
460  if (!request->cancelled) {
461  switch (request->request_type) {
462 
463  case _AIO_OP_OPEN:
464  squidaio_do_open(request);
465  break;
466 
467  case _AIO_OP_READ:
468  squidaio_do_read(request);
469  break;
470 
471  case _AIO_OP_WRITE:
472  squidaio_do_write(request);
473  break;
474 
475  case _AIO_OP_CLOSE:
476  squidaio_do_close(request);
477  break;
478 
479  case _AIO_OP_UNLINK:
480  squidaio_do_unlink(request);
481  break;
482 
483 #if AIO_OPENDIR /* Opendir not implemented yet */
484 
485  case _AIO_OP_OPENDIR:
486  squidaio_do_opendir(request);
487  break;
488 #endif
489 
490  case _AIO_OP_STAT:
491  squidaio_do_stat(request);
492  break;
493 
494  default:
495  request->ret = -1;
496  request->err = EINVAL;
497  break;
498  }
499  } else { /* cancelled */
500  request->ret = -1;
501  request->err = EINTR;
502  }
503 
504  threadp->status = _THREAD_DONE;
505  /* put the request in the done queue */
506  rv = WaitForSingleObject(done_queue.mutex, INFINITE);
507 
508  if (rv == WAIT_FAILED) {
509  CloseHandle(cond);
510  return 1;
511  }
512 
513  *done_queue.tailp = request;
514  done_queue.tailp = &request->next;
515 
516  if (!ReleaseMutex(done_queue.mutex)) {
517  CloseHandle(cond);
518  return 1;
519  }
520 
522  Sleep(0);
523  ++ threadp->requests;
524  } /* while forever */
525 
526  CloseHandle(cond);
527 
528  return 0;
529 } /* squidaio_thread_loop */
530 
531 static void
533 {
534  static int high_start = 0;
535  debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
536  /* Mark it as not executed (failing result, no error) */
537  request->ret = -1;
538  request->err = 0;
539  /* Internal housekeeping */
540  request_queue_len += 1;
541  request->resultp->_data = request;
542  /* Play some tricks with the request_queue2 queue */
543  request->next = nullptr;
544 
545  if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
546  if (request_queue2.head) {
547  /* Grab blocked requests */
550  }
551 
552  /* Enqueue request */
553  *request_queue.tailp = request;
554 
555  request_queue.tailp = &request->next;
556 
557  if (!SetEvent(request_queue.cond))
558  fatal("Couldn't push queue");
559 
560  if (!ReleaseMutex(request_queue.mutex)) {
561  /* unexpected error */
562  fatal("Couldn't push queue");
563  }
564 
565  Sleep(0);
566 
567  if (request_queue2.head) {
568  /* Clear queue of blocked requests */
569  request_queue2.head = nullptr;
570  request_queue2.tailp = &request_queue2.head;
571  }
572  } else {
573  /* Oops, the request queue is blocked, use request_queue2 */
574  *request_queue2.tailp = request;
575  request_queue2.tailp = &request->next;
576  }
577 
578  if (request_queue2.head) {
579  static uint64_t filter = 0;
580  static uint64_t filter_limit = 8196;
581 
582  if (++filter >= filter_limit) {
583  filter_limit += filter;
584  filter = 0;
585  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
586  }
587  }
588 
589  /* Warn if out of threads */
590  if (request_queue_len > MAGIC1) {
591  static int last_warn = 0;
592  static size_t queue_high, queue_low;
593 
594  if (high_start == 0) {
595  high_start = (int)squid_curtime;
596  queue_high = request_queue_len;
597  queue_low = request_queue_len;
598  }
599 
600  if (request_queue_len > queue_high)
601  queue_high = request_queue_len;
602 
603  if (request_queue_len < queue_low)
604  queue_low = request_queue_len;
605 
606  if (squid_curtime >= (last_warn + 15) &&
607  squid_curtime >= (high_start + 5)) {
608  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
609 
610  if (squid_curtime >= (high_start + 15))
611  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
612  request_queue_len << ", high=" << queue_high <<
613  ", low=" << queue_low << ", duration=" <<
614  (long int) (squid_curtime - high_start));
615 
616  last_warn = (int)squid_curtime;
617  }
618  } else {
619  high_start = 0;
620  }
621 
622  /* Warn if seriously overloaded */
624  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
625  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
626  squidaio_sync();
627  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
628  }
629 } /* squidaio_queue_request */
630 
631 static void
633 {
634  squidaio_result_t *resultp = requestp->resultp;
635  int cancelled = requestp->cancelled;
636 
637  /* Free allocated structures and copy data back to user space if the */
638  /* request hasn't been cancelled */
639 
640  switch (requestp->request_type) {
641 
642  case _AIO_OP_STAT:
643 
644  if (!cancelled && requestp->ret == 0)
645  memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
646 
647  squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
648 
649  squidaio_xstrfree(requestp->path);
650 
651  break;
652 
653  case _AIO_OP_OPEN:
654  if (cancelled && requestp->ret >= 0)
655  /* The open() was cancelled but completed */
656  close(requestp->ret);
657 
658  squidaio_xstrfree(requestp->path);
659 
660  break;
661 
662  case _AIO_OP_CLOSE:
663  if (cancelled && requestp->ret < 0)
664  /* The close() was cancelled and never got executed */
665  close(requestp->fd);
666 
667  break;
668 
669  case _AIO_OP_UNLINK:
670 
671  case _AIO_OP_OPENDIR:
672  squidaio_xstrfree(requestp->path);
673 
674  break;
675 
676  case _AIO_OP_READ:
677  break;
678 
679  case _AIO_OP_WRITE:
680  break;
681 
682  default:
683  break;
684  }
685 
686  if (resultp != NULL && !cancelled) {
687  resultp->aio_return = requestp->ret;
688  resultp->aio_errno = requestp->err;
689  }
690 
691  squidaio_request_pool->freeOne(requestp);
692 } /* squidaio_cleanup_request */
693 
694 int
696 {
697  squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
698 
699  if (request && request->resultp == resultp) {
700  debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
701  request->cancelled = 1;
702  request->resultp = nullptr;
703  resultp->_data = nullptr;
704  resultp->result_type = _AIO_OP_NONE;
705  return 0;
706  }
707 
708  return 1;
709 } /* squidaio_cancel */
710 
711 int
712 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
713 {
714  squidaio_init();
715  squidaio_request_t *requestp;
716 
718 
719  requestp->path = (char *) squidaio_xstrdup(path);
720 
721  requestp->oflag = oflag;
722 
723  requestp->mode = mode;
724 
725  requestp->resultp = resultp;
726 
727  requestp->request_type = _AIO_OP_OPEN;
728 
729  requestp->cancelled = 0;
730 
731  resultp->result_type = _AIO_OP_OPEN;
732 
733  squidaio_queue_request(requestp);
734 
735  return 0;
736 }
737 
738 static void
740 {
741  requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
742  requestp->err = errno;
743 }
744 
745 int
746 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
747 {
748  squidaio_request_t *requestp;
749 
751 
752  requestp->fd = fd;
753 
754  requestp->bufferp = bufp;
755 
756  requestp->buflen = bufs;
757 
758  requestp->offset = offset;
759 
760  requestp->whence = whence;
761 
762  requestp->resultp = resultp;
763 
764  requestp->request_type = _AIO_OP_READ;
765 
766  requestp->cancelled = 0;
767 
768  resultp->result_type = _AIO_OP_READ;
769 
770  squidaio_queue_request(requestp);
771 
772  return 0;
773 }
774 
775 static void
777 {
778  lseek(requestp->fd, requestp->offset, requestp->whence);
779 
780  if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
781  requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
782  WIN32_maperror(GetLastError());
783  requestp->ret = -1;
784  }
785 
786  requestp->err = errno;
787 }
788 
789 int
790 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
791 {
792  squidaio_request_t *requestp;
793 
795 
796  requestp->fd = fd;
797 
798  requestp->bufferp = bufp;
799 
800  requestp->buflen = bufs;
801 
802  requestp->offset = offset;
803 
804  requestp->whence = whence;
805 
806  requestp->resultp = resultp;
807 
808  requestp->request_type = _AIO_OP_WRITE;
809 
810  requestp->cancelled = 0;
811 
812  resultp->result_type = _AIO_OP_WRITE;
813 
814  squidaio_queue_request(requestp);
815 
816  return 0;
817 }
818 
819 static void
821 {
822  if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
823  requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
824  WIN32_maperror(GetLastError());
825  requestp->ret = -1;
826  }
827 
828  requestp->err = errno;
829 }
830 
831 int
833 {
834  squidaio_request_t *requestp;
835 
837 
838  requestp->fd = fd;
839 
840  requestp->resultp = resultp;
841 
842  requestp->request_type = _AIO_OP_CLOSE;
843 
844  requestp->cancelled = 0;
845 
846  resultp->result_type = _AIO_OP_CLOSE;
847 
848  squidaio_queue_request(requestp);
849 
850  return 0;
851 }
852 
853 static void
855 {
856  if ((requestp->ret = close(requestp->fd)) < 0) {
857  debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
858  close(requestp->fd);
859  }
860 
861  requestp->err = errno;
862 }
863 
864 int
865 
866 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
867 {
868  squidaio_init();
869  squidaio_request_t *requestp;
870 
872 
873  requestp->path = (char *) squidaio_xstrdup(path);
874 
875  requestp->statp = sb;
876 
877  requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
878 
879  requestp->resultp = resultp;
880 
881  requestp->request_type = _AIO_OP_STAT;
882 
883  requestp->cancelled = 0;
884 
885  resultp->result_type = _AIO_OP_STAT;
886 
887  squidaio_queue_request(requestp);
888 
889  return 0;
890 }
891 
892 static void
894 {
895  requestp->ret = stat(requestp->path, requestp->tmpstatp);
896  requestp->err = errno;
897 }
898 
899 int
900 squidaio_unlink(const char *path, squidaio_result_t * resultp)
901 {
902  squidaio_init();
903  squidaio_request_t *requestp;
904 
906 
907  requestp->path = squidaio_xstrdup(path);
908 
909  requestp->resultp = resultp;
910 
911  requestp->request_type = _AIO_OP_UNLINK;
912 
913  requestp->cancelled = 0;
914 
915  resultp->result_type = _AIO_OP_UNLINK;
916 
917  squidaio_queue_request(requestp);
918 
919  return 0;
920 }
921 
922 static void
924 {
925  requestp->ret = unlink(requestp->path);
926  requestp->err = errno;
927 }
928 
#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

/// Stub: would queue an asynchronous opendir(3). Currently only tags the
/// result and reports failure (-1) without queueing anything.
int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;
    int len;

    requestp = squidaio_request_pool->alloc();

    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}

/// Stub worker-side opendir; intentionally does nothing.
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}

#endif
952 
953 static void
955 {
956  /* kick "overflow" request queue */
957 
958  if (request_queue2.head &&
959  (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
962 
963  if (!SetEvent(request_queue.cond))
964  fatal("couldn't push queue\n");
965 
966  if (!ReleaseMutex(request_queue.mutex)) {
967  /* unexpected error */
968  }
969 
970  Sleep(0);
971  request_queue2.head = nullptr;
972  request_queue2.tailp = &request_queue2.head;
973  }
974 
975  /* poll done queue */
976  if (done_queue.head &&
977  (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
978 
979  struct squidaio_request_t *requests = done_queue.head;
980  done_queue.head = nullptr;
982 
983  if (!ReleaseMutex(done_queue.mutex)) {
984  /* unexpected error */
985  }
986 
987  Sleep(0);
988  *done_requests.tailp = requests;
989  request_queue_len -= 1;
990 
991  while (requests->next) {
992  requests = requests->next;
993  request_queue_len -= 1;
994  }
995 
996  done_requests.tailp = &requests->next;
997  }
998 }
999 
1002 {
1003  squidaio_request_t *request;
1005  int cancelled;
1006  int polled = 0;
1007 
1008 AIO_REPOLL:
1009  request = done_requests.head;
1010 
1011  if (request == NULL && !polled) {
1014  polled = 1;
1015  request = done_requests.head;
1016  }
1017 
1018  if (!request) {
1019  return nullptr;
1020  }
1021 
1022  debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1023  done_requests.head = request->next;
1024 
1025  if (!done_requests.head)
1026  done_requests.tailp = &done_requests.head;
1027 
1028  resultp = request->resultp;
1029 
1030  cancelled = request->cancelled;
1031 
1032  squidaio_debug(request);
1033 
1034  debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1035 
1036  squidaio_cleanup_request(request);
1037 
1038  if (cancelled)
1039  goto AIO_REPOLL;
1040 
1041  return resultp;
1042 } /* squidaio_poll_done */
1043 
1044 int
1046 {
1047  return request_queue_len + (done_requests.head ? 1 : 0);
1048 }
1049 
1050 int
1052 {
1053  /* XXX This might take a while if the queue is large.. */
1054 
1055  do {
1057  } while (request_queue_len > 0);
1058 
1059  return squidaio_operations_pending();
1060 }
1061 
1062 int
1064 {
1065  return request_queue_len;
1066 }
1067 
1068 static void
1070 {
1071  switch (request->request_type) {
1072 
1073  case _AIO_OP_OPEN:
1074  debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1075  break;
1076 
1077  case _AIO_OP_READ:
1078  debugs(43, 5, "READ on fd: " << request->fd);
1079  break;
1080 
1081  case _AIO_OP_WRITE:
1082  debugs(43, 5, "WRITE on fd: " << request->fd);
1083  break;
1084 
1085  case _AIO_OP_CLOSE:
1086  debugs(43, 5, "CLOSE of fd: " << request->fd);
1087  break;
1088 
1089  case _AIO_OP_UNLINK:
1090  debugs(43, 5, "UNLINK of " << request->path);
1091  break;
1092 
1093  default:
1094  break;
1095  }
1096 }
1097 
1098 void
1100 {
1101  squidaio_thread_t *threadp;
1102 
1103  if (!squidaio_initialised)
1104  return;
1105 
1106  storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1107 
1108  storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1109 
1110  threadp = threads;
1111 
1112  for (size_t i = 0; i < NUMTHREADS; ++i) {
1113  storeAppendPrintf(sentry, "%zu\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1114  threadp = threadp->next;
1115  }
1116 }
1117 
static HANDLE main_thread
Definition: aiops_win32.cc:137
void fatal(const char *message)
Definition: fatal.cc:28
static void squidaio_poll_queues(void)
Definition: aiops_win32.cc:954
static void squidaio_do_close(squidaio_request_t *)
Definition: aiops_win32.cc:854
static void squidaio_do_write(squidaio_request_t *)
Definition: aiops_win32.cc:820
static squidaio_request_queue_t request_queue
Definition: aiops_win32.cc:116
void * xcalloc(size_t n, size_t sz)
Definition: xalloc.cc:71
int squidaio_operations_pending(void)
#define AIO_TINY_BUFS
Definition: aiops_win32.cc:104
#define DBG_CRITICAL
Definition: Stream.h:37
#define xmalloc
static Mem::Allocator * squidaio_large_bufs
Definition: aiops_win32.cc:107
squidaio_result_t * resultp
Definition: aiops.cc:68
struct squidaio_request_t * next
Definition: aiops.cc:51
void squidaio_stats(StoreEntry *sentry)
struct squidaio_request_t squidaio_request_t
unsigned long requests
Definition: aiops.cc:88
@ _AIO_OP_OPENDIR
Definition: DiskThreads.h:52
#define FALSE
Definition: std-includes.h:56
#define AIO_SMALL_BUFS
Definition: aiops_win32.cc:103
static void squidaio_queue_request(squidaio_request_t *)
Definition: aiops_win32.cc:532
#define NUMTHREADS
Definition: DiskThreads.h:30
static void NotifyIOCompleted()
Definition: CommIO.h:36
static int squidaio_initialised
Definition: aiops_win32.cc:99
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
#define AIO_MEDIUM_BUFS
Definition: aiops_win32.cc:102
static void squidaio_debug(squidaio_request_t *)
static void squidaio_cleanup_request(squidaio_request_t *)
Definition: aiops_win32.cc:632
unsigned long requests
Definition: aiops.cc:76
void * alloc()
provide (and reserve) memory suitable for storing one object
Definition: Allocator.h:44
_squidaio_thread_status
Definition: aiops.cc:40
pthread_mutex_t mutex
Definition: aiops.cc:72
void squidaio_shutdown(void)
Definition: aiops_win32.cc:320
static char * squidaio_xstrdup(const char *str)
Definition: aiops_win32.cc:171
@ _AIO_OP_STAT
Definition: DiskThreads.h:53
static Mem::Allocator * squidaio_small_bufs
Definition: aiops_win32.cc:109
int squidaio_cancel(squidaio_result_t *resultp)
Definition: aiops_win32.cc:695
static void squidaio_do_stat(squidaio_request_t *)
Definition: aiops_win32.cc:893
enum _squidaio_request_type result_type
Definition: DiskThreads.h:64
static void ResetNotifications()
Definition: CommIO.cc:70
squidaio_request_t ** tailp
Definition: aiops_win32.cc:119
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
Definition: aiops_win32.cc:900
struct stat * statp
Definition: aiops.cc:67
@ _THREAD_DONE
Definition: aiops_win32.cc:34
int squidaio_close(int fd, squidaio_result_t *resultp)
Definition: aiops_win32.cc:832
#define AIO_LARGE_BUFS
Definition: aiops_win32.cc:101
squidaio_request_type request_type
Definition: aiops.cc:52
pthread_t thread
Definition: aiops.cc:84
static void squidaio_do_open(squidaio_request_t *)
Definition: aiops_win32.cc:739
static void squidaio_xstrfree(char *str)
Definition: aiops_win32.cc:192
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
Definition: aiops_win32.cc:866
void squidaio_init(void)
Definition: aiops_win32.cc:203
static struct @45 done_requests
@ _AIO_OP_READ
Definition: DiskThreads.h:48
static void NotifyIOClose()
Definition: CommIO.cc:39
int size
Definition: ModDevPoll.cc:69
void freeOne(void *obj)
return memory reserved by alloc()
Definition: Allocator.h:51
#define NULL
Definition: types.h:145
@ _THREAD_WAITING
Definition: aiops_win32.cc:31
static Mem::Allocator * squidaio_medium_bufs
Definition: aiops_win32.cc:108
int squidaio_sync(void)
#define MAGIC1
Definition: DiskThreads.h:34
static void squidaio_do_read(squidaio_request_t *)
Definition: aiops_win32.cc:776
static Mem::Allocator * squidaio_thread_pool
Definition: aiops_win32.cc:115
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops_win32.cc:746
static squidaio_request_queue_t done_queue
Definition: aiops_win32.cc:126
#define memPoolCreate
Creates a named MemPool of elements with the given size.
Definition: Pool.h:123
squidaio_thread_status status
Definition: aiops.cc:85
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam)
Definition: aiops_win32.cc:363
@ _THREAD_STARTING
Definition: aiops_win32.cc:30
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops.cc:47
#define assert(EX)
Definition: assert.h:17
@ _AIO_OP_CLOSE
Definition: DiskThreads.h:50
squidaio_request_t * head
Definition: aiops_win32.cc:119
static size_t request_queue_len
Definition: aiops_win32.cc:113
time_t squid_curtime
Definition: stub_libtime.cc:20
@ _THREAD_FAILED
Definition: aiops_win32.cc:33
#define xfree
enum _squidaio_request_type squidaio_request_type
Definition: DiskThreads.h:55
int squidaio_get_queue_len(void)
#define AIO_MICRO_BUFS
Definition: aiops_win32.cc:105
#define TRUE
Definition: std-includes.h:55
static Mem::Allocator * squidaio_request_pool
Definition: aiops_win32.cc:114
squidaio_thread_t * next
Definition: aiops.cc:83
squidaio_request_t *volatile head
Definition: aiops.cc:74
size_t buflen
Definition: aiops.cc:59
int volatile exit
Definition: aiops_win32.cc:80
static Mem::Allocator * squidaio_tiny_bufs
Definition: aiops_win32.cc:110
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops_win32.cc:790
static void Initialize()
Definition: CommIO.cc:20
static struct @44 request_queue2
void squidaio_xfree(void *p, int size)
Definition: aiops_win32.cc:183
squidaio_request_t *volatile *volatile tailp
Definition: aiops.cc:75
unsigned short mode_t
Definition: types.h:129
#define DBG_IMPORTANT
Definition: Stream.h:38
int squidaio_opendir(const char *, squidaio_result_t *)
@ _AIO_OP_OPEN
Definition: DiskThreads.h:47
@ _THREAD_BUSY
Definition: aiops_win32.cc:32
static Mem::Allocator * squidaio_get_pool(int size)
Definition: aiops_win32.cc:140
struct squidaio_request_t * current_req
Definition: aiops.cc:87
pthread_cond_t cond
Definition: aiops.cc:73
static void squidaio_do_unlink(squidaio_request_t *)
Definition: aiops_win32.cc:923
unsigned long blocked
Definition: aiops.cc:77
struct squidaio_request_queue_t squidaio_request_queue_t
#define RIDICULOUS_LENGTH
Definition: aiops_win32.cc:27
char * bufferp
Definition: aiops.cc:58
@ _AIO_OP_WRITE
Definition: DiskThreads.h:49
void * squidaio_xmalloc(int size)
Definition: aiops_win32.cc:159
@ _AIO_OP_UNLINK
Definition: DiskThreads.h:51
@ _AIO_OP_NONE
Definition: DiskThreads.h:46
static squidaio_thread_t * threads
Definition: aiops_win32.cc:98
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
Definition: aiops_win32.cc:712
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops_win32.cc:36
static Mem::Allocator * squidaio_micro_bufs
Definition: aiops_win32.cc:111
int unsigned int
Definition: stub_fd.cc:19
squidaio_result_t * squidaio_poll_done(void)
struct stat * tmpstatp
Definition: aiops.cc:65

 

Introduction

Documentation

Support

Miscellaneous