aiops_win32.cc
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43 Windows AIOPS */
10 
11 #include "squid.h"
12 #include "CommIO.h"
13 #include "DiskThreads.h"
14 #include "fd.h"
15 #include "mem/Pool.h"
16 #include "SquidConfig.h"
17 #include "SquidTime.h"
18 #include "Store.h"
19 
20 #include <cerrno>
21 #include <csignal>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <dirent.h>
25 
26 #define RIDICULOUS_LENGTH 4096
27 
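/*
 * Implementation note (summary of the code below): squidaio_init() starts
 * NUMTHREADS worker threads.  The main thread queues squidaio_request_t
 * records on request_queue; each worker pops one, performs the blocking
 * open/read/write/close/stat/unlink, and appends the finished record to
 * done_queue, which the main thread eventually drains via
 * squidaio_poll_done().  Win32 mutex and Event handles stand in for the
 * pthread mutex/condition pair used by the POSIX aiops.cc.
 */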
28 enum _squidaio_thread_status {
29  _THREAD_STARTING = 0,
30  _THREAD_WAITING,
31  _THREAD_BUSY,
32  _THREAD_FAILED,
33  _THREAD_DONE
34 };
35 typedef enum _squidaio_thread_status squidaio_thread_status;
36 
37 typedef struct squidaio_request_t {
38 
39  struct squidaio_request_t *next;
40  squidaio_request_type request_type;
41  int cancelled;
42  char *path;
43  int oflag;
44  mode_t mode;
45  int fd;
46  char *bufferp;
47  char *tmpbufp;
48  size_t buflen;
49  off_t offset;
50  int whence;
51  int ret;
52  int err;
53 
54  struct stat *tmpstatp;
55 
56  struct stat *statp;
57  squidaio_result_t *resultp;
58 } squidaio_request_t;
59 
60 typedef struct squidaio_request_queue_t {
61  HANDLE mutex;
62  HANDLE cond; /* See Event objects */
63  squidaio_request_t *volatile head;
64  squidaio_request_t *volatile *volatile tailp;
65  unsigned long requests;
66  unsigned long blocked; /* main failed to lock the queue */
67 } squidaio_request_queue_t;
68 
69 typedef struct squidaio_thread_t squidaio_thread_t;
70 
71 struct squidaio_thread_t {
72  squidaio_thread_t *next;
73  HANDLE thread;
74  DWORD dwThreadId; /* thread ID */
75  squidaio_thread_status status;
76 
77  struct squidaio_request_t *current_req;
78  unsigned long requests;
79  int volatile exit;
80 };
81 
82 static void squidaio_queue_request(squidaio_request_t *);
83 static void squidaio_cleanup_request(squidaio_request_t *);
84 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
85 static void squidaio_do_open(squidaio_request_t *);
86 static void squidaio_do_read(squidaio_request_t *);
87 static void squidaio_do_write(squidaio_request_t *);
88 static void squidaio_do_close(squidaio_request_t *);
89 static void squidaio_do_stat(squidaio_request_t *);
90 static void squidaio_do_unlink(squidaio_request_t *);
91 #if AIO_OPENDIR
92 static void *squidaio_do_opendir(squidaio_request_t *);
93 #endif
94 static void squidaio_debug(squidaio_request_t *);
95 static void squidaio_poll_queues(void);
96 
97 static squidaio_thread_t *threads = NULL;
98 static int squidaio_initialised = 0;
99 
100 #define AIO_LARGE_BUFS 16384
101 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
102 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
103 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
104 #define AIO_MICRO_BUFS 128
105 
106 static MemAllocator *squidaio_large_bufs = NULL; /* 16K */
107 static MemAllocator *squidaio_medium_bufs = NULL; /* 8K */
108 static MemAllocator *squidaio_small_bufs = NULL; /* 4K */
109 static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
110 static MemAllocator *squidaio_micro_bufs = NULL; /* 128 bytes */
111 
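/*
 * I/O buffers up to AIO_LARGE_BUFS (16 KB) are served from the fixed-size
 * pools above, selected by squidaio_get_pool(); anything larger falls back
 * to plain xmalloc()/xfree() inside squidaio_xmalloc()/squidaio_xfree().
 */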
112 static int request_queue_len = 0;
113 static MemAllocator *squidaio_request_pool = NULL;
114 static MemAllocator *squidaio_thread_pool = NULL;
115 static squidaio_request_queue_t request_queue;
116 
117 static struct {
118  squidaio_request_t *head, **tailp;
119 }
120 
121 request_queue2 = {
122 
123  NULL, &request_queue2.head
124 };
125 static squidaio_request_queue_t done_queue;
126 
127 static struct {
128  squidaio_request_t *head, **tailp;
129 }
130 
131 done_requests = {
132 
133  NULL, &done_requests.head
134 };
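/*
 * request_queue2 and done_requests are plain, unlocked lists touched only by
 * the main thread.  request_queue2 parks new requests whenever the shared
 * request_queue mutex cannot be taken without blocking; done_requests holds
 * completed requests moved out of done_queue so squidaio_poll_done() can
 * hand them back without further locking.
 */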
135 
136 static HANDLE main_thread;
137 
138 static MemAllocator *
139 squidaio_get_pool(int size)
140 {
141  if (size <= AIO_LARGE_BUFS) {
142  if (size <= AIO_MICRO_BUFS)
143  return squidaio_micro_bufs;
144  else if (size <= AIO_TINY_BUFS)
145  return squidaio_tiny_bufs;
146  else if (size <= AIO_SMALL_BUFS)
147  return squidaio_small_bufs;
148  else if (size <= AIO_MEDIUM_BUFS)
149  return squidaio_medium_bufs;
150  else
151  return squidaio_large_bufs;
152  }
153 
154  return NULL;
155 }
156 
157 void *
158 squidaio_xmalloc(int size)
159 {
160  void *p;
161  MemAllocator *pool;
162 
163  if ((pool = squidaio_get_pool(size)) != NULL) {
164  p = pool->alloc();
165  } else
166  p = xmalloc(size);
167 
168  return p;
169 }
170 
171 static char *
172 squidaio_xstrdup(const char *str)
173 {
174  char *p;
175  int len = strlen(str) + 1;
176 
177  p = (char *)squidaio_xmalloc(len);
178  strncpy(p, str, len);
179 
180  return p;
181 }
182 
183 void
184 squidaio_xfree(void *p, int size)
185 {
186  MemAllocator *pool;
187 
188  if ((pool = squidaio_get_pool(size)) != NULL) {
189  pool->freeOne(p);
190  } else
191  xfree(p);
192 }
193 
194 static void
195 squidaio_xstrfree(char *str)
196 {
197  MemAllocator *pool;
198  int len = strlen(str) + 1;
199 
200  if ((pool = squidaio_get_pool(len)) != NULL) {
201  pool->freeOne(str);
202  } else
203  xfree(str);
204 }
205 
206 void
207 squidaio_init(void)
208 {
209  int i;
210  squidaio_thread_t *threadp;
211 
212  if (squidaio_initialised)
213  return;
214 
215  if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
216  GetCurrentThread(), /* pseudo handle to copy */
217  GetCurrentProcess(), /* pseudo handle, don't close */
218  &main_thread,
219  0, /* required access */
220  FALSE, /* child processes don't inherit the handle */
221  DUPLICATE_SAME_ACCESS)) {
222  /* spit errors */
223  fatal("Couldn't get current thread handle");
224  }
225 
226  /* Initialize request queue */
227  if ((request_queue.mutex = CreateMutex(NULL, /* no inheritance */
228  FALSE, /* start unowned (as per mutex_init) */
229  NULL) /* no name */
230  ) == NULL) {
231  fatal("Failed to create mutex");
232  }
233 
234  if ((request_queue.cond = CreateEvent(NULL, /* no inheritance */
235  FALSE, /* auto signal reset - which I think is pthreads-like? */
236  FALSE, /* start non signaled */
237  NULL) /* no name */
238  ) == NULL) {
239  fatal("Failed to create condition variable");
240  }
241 
242  request_queue.head = NULL;
243 
244  request_queue.tailp = &request_queue.head;
245 
246  request_queue.requests = 0;
247 
248  request_queue.blocked = 0;
249 
250  /* Initialize done queue */
251 
252  if ((done_queue.mutex = CreateMutex(NULL, /* no inheritance */
253  FALSE, /* start unowned (as per mutex_init) */
254  NULL) /* no name */
255  ) == NULL) {
256  fatal("Failed to create mutex");
257  }
258 
259  if ((done_queue.cond = CreateEvent(NULL, /* no inheritance */
260  TRUE, /* manually signaled - which I think is pthreads-like? */
261  FALSE, /* start non signaled */
262  NULL) /* no name */
263  ) == NULL) {
264  fatal("Failed to create condition variable");
265  }
266 
267  done_queue.head = NULL;
268 
269  done_queue.tailp = &done_queue.head;
270 
271  done_queue.requests = 0;
272 
273  done_queue.blocked = 0;
274 
275  // Initialize the thread I/O pipes before creating any threads
276  // see bug 3189 comment 5 about race conditions.
277  CommIO::Initialize();
278 
279  /* Create threads and get them to sit in their wait loop */
280  squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
281 
281 
282  assert(NUMTHREADS > 0);
283 
284  for (i = 0; i < NUMTHREADS; ++i) {
285  threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
286  threadp->status = _THREAD_STARTING;
287  threadp->current_req = NULL;
288  threadp->requests = 0;
289  threadp->next = threads;
290  threads = threadp;
291 
292  if ((threadp->thread = CreateThread(NULL, /* no security attributes */
293  0, /* use default stack size */
294  squidaio_thread_loop, /* thread function */
295  threadp, /* argument to thread function */
296  0, /* use default creation flags */
297  &(threadp->dwThreadId)) /* returns the thread identifier */
298  ) == NULL) {
299  fprintf(stderr, "Thread creation failed\n");
300  threadp->status = _THREAD_FAILED;
301  continue;
302  }
303 
304  /* Set the new thread priority above parent process */
305  SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
306  }
307 
308  /* Create request pool */
309  squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
310 
311  squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
312 
313  squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
314 
315  squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
316 
317  squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
318 
319  squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
320 
321  squidaio_initialised = 1;
322 }
323 
324 void
325 squidaio_shutdown(void)
326 {
327  squidaio_thread_t *threadp;
328  int i;
329  HANDLE * hthreads;
330 
331  if (!squidaio_initialised)
332  return;
333 
334  /* This is the same as in squidaio_sync */
335  do {
336  squidaio_poll_queues();
337  } while (request_queue_len > 0);
338 
339  hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
340 
341  threadp = threads;
342 
343  for (i = 0; i < NUMTHREADS; ++i) {
344  threadp->exit = 1;
345  hthreads[i] = threadp->thread;
346  threadp = threadp->next;
347  }
348 
349  ReleaseMutex(request_queue.mutex);
350  ResetEvent(request_queue.cond);
351  ReleaseMutex(done_queue.mutex);
352  ResetEvent(done_queue.cond);
353  Sleep(0);
354 
355  WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
356 
357  for (i = 0; i < NUMTHREADS; ++i) {
358  CloseHandle(hthreads[i]);
359  }
360 
361  CloseHandle(main_thread);
362  CommIO::NotifyIOClose();
363 
364  squidaio_initialised = 0;
365  xfree(hthreads);
366 }
367 
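/*
 * Worker thread body.  Each thread duplicates the request_queue.cond Event
 * handle, then loops: take the queue mutex, release it and sleep on the
 * Event while the queue is empty, pop the head request, run the blocking
 * primitive matching its request_type, and append the finished request to
 * done_queue.
 */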
368 static DWORD WINAPI
369 squidaio_thread_loop(LPVOID lpParam)
370 {
371  squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
372  squidaio_request_t *request;
373  HANDLE cond; /* local duplicate of request_queue.cond; Win32 event handles
374  * don't atomically release the mutex the way condition variables do. */
375 
376  /* lock the thread info */
377 
378  if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
379  fatal("Can't get ownership of mutex\n");
380  }
381 
382  /* duplicate the handle */
383  if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
384  request_queue.cond, /* handle to copy */
385  GetCurrentProcess(), /* pseudo handle, don't close */
386  &cond,
387  0, /* required access */
388  FALSE, /* child processes don't inherit the handle */
389  DUPLICATE_SAME_ACCESS))
390  fatal("Can't duplicate mutex handle\n");
391 
392  if (!ReleaseMutex(request_queue.mutex)) {
393  CloseHandle(cond);
394  fatal("Can't release mutex\n");
395  }
396 
397  Sleep(0);
398 
399  while (1) {
400  DWORD rv;
401  threadp->current_req = request = NULL;
402  request = NULL;
403  /* Get a request to process */
404  threadp->status = _THREAD_WAITING;
405 
406  if (threadp->exit) {
407  CloseHandle(request_queue.mutex);
408  CloseHandle(cond);
409  return 0;
410  }
411 
412  rv = WaitForSingleObject(request_queue.mutex, INFINITE);
413 
414  if (rv == WAIT_FAILED) {
415  CloseHandle(cond);
416  return 1;
417  }
418 
419  while (!request_queue.head) {
420  if (!ReleaseMutex(request_queue.mutex)) {
421  CloseHandle(cond);
422  threadp->status = _THREAD_FAILED;
423  return 1;
424  }
425 
426  Sleep(0);
427  rv = WaitForSingleObject(cond, INFINITE);
428 
429  if (rv == WAIT_FAILED) {
430  CloseHandle(cond);
431  return 1;
432  }
433 
434  rv = WaitForSingleObject(request_queue.mutex, INFINITE);
435 
436  if (rv == WAIT_FAILED) {
437  CloseHandle(cond);
438  return 1;
439  }
440  }
441 
442  request = request_queue.head;
443 
444  if (request)
445  request_queue.head = request->next;
446 
447  if (!request_queue.head)
448  request_queue.tailp = &request_queue.head;
449 
450  if (!ReleaseMutex(request_queue.mutex)) {
451  CloseHandle(cond);
452  return 1;
453  }
454 
455  Sleep(0);
456 
457  /* process the request */
458  threadp->status = _THREAD_BUSY;
459 
460  request->next = NULL;
461 
462  threadp->current_req = request;
463 
464  errno = 0;
465 
466  if (!request->cancelled) {
467  switch (request->request_type) {
468 
469  case _AIO_OP_OPEN:
470  squidaio_do_open(request);
471  break;
472 
473  case _AIO_OP_READ:
474  squidaio_do_read(request);
475  break;
476 
477  case _AIO_OP_WRITE:
478  squidaio_do_write(request);
479  break;
480 
481  case _AIO_OP_CLOSE:
482  squidaio_do_close(request);
483  break;
484 
485  case _AIO_OP_UNLINK:
486  squidaio_do_unlink(request);
487  break;
488 
489 #if AIO_OPENDIR /* Opendir not implemented yet */
490 
491  case _AIO_OP_OPENDIR:
492  squidaio_do_opendir(request);
493  break;
494 #endif
495 
496  case _AIO_OP_STAT:
497  squidaio_do_stat(request);
498  break;
499 
500  default:
501  request->ret = -1;
502  request->err = EINVAL;
503  break;
504  }
505  } else { /* cancelled */
506  request->ret = -1;
507  request->err = EINTR;
508  }
509 
510  threadp->status = _THREAD_DONE;
511  /* put the request in the done queue */
512  rv = WaitForSingleObject(done_queue.mutex, INFINITE);
513 
514  if (rv == WAIT_FAILED) {
515  CloseHandle(cond);
516  return 1;
517  }
518 
519  *done_queue.tailp = request;
520  done_queue.tailp = &request->next;
521 
522  if (!ReleaseMutex(done_queue.mutex)) {
523  CloseHandle(cond);
524  return 1;
525  }
526 
527  CommIO::NotifyIOCompleted();
528  Sleep(0);
529  ++ threadp->requests;
530  } /* while forever */
531 
532  CloseHandle(cond);
533 
534  return 0;
535 } /* squidaio_thread_loop */
536 
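/*
 * Called from the main thread.  Tries a non-blocking grab of the
 * request_queue mutex: on success any requests parked on request_queue2 are
 * spliced in first, the new request is appended and the Event is signalled;
 * on contention the request is parked on request_queue2 instead.  The rest
 * of the function only emits congestion and overload warnings and, past
 * RIDICULOUS_LENGTH, forces a blocking squidaio_sync().
 */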
537 static void
538 squidaio_queue_request(squidaio_request_t * request)
539 {
540  static int high_start = 0;
541  debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
542  /* Mark it as not executed (failing result, no error) */
543  request->ret = -1;
544  request->err = 0;
545  /* Internal housekeeping */
546  request_queue_len += 1;
547  request->resultp->_data = request;
548  /* Play some tricks with the request_queue2 queue */
549  request->next = NULL;
550 
551  if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
552  if (request_queue2.head) {
553  /* Grab blocked requests */
554  *request_queue.tailp = request_queue2.head;
555  request_queue.tailp = request_queue2.tailp;
556  }
557 
558  /* Enqueue request */
559  *request_queue.tailp = request;
560 
561  request_queue.tailp = &request->next;
562 
563  if (!SetEvent(request_queue.cond))
564  fatal("Couldn't push queue");
565 
566  if (!ReleaseMutex(request_queue.mutex)) {
567  /* unexpected error */
568  fatal("Couldn't push queue");
569  }
570 
571  Sleep(0);
572 
573  if (request_queue2.head) {
574  /* Clear queue of blocked requests */
575  request_queue2.head = NULL;
576  request_queue2.tailp = &request_queue2.head;
577  }
578  } else {
579  /* Oops, the request queue is blocked, use request_queue2 */
580  *request_queue2.tailp = request;
581  request_queue2.tailp = &request->next;
582  }
583 
584  if (request_queue2.head) {
585  static uint64_t filter = 0;
586  static uint64_t filter_limit = 8196;
587 
588  if (++filter >= filter_limit) {
589  filter_limit += filter;
590  filter = 0;
591  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion (growing to " << filter_limit << ")");
592  }
593  }
594 
595  /* Warn if out of threads */
596  if (request_queue_len > MAGIC1) {
597  static int last_warn = 0;
598  static int queue_high, queue_low;
599 
600  if (high_start == 0) {
601  high_start = (int)squid_curtime;
602  queue_high = request_queue_len;
603  queue_low = request_queue_len;
604  }
605 
606  if (request_queue_len > queue_high)
607  queue_high = request_queue_len;
608 
609  if (request_queue_len < queue_low)
610  queue_low = request_queue_len;
611 
612  if (squid_curtime >= (last_warn + 15) &&
613  squid_curtime >= (high_start + 5)) {
614  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");
615 
616  if (squid_curtime >= (high_start + 15))
617  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
618  request_queue_len << ", high=" << queue_high <<
619  ", low=" << queue_low << ", duration=" <<
620  (long int) (squid_curtime - high_start));
621 
622  last_warn = (int)squid_curtime;
623  }
624  } else {
625  high_start = 0;
626  }
627 
628  /* Warn if seriously overloaded */
629  if (request_queue_len > RIDICULOUS_LENGTH) {
630  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
631  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
632  squidaio_sync();
633  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
634  }
635 } /* squidaio_queue_request */
636 
637 static void
638 squidaio_cleanup_request(squidaio_request_t * requestp)
639 {
640  squidaio_result_t *resultp = requestp->resultp;
641  int cancelled = requestp->cancelled;
642 
643  /* Free allocated structures and copy data back to user space if the */
644  /* request hasn't been cancelled */
645 
646  switch (requestp->request_type) {
647 
648  case _AIO_OP_STAT:
649 
650  if (!cancelled && requestp->ret == 0)
651  memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
652 
653  squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
654 
655  squidaio_xstrfree(requestp->path);
656 
657  break;
658 
659  case _AIO_OP_OPEN:
660  if (cancelled && requestp->ret >= 0)
661  /* The open() was cancelled but completed */
662  close(requestp->ret);
663 
664  squidaio_xstrfree(requestp->path);
665 
666  break;
667 
668  case _AIO_OP_CLOSE:
669  if (cancelled && requestp->ret < 0)
670  /* The close() was cancelled and never got executed */
671  close(requestp->fd);
672 
673  break;
674 
675  case _AIO_OP_UNLINK:
676 
677  case _AIO_OP_OPENDIR:
678  squidaio_xstrfree(requestp->path);
679 
680  break;
681 
682  case _AIO_OP_READ:
683  break;
684 
685  case _AIO_OP_WRITE:
686  break;
687 
688  default:
689  break;
690  }
691 
692  if (resultp != NULL && !cancelled) {
693  resultp->aio_return = requestp->ret;
694  resultp->aio_errno = requestp->err;
695  }
696 
697  squidaio_request_pool->freeOne(requestp);
698 } /* squidaio_cleanup_request */
699 
700 int
701 squidaio_cancel(squidaio_result_t * resultp)
702 {
703  squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
704 
705  if (request && request->resultp == resultp) {
706  debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
707  request->cancelled = 1;
708  request->resultp = NULL;
709  resultp->_data = NULL;
710  resultp->result_type = _AIO_OP_NONE;
711  return 0;
712  }
713 
714  return 1;
715 } /* squidaio_cancel */
716 
717 int
718 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
719 {
720  squidaio_init();
721  squidaio_request_t *requestp;
722 
723  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
724 
725  requestp->path = (char *) squidaio_xstrdup(path);
726 
727  requestp->oflag = oflag;
728 
729  requestp->mode = mode;
730 
731  requestp->resultp = resultp;
732 
733  requestp->request_type = _AIO_OP_OPEN;
734 
735  requestp->cancelled = 0;
736 
737  resultp->result_type = _AIO_OP_OPEN;
738 
739  squidaio_queue_request(requestp);
740 
741  return 0;
742 }
743 
744 static void
745 squidaio_do_open(squidaio_request_t * requestp)
746 {
747  requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
748  requestp->err = errno;
749 }
750 
751 int
752 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
753 {
754  squidaio_request_t *requestp;
755 
756  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
757 
758  requestp->fd = fd;
759 
760  requestp->bufferp = bufp;
761 
762  requestp->buflen = bufs;
763 
764  requestp->offset = offset;
765 
766  requestp->whence = whence;
767 
768  requestp->resultp = resultp;
769 
770  requestp->request_type = _AIO_OP_READ;
771 
772  requestp->cancelled = 0;
773 
774  resultp->result_type = _AIO_OP_READ;
775 
776  squidaio_queue_request(requestp);
777 
778  return 0;
779 }
780 
781 static void
782 squidaio_do_read(squidaio_request_t * requestp)
783 {
784  lseek(requestp->fd, requestp->offset, requestp->whence);
785 
786  if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
787  requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
788  WIN32_maperror(GetLastError());
789  requestp->ret = -1;
790  }
791 
792  requestp->err = errno;
793 }
794 
795 int
796 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
797 {
798  squidaio_request_t *requestp;
799 
800  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
801 
802  requestp->fd = fd;
803 
804  requestp->bufferp = bufp;
805 
806  requestp->buflen = bufs;
807 
808  requestp->offset = offset;
809 
810  requestp->whence = whence;
811 
812  requestp->resultp = resultp;
813 
814  requestp->request_type = _AIO_OP_WRITE;
815 
816  requestp->cancelled = 0;
817 
818  resultp->result_type = _AIO_OP_WRITE;
819 
820  squidaio_queue_request(requestp);
821 
822  return 0;
823 }
824 
825 static void
826 squidaio_do_write(squidaio_request_t * requestp)
827 {
828  if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
829  requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
830  WIN32_maperror(GetLastError());
831  requestp->ret = -1;
832  }
833 
834  requestp->err = errno;
835 }
836 
837 int
838 squidaio_close(int fd, squidaio_result_t * resultp)
839 {
840  squidaio_request_t *requestp;
841 
842  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
843 
844  requestp->fd = fd;
845 
846  requestp->resultp = resultp;
847 
848  requestp->request_type = _AIO_OP_CLOSE;
849 
850  requestp->cancelled = 0;
851 
852  resultp->result_type = _AIO_OP_CLOSE;
853 
854  squidaio_queue_request(requestp);
855 
856  return 0;
857 }
858 
859 static void
860 squidaio_do_close(squidaio_request_t * requestp)
861 {
862  if ((requestp->ret = close(requestp->fd)) < 0) {
863  debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
864  close(requestp->fd);
865  }
866 
867  requestp->err = errno;
868 }
869 
870 int
871 
872 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
873 {
874  squidaio_init();
875  squidaio_request_t *requestp;
876 
877  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
878 
879  requestp->path = (char *) squidaio_xstrdup(path);
880 
881  requestp->statp = sb;
882 
883  requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
884 
885  requestp->resultp = resultp;
886 
887  requestp->request_type = _AIO_OP_STAT;
888 
889  requestp->cancelled = 0;
890 
891  resultp->result_type = _AIO_OP_STAT;
892 
893  squidaio_queue_request(requestp);
894 
895  return 0;
896 }
897 
898 static void
899 squidaio_do_stat(squidaio_request_t * requestp)
900 {
901  requestp->ret = stat(requestp->path, requestp->tmpstatp);
902  requestp->err = errno;
903 }
904 
905 int
906 squidaio_unlink(const char *path, squidaio_result_t * resultp)
907 {
908  squidaio_init();
909  squidaio_request_t *requestp;
910 
911  requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
912 
913  requestp->path = squidaio_xstrdup(path);
914 
915  requestp->resultp = resultp;
916 
917  requestp->request_type = _AIO_OP_UNLINK;
918 
919  requestp->cancelled = 0;
920 
921  resultp->result_type = _AIO_OP_UNLINK;
922 
923  squidaio_queue_request(requestp);
924 
925  return 0;
926 }
927 
928 static void
929 squidaio_do_unlink(squidaio_request_t * requestp)
930 {
931  requestp->ret = unlink(requestp->path);
932  requestp->err = errno;
933 }
934 
935 #if AIO_OPENDIR
936 /* XXX squidaio_opendir NOT implemented yet.. */
937 
938 int
939 squidaio_opendir(const char *path, squidaio_result_t * resultp)
940 {
941  squidaio_request_t *requestp;
942  int len;
943 
944  requestp = squidaio_request_pool->alloc();
945 
946  resultp->result_type = _AIO_OP_OPENDIR;
947 
948  return -1;
949 }
950 
951 static void
952 squidaio_do_opendir(squidaio_request_t * requestp)
953 {
954  /* NOT IMPLEMENTED */
955 }
956 
957 #endif
958 
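/*
 * Called from the main thread: opportunistically flushes request_queue2
 * into request_queue (signalling the Event) and moves everything on
 * done_queue onto the private done_requests list, decrementing
 * request_queue_len once per completed request.
 */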
959 static void
960 squidaio_poll_queues(void)
961 {
962  /* kick "overflow" request queue */
963 
964  if (request_queue2.head &&
965  (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
966  *request_queue.tailp = request_queue2.head;
967  request_queue.tailp = request_queue2.tailp;
968 
969  if (!SetEvent(request_queue.cond))
970  fatal("couldn't push queue\n");
971 
972  if (!ReleaseMutex(request_queue.mutex)) {
973  /* unexpected error */
974  }
975 
976  Sleep(0);
977  request_queue2.head = NULL;
978  request_queue2.tailp = &request_queue2.head;
979  }
980 
981  /* poll done queue */
982  if (done_queue.head &&
983  (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
984 
985  struct squidaio_request_t *requests = done_queue.head;
986  done_queue.head = NULL;
987  done_queue.tailp = &done_queue.head;
988 
989  if (!ReleaseMutex(done_queue.mutex)) {
990  /* unexpected error */
991  }
992 
993  Sleep(0);
994  *done_requests.tailp = requests;
995  request_queue_len -= 1;
996 
997  while (requests->next) {
998  requests = requests->next;
999  request_queue_len -= 1;
1000  }
1001 
1002  done_requests.tailp = &requests->next;
1003  }
1004 }
1005 
1006 squidaio_result_t *
1007 squidaio_poll_done(void)
1008 {
1009  squidaio_request_t *request;
1010  squidaio_result_t *resultp;
1011  int cancelled;
1012  int polled = 0;
1013 
1014 AIO_REPOLL:
1015  request = done_requests.head;
1016 
1017  if (request == NULL && !polled) {
1018  CommIO::ResetNotifications();
1019  squidaio_poll_queues();
1020  polled = 1;
1021  request = done_requests.head;
1022  }
1023 
1024  if (!request) {
1025  return NULL;
1026  }
1027 
1028  debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1029  done_requests.head = request->next;
1030 
1031  if (!done_requests.head)
1032  done_requests.tailp = &done_requests.head;
1033 
1034  resultp = request->resultp;
1035 
1036  cancelled = request->cancelled;
1037 
1038  squidaio_debug(request);
1039 
1040  debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1041 
1042  squidaio_cleanup_request(request);
1043 
1044  if (cancelled)
1045  goto AIO_REPOLL;
1046 
1047  return resultp;
1048 } /* squidaio_poll_done */
1049 
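/*
 * Illustrative call sequence (a sketch, not taken from this file; the real
 * callers live in the DiskThreads disk I/O code): the caller owns a
 * squidaio_result_t, queues a request against it, and later reaps
 * completions from the main loop via squidaio_poll_done():
 *
 *     static struct stat sb;
 *     static squidaio_result_t result;
 *     squidaio_stat("/cache/swap.state", &sb, &result);   // example path
 *     ...
 *     squidaio_result_t *done;
 *     while ((done = squidaio_poll_done()) != NULL) {
 *         // inspect done->result_type, done->aio_return, done->aio_errno
 *     }
 */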
1050 int
1051 squidaio_operations_pending(void)
1052 {
1053  return request_queue_len + (done_requests.head ? 1 : 0);
1054 }
1055 
1056 int
1057 squidaio_sync(void)
1058 {
1059  /* XXX This might take a while if the queue is large.. */
1060 
1061  do {
1062  squidaio_poll_queues();
1063  } while (request_queue_len > 0);
1064 
1065  return squidaio_operations_pending();
1066 }
1067 
1068 int
1069 squidaio_get_queue_len(void)
1070 {
1071  return request_queue_len;
1072 }
1073 
1074 static void
1075 squidaio_debug(squidaio_request_t * request)
1076 {
1077  switch (request->request_type) {
1078 
1079  case _AIO_OP_OPEN:
1080  debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1081  break;
1082 
1083  case _AIO_OP_READ:
1084  debugs(43, 5, "READ on fd: " << request->fd);
1085  break;
1086 
1087  case _AIO_OP_WRITE:
1088  debugs(43, 5, "WRITE on fd: " << request->fd);
1089  break;
1090 
1091  case _AIO_OP_CLOSE:
1092  debugs(43, 5, "CLOSE of fd: " << request->fd);
1093  break;
1094 
1095  case _AIO_OP_UNLINK:
1096  debugs(43, 5, "UNLINK of " << request->path);
1097  break;
1098 
1099  default:
1100  break;
1101  }
1102 }
1103 
1104 void
1105 squidaio_stats(StoreEntry * sentry)
1106 {
1107  squidaio_thread_t *threadp;
1108  int i;
1109 
1110  if (!squidaio_initialised)
1111  return;
1112 
1113  storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1114 
1115  storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1116 
1117  threadp = threads;
1118 
1119  for (i = 0; i < NUMTHREADS; ++i) {
1120  storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1121  threadp = threadp->next;
1122  }
1123 }
1124 