DiskdIOStrategy.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 79 Squid-side DISKD I/O functions. */
10 
11 #include "squid.h"
12 #include "comm/Loops.h"
13 #include "ConfigOption.h"
14 #include "diomsg.h"
15 #include "DiskdFile.h"
16 #include "DiskdIOStrategy.h"
17 #include "DiskIO/DiskFile.h"
18 #include "fd.h"
19 #include "SquidConfig.h"
20 #include "SquidIpc.h"
21 #include "SquidTime.h"
22 #include "StatCounters.h"
23 #include "Store.h"
24 #include "unlinkd.h"
25 
26 #include <cerrno>
27 #if HAVE_SYS_IPC_H
28 #include <sys/ipc.h>
29 #endif
30 #if HAVE_SYS_MSG_H
31 #include <sys/msg.h>
32 #endif
33 #if HAVE_SYS_SHM_H
34 #include <sys/shm.h>
35 #endif
36 
38 
40 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
41 
/*
 * Pre-increments nextInstanceID and returns the new value.
 * NOTE(review): the declarator line between the return type and the opening
 * brace is absent from this listing. The constructor initialises instanceID
 * from newInstance(), so this is presumably DiskdIOStrategy::newInstance()
 * -- confirm against the real file.
 */
42 size_t
44 {
45  return ++nextInstanceID;
46 }
47 
/*
 * Reports whether new work should be refused: true once more than magic1
 * requests are outstanding ("away"), false otherwise.
 * NOTE(review): the declarator line is absent from this listing; newFile()
 * and unlinkFile() call shedLoad(), so this is presumably
 * DiskdIOStrategy::shedLoad() -- confirm.
 */
48 bool
50 {
51  /*
52  * Fail on open() if there are too many requests queued.
53  */
54 
55  if (away > magic1) {
56  debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
57 
58  return true;
59  }
60 
61  return false;
62 }
63 
/*
 * Returns the number of outstanding requests scaled against magic2, where a
 * result of 1000 means exactly magic2 requests are away.
 * NOTE(review): the declarator line is absent from this listing; presumably
 * DiskdIOStrategy::load() -- confirm.
 */
64 int
66 {
67  /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
68  /* the parse function guarantees magic2 is positive */
/* NOTE(review): this is an integer division by magic2; confirm the Q2 parser really rejects 0. */
69  return away * 1000 / magic2;
70 }
71 
/*
 * Counts one open that was refused because of queue length.
 * NOTE(review): the declarator line is absent from this listing; newFile()
 * calls openFailed() right after shedLoad() says yes, so this is presumably
 * DiskdIOStrategy::openFailed() -- confirm.
 */
72 void
74 {
75  ++diskd_stats.open_fail_queue_len;
76 }
77 
/*
 * Creates a DiskdFile for path that is bound to this strategy.
 * When shedLoad() reports overload the failure is counted via openFailed()
 * and NULL is returned instead of a file object.
 * NOTE(review): the return-type line that should precede the declarator is
 * absent from this listing -- restore it from the real file.
 */
79 DiskdIOStrategy::newFile(char const *path)
80 {
81  if (shedLoad()) {
82  openFailed();
83  return NULL;
84  }
85 
86  return new DiskdFile (path, this);
87 }
88 
89 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
90 {}
91 
/*
 * Unconditionally answers true.
 * NOTE(review): the declarator line is absent from this listing; given its
 * position just before unlinkFile() this is presumably
 * DiskdIOStrategy::unlinkdUseful() const -- confirm against the real file.
 */
92 bool
94 {
95  return true;
96 }
97 
/*
 * Removes the file at path.
 * Under load (shedLoad() true) the request is handed to unlinkdUnlink() and
 * nothing is queued. Otherwise the path is copied into a shared memory block
 * and an _MQD_UNLINK message referring to that block is sent; if sending
 * fails the file is unlinked directly from this process.
 * diskd_stats.unlink.ops is bumped only on the queued path.
 */
98 void
99 DiskdIOStrategy::unlinkFile(char const *path)
100 {
101  if (shedLoad()) {
102  /* Damn, we need to issue a sync unlink here :( */
103  debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
104  unlinkdUnlink(path);
105  return;
106  }
107 
108  /* We can attempt a diskd unlink */
109  int x;
110 
111  ssize_t shm_offset;
112 
113  char *buf;
114 
/* get() hands back a free block and stores its offset for the message */
115  buf = (char *)shm.get(&shm_offset);
116 
117  xstrncpy(buf, path, SHMBUF_BLKSZ);
118 
/*
 * NOTE(review): one argument line of this call (between the two lines
 * below) is absent from this listing -- restore it from the real file.
 */
119  x = send(_MQD_UNLINK,
120  0,
122  0,
123  0,
124  shm_offset);
125 
126  if (x < 0) {
127  int xerrno = errno;
128  debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerr(xerrno));
/* buf still holds the path copied above */
129  ::unlink(buf); /* XXX EWW! */
/* the put stays disabled: SEND() already releases the block when msgsnd() fails */
130  // shm.put (shm_offset);
131  }
132 
133  ++diskd_stats.unlink.ops;
134 }
135 
/*
 * Sets up the IPC channels for one diskd helper and starts it:
 *  - derives a base key from the process id and instanceID,
 *  - creates two message queues (keys ikey and ikey + 1),
 *  - initialises the shared memory pool via shm.init(ikey, magic2),
 *  - launches the helper through ipcCreate(), passing the three keys
 *    (ikey, ikey + 1, ikey + 2) as decimal strings.
 * Any failure is fatal.
 * NOTE(review): several lines are absent from this listing: the declarator,
 * one ipcCreate() argument, and a run of lines after fd_note(). Presumably
 * this is DiskdIOStrategy::init() -- restore the gaps from the real file.
 */
136 void
138 {
139  int pid;
140  void * hIpc;
141  int rfd;
142  int ikey;
143  const char *args[5];
144  char skey1[32];
145  char skey2[32];
146  char skey3[32];
147  Ip::Address localhost;
148 
/* mix pid and instance number, then clear the sign bit so the key stays non-negative */
149  ikey = (getpid() << 10) + (instanceID << 2);
150  ikey &= 0x7fffffff;
151  smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
152 
153  if (smsgid < 0) {
154  int xerrno = errno;
155  debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
156  fatal("msgget failed");
157  }
158 
159  rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
160 
161  if (rmsgid < 0) {
162  int xerrno = errno;
163  debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
164  fatal("msgget failed");
165  }
166 
167  shm.init(ikey, magic2);
168  snprintf(skey1, 32, "%d", ikey);
169  snprintf(skey2, 32, "%d", ikey + 1);
170  snprintf(skey3, 32, "%d", ikey + 2);
171  args[0] = "diskd";
172  args[1] = skey1;
173  args[2] = skey2;
174  args[3] = skey3;
175  args[4] = NULL;
176  localhost.setLocalhost();
/* NOTE(review): the argument line following IPC_STREAM is absent from this listing. */
177  pid = ipcCreate(IPC_STREAM,
179  args,
180  "diskd",
181  localhost,
182  &rfd,
183  &wfd,
184  &hIpc);
185 
186  if (pid < 0)
187  fatalf("execl: %s", Config.Program.diskd);
188 
/* only the write side is kept; drop the read side when it is a separate descriptor */
189  if (rfd != wfd)
190  comm_close(rfd);
191 
192  fd_note(wfd, "squid -> diskd");
193 
/* NOTE(review): the remaining statements of this function are absent from this listing. */
197 }
198 
199 /*
200  * SHM manipulation routines
201  */
/*
 * Returns the shared memory block at byte offset `offset` to the pool:
 * clears its bit in inuse_map and decrements diskd_stats.shmbuf_count.
 * The offset must lie inside the segment (asserted).
 */
202 void
203 SharedMemory::put(ssize_t offset)
204 {
205  int i;
206  assert(offset >= 0);
207  assert(offset < nbufs * SHMBUF_BLKSZ);
/* block index = byte offset / block size */
208  i = offset / SHMBUF_BLKSZ;
209  assert(i < nbufs);
/* NOTE(review): one line is absent from this listing at this point -- check the real file. */
211  CBIT_CLR(inuse_map, i);
212  --diskd_stats.shmbuf_count;
213 }
214 
215 void *
216 
217 SharedMemory::get(ssize_t * shm_offset)
218 {
219  char *aBuf = NULL;
220  int i;
221 
222  for (i = 0; i < nbufs; ++i) {
223  if (CBIT_TEST(inuse_map, i))
224  continue;
225 
226  CBIT_SET(inuse_map, i);
227 
228  *shm_offset = i * SHMBUF_BLKSZ;
229 
230  aBuf = buf + (*shm_offset);
231 
232  break;
233  }
234 
235  assert(aBuf);
236  assert(aBuf >= buf);
237  assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
238  ++diskd_stats.shmbuf_count;
239 
240  if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
241  diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
242 
243  return aBuf;
244 }
245 
246 void
247 SharedMemory::init(int ikey, int magic2)
248 {
249  nbufs = (int)(magic2 * 1.3);
250  id = shmget((key_t) (ikey + 2),
251  nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
252 
253  if (id < 0) {
254  int xerrno = errno;
255  debugs(50, DBG_CRITICAL, MYNAME << "shmget: " << xstrerr(xerrno));
256  fatal("shmget failed");
257  }
258 
259  buf = (char *)shmat(id, NULL, 0);
260 
261  if (buf == (void *) -1) {
262  int xerrno = errno;
263  debugs(50, DBG_CRITICAL, MYNAME << "shmat: " << xstrerr(xerrno));
264  fatal("shmat failed");
265  }
266 
267  inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
268  diskd_stats.shmbuf_count += nbufs;
269 
270  for (int i = 0; i < nbufs; ++i) {
271  CBIT_SET(inuse_map, i);
272  put (i * SHMBUF_BLKSZ);
273  }
274 }
275 
/*
 * Accounts for a completed unlink reply: logs the path stored in the shared
 * memory block named by M->shm_offset, counts the syscall, and records
 * failure (negative status) or success in diskd_stats.
 * NOTE(review): the declarator line is absent from this listing; the
 * dispatcher calls unlinkDone(M), so this is presumably
 * DiskdIOStrategy::unlinkDone(diomsg *M) -- confirm.
 */
276 void
278 {
279  debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
280  ++statCounter.syscalls.disk.unlinks;
281 
282  if (M->status < 0)
283  ++diskd_stats.unlink.fail;
284  else
285  ++diskd_stats.unlink.success;
286 }
287 
/*
 * Dispatches one reply message M.
 * "New style" messages are passed to the DiskdFile stored in
 * M->callback_data (unlock() first, then completed(M)). Old style messages
 * are switched on M->mtype: only _MQD_UNLINK is handled (unlinkDone());
 * every other type trips an assertion.
 * NOTE(review): several lines are absent from this listing: the declarator,
 * the condition that opens the early-return block below, one line just
 * before that block's return, and one line just before the final closing
 * brace. callback() invokes this as handle(&M), so this is presumably
 * DiskdIOStrategy::handle(diomsg *M) -- restore the gaps from the real file.
 */
288 void
290 {
292  /* I.e. already closed file
293  * - say when we have a error opening after
294  * a read was already queued
295  */
296  debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
298  return;
299  }
300 
301  /* set errno passed from diskd. makes debugging more meaningful */
302  if (M->status < 0)
303  errno = -M->status;
304 
305  if (M->newstyle) {
306  DiskdFile *theFile = (DiskdFile *)M->callback_data;
307  theFile->unlock();
308  theFile->completed (M);
309  } else
310  switch (M->mtype) {
311 
312  case _MQD_OPEN:
313 
314  case _MQD_CREATE:
315 
316  case _MQD_CLOSE:
317 
318  case _MQD_READ:
319 
320  case _MQD_WRITE:
/* these message types are not expected on the old-style path */
321  assert (0);
322  break;
323 
324  case _MQD_UNLINK:
325  unlinkDone(M);
326  break;
327 
328  default:
329  assert(0);
330  break;
331  }
332 
334 }
335 
336 int
337 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
338 {
339  diomsg M;
340  M.callback_data = cbdataReference(theFile);
341  theFile->lock();
342  M.requestor = requestor;
343  M.newstyle = true;
344 
345  if (requestor)
346  requestor->lock();
347 
348  return SEND(&M, mtype, id, size, offset, shm_offset);
349 }
350 
/*
 * Queues an "old style" request (newstyle = false) and returns SEND()'s
 * result.
 * NOTE(review): one line is absent from this listing right after the
 * declaration of M. As shown, sio is never used and M.callback_data is never
 * set -- check the real file before relying on this text.
 */
351 int
352 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
353 {
354  diomsg M;
356  M.newstyle = false;
357 
358  return SEND(&M, mtype, id, size, offset, shm_offset);
359 }
360 
361 int
362 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
363 {
364  static int send_errors = 0;
365  static int last_seq_no = 0;
366  static int seq_no = 0;
367  int x;
368 
369  M->mtype = mtype;
370  M->size = size;
371  M->offset = offset;
372  M->status = -1;
373  M->shm_offset = (int) shm_offset;
374  M->id = id;
375  M->seq_no = ++seq_no;
376 
377  if (M->seq_no < last_seq_no)
378  debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
379 
380  x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
381 
382  last_seq_no = M->seq_no;
383 
384  if (0 == x) {
385  ++diskd_stats.sent_count;
386  ++away;
387  } else {
388  int xerrno = errno;
389  debugs(79, DBG_IMPORTANT, MYNAME << "msgsnd: " << xstrerr(xerrno));
391  ++send_errors;
392  assert(send_errors < 100);
393  if (shm_offset > -1)
394  shm.put(shm_offset);
395  }
396 
397  /*
398  * We have to drain the queue here if necessary. If we don't,
399  * then we can have a lot of messages in the queue (probably
400  * up to 2*magic1) and we can run out of shared memory buffers.
401  */
402  /*
403  * Note that we call Store::Root().callbackk (for all SDs), rather
404  * than callback for just this SD, so that while
405  * we're "blocking" on this SD we can also handle callbacks
406  * from other SDs that might be ready.
407  */
408 
409  struct timeval delay = {0, 1};
410 
411  while (away > magic2) {
412  select(0, NULL, NULL, NULL, &delay);
413  Store::Root().callback();
414 
415  if (delay.tv_usec < 1000000)
416  delay.tv_usec <<= 1;
417  }
418 
419  return x;
420 }
421 
/*
 * Returns an option container holding two adapters bound to this object:
 * one wired to optionQ1Parse/optionQ1Dump and one to
 * optionQ2Parse/optionQ2Dump. The const_cast is needed because the adapters
 * take a non-const object, while `this` is cast from a const pointer here.
 * NOTE(review): the declarator line and the line that declares `result` are
 * absent from this listing; presumably DiskdIOStrategy::getOptionTree() const
 * -- restore both from the real file.
 */
422 ConfigOption *
424 {
426  result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
427  result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
428  return result;
429 }
430 
431 bool
432 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
433 {
434  if (strcmp(name, "Q1") != 0)
435  return false;
436 
437  int old_magic1 = magic1;
438 
439  magic1 = atoi(value);
440 
441  if (!isaReconfig)
442  return true;
443 
444  if (old_magic1 < magic1) {
445  /*
446  * This is because shm.nbufs is computed at startup, when
447  * we call shmget(). We can't increase the Q1/Q2 parameters
448  * beyond their initial values because then we might have
449  * more "Q2 messages" than shared memory chunks, and this
450  * will cause an assertion in storeDiskdShmGet().
451  */
452  /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
453  debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
454  magic1 = old_magic1;
455  return true;
456  }
457 
458  if (old_magic1 != magic1)
459  debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
460 
461  return true;
462 }
463 
/*
 * Appends the current Q1 setting (magic1) to e in " Q1=<n>" form.
 * NOTE(review): the declarator line is absent from this listing; presumably
 * DiskdIOStrategy::optionQ1Dump(StoreEntry *e) const -- confirm.
 */
464 void
466 {
467  storeAppendPrintf(e, " Q1=%d", magic1);
468 }
469 
470 bool
471 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
472 {
473  if (strcmp(name, "Q2") != 0)
474  return false;
475 
476  int old_magic2 = magic2;
477 
478  magic2 = atoi(value);
479 
480  if (!isaReconfig)
481  return true;
482 
483  if (old_magic2 < magic2) {
484  /* See comments in Q1 function above */
485  debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
486  magic2 = old_magic2;
487  return true;
488  }
489 
490  if (old_magic2 != magic2)
491  debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
492 
493  return true;
494 }
495 
/*
 * Appends the current Q2 setting (magic2) to e in " Q2=<n>" form.
 * NOTE(review): the declarator line is absent from this listing; presumably
 * DiskdIOStrategy::optionQ2Dump(StoreEntry *e) const -- confirm.
 */
496 void
498 {
499  storeAppendPrintf(e, " Q2=%d", magic2);
500 }
501 
502 /*
503  * Sync any pending data. We just sit around and read the queue
504  * until the data has finished writing.
505  */
/*
 * Loops on callback() until no requests are away. While waiting, the number
 * of outstanding messages is logged, at most once per change of
 * squid_curtime.
 * NOTE(review): the declarator line is absent from this listing; presumably
 * DiskdIOStrategy::sync() -- confirm.
 */
506 void
508 {
/* time of the last progress message; static, so it persists across calls */
509  static time_t lastmsg = 0;
510 
511  while (away > 0) {
512  if (squid_curtime > lastmsg) {
513  debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
514  lastmsg = squid_curtime;
515  }
516 
517  callback();
518  }
519 }
520 
521 /*
522  * Handle callbacks. If we have more than magic2 requests away, we block
523  * until the queue is below magic2. Otherwise, we simply return when we
524  * don't get a message.
525  */
526 
/*
 * Reads every reply currently available on the receive queue without
 * blocking, and processes each one: counters are updated, handle() is
 * called, and the message's shared memory block (if any) is released.
 *
 * Returns 1 when at least one reply was processed or when the number of
 * requests away has reached magic2; 0 otherwise.
 * NOTE(review): the declarator line is absent from this listing; sync()
 * calls callback(), so this is presumably DiskdIOStrategy::callback()
 * -- confirm.
 */
527 int
529 {
530  diomsg M;
531  int x;
532  int retval = 0;
533 
534  if (away >= magic2) {
535  ++diskd_stats.block_queue_len;
536  retval = 1;
537  /* We might not have anything to do, but our queue
538  * is full.. */
539  }
540 
/* track the largest sent-minus-received backlog seen so far */
541  if (diskd_stats.sent_count - diskd_stats.recv_count >
542  diskd_stats.max_away) {
543  diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
544  }
545 
546  while (1) {
547 #ifdef ALWAYS_ZERO_BUFFERS
548  memset(&M, '\0', sizeof(M));
549 #endif
550 
551  x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
552 
/*
 * A negative result ends the loop; with IPC_NOWAIT that includes the
 * ordinary "no message waiting" case. A short message is reported and
 * also ends the loop (the log text says msgget, but the call is msgrcv).
 */
553  if (x < 0)
554  break;
555  else if (x != diomsg::msg_snd_rcv_sz) {
556  debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
557  break;
558  }
559 
560  ++diskd_stats.recv_count;
561  --away;
562  handle(&M);
563  retval = 1; /* Return that we've actually done some work */
564 
/* release the data block only after handle() is done with the message */
565  if (M.shm_offset > -1)
566  shm.put ((off_t) M.shm_offset);
567  }
568 
569  return retval;
570 }
571 
/*
 * Appends a "Pending operations" line with the current number of requests
 * away to sentry.
 * NOTE(review): the declarator line is absent from this listing; presumably
 * DiskdIOStrategy::statfs(StoreEntry &sentry) const -- confirm.
 */
572 void
574 {
575  storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
576 }
577 
mtyp_t mtype
Definition: diomsg.h:31
size_t size
Definition: diomsg.h:36
virtual bool shedLoad()
static const int msg_snd_rcv_sz
Definition: diomsg.h:41
void init(int ikey, int magic2)
StatCounters statCounter
Definition: StatCounters.cc:12
#define assert(EX)
Definition: assert.h:17
static size_t newInstance()
#define SHMBUF_BLKSZ
Definition: diomsg.h:30
#define cbdataReferenceDone(var)
Definition: cbdata.h:350
#define CBIT_TEST(mask, bit)
Definition: defines.h:114
void fd_note(int fd, const char *s)
Definition: fd.cc:251
void * callback_data
Definition: diomsg.h:34
void * get(ssize_t *)
Lock * requestor
Definition: diomsg.h:35
#define xcalloc
Definition: membanger.c:57
void setLocalhost()
Definition: Address.cc:248
virtual void statfs(StoreEntry &sentry) const
int i
Definition: membanger.c:49
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:62
int id
Definition: diomsg.h:32
virtual int load()
int status
Definition: diomsg.h:38
struct diskd_stats_t::@45 unlink
#define DBG_CRITICAL
Definition: Debug.h:44
static void * hIpc
Definition: IcmpSquid.cc:34
Controller & Root()
safely access controller singleton
Definition: Controller.cc:619
virtual ConfigOption * getOptionTree() const
off_t offset
Definition: diomsg.h:37
time_t squid_curtime
Definition: stub_time.cc:17
virtual void sync()
void unlinkDone(diomsg *M)
#define CBIT_SET(mask, bit)
Definition: defines.h:112
int SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
std::vector< ConfigOption * > options
Definition: ConfigOption.h:35
void fatalf(const char *fmt,...)
Definition: fatal.cc:79
SharedMemory shm
const char * xstrerr(int error)
Definition: xstrerror.cc:83
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
#define cbdataReference(var)
Definition: cbdata.h:341
struct StatCounters::@136 syscalls
#define DBG_IMPORTANT
Definition: Debug.h:45
bool newstyle
Definition: diomsg.h:39
struct StatCounters::@136::@139 disk
int shm_offset
Definition: diomsg.h:40
#define IPC_STREAM
Definition: defines.h:161
void lock() const
Definition: Lock.h:34
char * xstrncpy(char *dst, const char *src, size_t n)
Definition: xstring.cc:37
void fatal(const char *message)
Definition: fatal.cc:39
virtual void init()
void put(ssize_t)
int commSetNonBlocking(int fd)
Definition: comm.cc:1076
char * diskd
Definition: SquidConfig.h:197
virtual void unlinkFile(char const *)
int seq_no
Definition: diomsg.h:33
virtual bool unlinkdUseful() const
virtual int callback() override
called once every main loop iteration; TODO: Move to UFS code.
Definition: Controller.cc:223
void const char * buf
Definition: stub_helper.cc:16
virtual int callback()
static pid_t pid
Definition: IcmpSquid.cc:35
void optionQ2Dump(StoreEntry *e) const
bool SIGHDLR int STUB void int
Definition: stub_tools.cc:68
struct SquidConfig::@104 Program
#define MYNAME
Definition: Debug.h:160
static size_t nextInstanceID
#define CBIT_CLR(mask, bit)
Definition: defines.h:113
void unlinkdUnlink(const char *path)
Definition: unlinkd.cc:37
void completed(diomsg *)
Definition: DiskdFile.cc:208
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:412
Definition: Lock.h:25
void commUnsetFdTimeout(int fd)
clear a timeout handler by FD number
Definition: comm.cc:539
bool optionQ2Parse(char const *option, const char *value, int reconfiguring)
void optionQ1Dump(StoreEntry *e) const
int send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
diskd_stats_t diskd_stats
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:904
bool optionQ1Parse(char const *option, const char *value, int reconfiguring)
C * getRaw() const
Definition: RefCount.h:74
long mtyp_t
Definition: types.h:162
void handle(diomsg *M)
class SquidConfig Config
Definition: SquidConfig.cc:12
#define comm_close(x)
Definition: comm.h:28
#define NULL
Definition: types.h:166
int size
Definition: ModDevPoll.cc:77
void QuickPollRequired(void)
Definition: ModDevPoll.cc:444
virtual RefCount< DiskFile > newFile(char const *path)

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors