DiskdIOStrategy.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 79 Squid-side DISKD I/O functions. */
10
11#include "squid.h"
12#include "comm.h"
13#include "comm/Loops.h"
14#include "ConfigOption.h"
15#include "diomsg.h"
16#include "DiskdFile.h"
17#include "DiskdIOStrategy.h"
18#include "DiskIO/DiskFile.h"
19#include "fd.h"
20#include "SquidConfig.h"
21#include "SquidIpc.h"
22#include "StatCounters.h"
23#include "Store.h"
24#include "unlinkd.h"
25
26#include <cerrno>
27#if HAVE_SYS_IPC_H
28#include <sys/ipc.h>
29#endif
30#if HAVE_SYS_MSG_H
31#include <sys/msg.h>
32#endif
33#if HAVE_SYS_SHM_H
34#include <sys/shm.h>
35#endif
36
38
40const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
41
42size_t
44{
45 return ++nextInstanceID;
46}
47
48bool
50{
51 /*
52 * Fail on open() if there are too many requests queued.
53 */
54
55 if (away > magic1) {
56 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
57
58 return true;
59 }
60
61 return false;
62}
63
64int
66{
67 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
68 /* the parse function guarantees magic2 is positive */
69 return away * 1000 / magic2;
70}
71
72void
74{
76}
77
79DiskdIOStrategy::newFile(char const *path)
80{
81 if (shedLoad()) {
82 openFailed();
83 return nullptr;
84 }
85
86 return new DiskdFile (path, this);
87}
88
89DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
90{}
91
92bool
94{
95 return true;
96}
97
98void
100{
101 if (shedLoad()) {
102 /* Damn, we need to issue a sync unlink here :( */
103 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
104 unlinkdUnlink(path);
105 return;
106 }
107
108 /* We can attempt a diskd unlink */
109 int x;
110
111 ssize_t shm_offset;
112
113 char *buf;
114
115 buf = (char *)shm.get(&shm_offset);
116
117 xstrncpy(buf, path, SHMBUF_BLKSZ);
118
119 x = send(_MQD_UNLINK,
120 0,
121 (StoreIOState::Pointer )nullptr,
122 0,
123 0,
124 shm_offset);
125
126 if (x < 0) {
127 int xerrno = errno;
128 debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerr(xerrno));
129 ::unlink(buf); /* XXX EWW! */
130 // shm.put (shm_offset);
131 }
132
134}
135
136void
138{
139 int pid;
140 void * hIpc;
141 int rfd;
142 int ikey;
143 const char *args[5];
144 char skey1[32];
145 char skey2[32];
146 char skey3[32];
147 Ip::Address localhost;
148
149 ikey = (getpid() << 10) + (instanceID << 2);
150 ikey &= 0x7fffffff;
151 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
152
153 if (smsgid < 0) {
154 int xerrno = errno;
155 debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
156 fatal("msgget failed");
157 }
158
159 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
160
161 if (rmsgid < 0) {
162 int xerrno = errno;
163 debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
164 fatal("msgget failed");
165 }
166
167 shm.init(ikey, magic2);
168 snprintf(skey1, 32, "%d", ikey);
169 snprintf(skey2, 32, "%d", ikey + 1);
170 snprintf(skey3, 32, "%d", ikey + 2);
171 args[0] = "diskd";
172 args[1] = skey1;
173 args[2] = skey2;
174 args[3] = skey3;
175 args[4] = nullptr;
176 localhost.setLocalhost();
179 args,
180 "diskd",
181 localhost,
182 &rfd,
183 &wfd,
184 &hIpc);
185
186 if (pid < 0)
187 fatalf("execl: %s", Config.Program.diskd);
188
189 if (rfd != wfd)
190 comm_close(rfd);
191
192 fd_note(wfd, "squid -> diskd");
193
197}
198
199/*
200 * SHM manipulation routines
201 */
202void
203SharedMemory::put(ssize_t offset)
204{
205 int i;
206 assert(offset >= 0);
207 assert(offset < nbufs * SHMBUF_BLKSZ);
208 i = offset / SHMBUF_BLKSZ;
209 assert(i < nbufs);
213}
214
215void *
216
217SharedMemory::get(ssize_t * shm_offset)
218{
219 char *aBuf = nullptr;
220 int i;
221
222 for (i = 0; i < nbufs; ++i) {
223 if (CBIT_TEST(inuse_map, i))
224 continue;
225
227
228 *shm_offset = i * SHMBUF_BLKSZ;
229
230 aBuf = buf + (*shm_offset);
231
232 break;
233 }
234
235 assert(aBuf);
236 assert(aBuf >= buf);
237 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
239
242
243 return aBuf;
244}
245
246void
247SharedMemory::init(int ikey, int magic2)
248{
249 nbufs = (int)(magic2 * 1.3);
250 id = shmget((key_t) (ikey + 2),
251 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
252
253 if (id < 0) {
254 int xerrno = errno;
255 debugs(50, DBG_CRITICAL, MYNAME << "shmget: " << xstrerr(xerrno));
256 fatal("shmget failed");
257 }
258
259 buf = (char *)shmat(id, nullptr, 0);
260
261 if (buf == (void *) -1) {
262 int xerrno = errno;
263 debugs(50, DBG_CRITICAL, MYNAME << "shmat: " << xstrerr(xerrno));
264 fatal("shmat failed");
265 }
266
267 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
269
270 for (int i = 0; i < nbufs; ++i) {
272 put (i * SHMBUF_BLKSZ);
273 }
274}
275
276void
278{
279 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
281
282 if (M->status < 0)
284 else
286}
287
288void
290{
292 /* I.e. already closed file
293 * - say when we have a error opening after
294 * a read was already queued
295 */
296 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
298 return;
299 }
300
301 /* set errno passed from diskd. makes debugging more meaningful */
302 if (M->status < 0)
303 errno = -M->status;
304
305 if (M->newstyle) {
306 DiskdFile *theFile = (DiskdFile *)M->callback_data;
307 theFile->unlock();
308 theFile->completed (M);
309 } else
310 switch (M->mtype) {
311
312 case _MQD_OPEN:
313
314 case _MQD_CREATE:
315
316 case _MQD_CLOSE:
317
318 case _MQD_READ:
319
320 case _MQD_WRITE:
321 assert (0);
322 break;
323
324 case _MQD_UNLINK:
325 unlinkDone(M);
326 break;
327
328 default:
329 assert(0);
330 break;
331 }
332
334}
335
336int
337DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
338{
339 diomsg M;
340 M.callback_data = cbdataReference(theFile);
341 theFile->lock();
342 M.requestor = requestor;
343 M.newstyle = true;
344
345 if (requestor)
346 requestor->lock();
347
348 return SEND(&M, mtype, id, size, offset, shm_offset);
349}
350
351int
352DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
353{
354 diomsg M;
356 M.newstyle = false;
357
358 return SEND(&M, mtype, id, size, offset, shm_offset);
359}
360
361int
362DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
363{
364 static int send_errors = 0;
365 static int last_seq_no = 0;
366 static int seq_no = 0;
367 int x;
368
369 M->mtype = mtype;
370 M->size = size;
371 M->offset = offset;
372 M->status = -1;
373 M->shm_offset = (int) shm_offset;
374 M->id = id;
375 M->seq_no = ++seq_no;
376
377 if (M->seq_no < last_seq_no)
378 debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
379
380 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
381
382 last_seq_no = M->seq_no;
383
384 if (0 == x) {
386 ++away;
387 } else {
388 int xerrno = errno;
389 debugs(79, DBG_IMPORTANT, MYNAME << "msgsnd: " << xstrerr(xerrno));
391 ++send_errors;
392 assert(send_errors < 100);
393 if (shm_offset > -1)
394 shm.put(shm_offset);
395 }
396
397 /*
398 * We have to drain the queue here if necessary. If we don't,
399 * then we can have a lot of messages in the queue (probably
400 * up to 2*magic1) and we can run out of shared memory buffers.
401 */
402 /*
403 * Note that we call Store::Root().callback() (for all SDs), rather
404 * than callback for just this SD, so that while
405 * we're "blocking" on this SD we can also handle callbacks
406 * from other SDs that might be ready.
407 */
408
409 struct timeval delay = {0, 1};
410
411 while (away > magic2) {
412 select(0, nullptr, nullptr, nullptr, &delay);
414
415 if (delay.tv_usec < 1000000)
416 delay.tv_usec <<= 1;
417 }
418
419 return x;
420}
421
424{
428 return result;
429}
430
431bool
432DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
433{
434 if (strcmp(name, "Q1") != 0)
435 return false;
436
437 int old_magic1 = magic1;
438
439 magic1 = atoi(value);
440
441 if (!isaReconfig)
442 return true;
443
444 if (old_magic1 < magic1) {
445 /*
446 * This is because shm.nbufs is computed at startup, when
447 * we call shmget(). We can't increase the Q1/Q2 parameters
448 * beyond their initial values because then we might have
449 * more "Q2 messages" than shared memory chunks, and this
450 * will cause an assertion in storeDiskdShmGet().
451 */
452 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
453 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
454 magic1 = old_magic1;
455 return true;
456 }
457
458 if (old_magic1 != magic1)
459 debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
460
461 return true;
462}
463
464void
466{
467 storeAppendPrintf(e, " Q1=%d", magic1);
468}
469
470bool
471DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
472{
473 if (strcmp(name, "Q2") != 0)
474 return false;
475
476 int old_magic2 = magic2;
477
478 magic2 = atoi(value);
479
480 if (!isaReconfig)
481 return true;
482
483 if (old_magic2 < magic2) {
484 /* See comments in Q1 function above */
485 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
486 magic2 = old_magic2;
487 return true;
488 }
489
490 if (old_magic2 != magic2)
491 debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
492
493 return true;
494}
495
496void
498{
499 storeAppendPrintf(e, " Q2=%d", magic2);
500}
501
502/*
503 * Sync any pending data. We just sit around and read the queue
504 * until the data has finished writing.
505 */
506void
508{
509 static time_t lastmsg = 0;
510
511 while (away > 0) {
512 if (squid_curtime > lastmsg) {
513 debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
514 lastmsg = squid_curtime;
515 }
516
517 callback();
518 }
519}
520
521/*
522 * Handle callbacks. If we have more than magic2 requests away, we block
523 * until the queue is below magic2. Otherwise, we simply return when we
524 * don't get a message.
525 */
526
527int
529{
530 diomsg M;
531 int x;
532 int retval = 0;
533
534 if (away >= magic2) {
536 retval = 1;
537 /* We might not have anything to do, but our queue
538 * is full.. */
539 }
540
544 }
545
546 while (1) {
547 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
548
549 if (x < 0)
550 break;
551 else if (x != diomsg::msg_snd_rcv_sz) {
552 debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
553 break;
554 }
555
557 --away;
558 handle(&M);
559 retval = 1; /* Return that we've actually done some work */
560
561 if (M.shm_offset > -1)
562 shm.put ((off_t) M.shm_offset);
563 }
564
565 return retval;
566}
567
568void
570{
571 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
572}
573
static void * hIpc
Definition: IcmpSquid.cc:33
static pid_t pid
Definition: IcmpSquid.cc:34
int size
Definition: ModDevPoll.cc:75
time_t squid_curtime
Definition: stub_libtime.cc:20
class SquidConfig Config
Definition: SquidConfig.cc:12
StatCounters statCounter
Definition: StatCounters.cc:12
#define assert(EX)
Definition: assert.h:17
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:265
#define cbdataReferenceDone(var)
Definition: cbdata.h:352
#define cbdataReference(var)
Definition: cbdata.h:343
std::vector< ConfigOption * > options
Definition: ConfigOption.h:74
void completed(diomsg *)
Definition: DiskdFile.cc:208
bool optionQ1Parse(char const *option, const char *value, int reconfiguring)
int SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
void init() override
void unlinkDone(diomsg *M)
ConfigOption * getOptionTree() const override
void handle(diomsg *M)
void sync() override
void optionQ2Dump(StoreEntry *e) const
static size_t newInstance()
int load() override
bool optionQ2Parse(char const *option, const char *value, int reconfiguring)
bool shedLoad() override
void optionQ1Dump(StoreEntry *e) const
int send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
int callback() override
void statfs(StoreEntry &sentry) const override
SharedMemory shm
RefCount< DiskFile > newFile(char const *path) override
static size_t nextInstanceID
void unlinkFile(char const *) override
bool unlinkdUseful() const override
void setLocalhost()
Definition: Address.cc:255
Definition: Lock.h:26
void lock() const
Definition: Lock.h:34
C * getRaw() const
Definition: RefCount.h:89
void * get(ssize_t *)
void put(ssize_t)
void init(int ikey, int magic2)
struct SquidConfig::@99 Program
char * diskd
Definition: SquidConfig.h:208
struct StatCounters::@130 syscalls
struct StatCounters::@130::@134 disk
int callback() override
called once every main loop iteration; TODO: Move to UFS code.
Definition: Controller.cc:229
int commSetNonBlocking(int fd)
Definition: comm.cc:1066
void commUnsetFdTimeout(int fd)
clear a timeout handler by FD number
Definition: comm.cc:582
#define comm_close(x)
Definition: comm.h:27
#define MYNAME
Definition: Stream.h:236
#define DBG_IMPORTANT
Definition: Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
#define DBG_CRITICAL
Definition: Stream.h:37
#define IPC_STREAM
Definition: defines.h:106
#define CBIT_SET(mask, bit)
Definition: defines.h:74
#define CBIT_CLR(mask, bit)
Definition: defines.h:75
#define CBIT_TEST(mask, bit)
Definition: defines.h:76
@ _MQD_UNLINK
Definition: diomsg.h:25
@ _MQD_CREATE
Definition: diomsg.h:21
@ _MQD_WRITE
Definition: diomsg.h:24
@ _MQD_CLOSE
Definition: diomsg.h:22
@ _MQD_OPEN
Definition: diomsg.h:20
@ _MQD_READ
Definition: diomsg.h:23
void fatal(const char *message)
Definition: fatal.cc:28
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
void fd_note(int fd, const char *s)
Definition: fd.cc:216
#define SHMBUF_BLKSZ
diskd_stats_t diskd_stats
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:65
void QuickPollRequired(void)
Definition: ModDevPoll.cc:417
Controller & Root()
safely access controller singleton
Definition: Controller.cc:938
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:841
Definition: diomsg.h:30
mtyp_t mtype
Definition: diomsg.h:31
int seq_no
Definition: diomsg.h:33
off_t offset
Definition: diomsg.h:37
int status
Definition: diomsg.h:38
size_t size
Definition: diomsg.h:36
Lock * requestor
Definition: diomsg.h:35
static const int msg_snd_rcv_sz
Definition: diomsg.h:41
bool newstyle
Definition: diomsg.h:39
int shm_offset
Definition: diomsg.h:40
void * callback_data
Definition: diomsg.h:34
int id
Definition: diomsg.h:32
struct diskd_stats_t::@42 unlink
int unsigned int
Definition: stub_fd.cc:19
long mtyp_t
Definition: types.h:141
void unlinkdUnlink(const char *path)
Definition: unlinkd.cc:39
void * xcalloc(size_t n, size_t sz)
Definition: xalloc.cc:71
const char * xstrerr(int error)
Definition: xstrerror.cc:83
char * xstrncpy(char *dst, const char *src, size_t n)
Definition: xstring.cc:37

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors