IpcIoFile.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 47 Store Directory Routines */
10
11#include "squid.h"
12#include "base/AsyncFunCalls.h"
13#include "base/CodeContext.h"
15#include "base/TextException.h"
16#include "DiskIO/IORequestor.h"
18#include "DiskIO/ReadRequest.h"
19#include "DiskIO/WriteRequest.h"
20#include "fd.h"
21#include "fs_io.h"
22#include "globals.h"
23#include "ipc/mem/Pages.h"
24#include "ipc/Messages.h"
25#include "ipc/Port.h"
26#include "ipc/Queue.h"
27#include "ipc/StrandCoord.h"
28#include "ipc/StrandSearch.h"
29#include "ipc/UdsOp.h"
30#include "sbuf/SBuf.h"
31#include "SquidConfig.h"
32#include "StatCounters.h"
33#include "tools.h"
34
35#include <cerrno>
36
38
40static const char *const ShmLabel = "io_file";
44// TODO: make configurable or compute from squid.conf settings if possible
45static const int QueueCapacity = 1024;
46
47const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
50std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
51
53
54static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
55static void DiskerClose(const SBuf &path);
56
58struct SipcIo {
59 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
60 worker(aWorker), msg(aMsg), disker(aDisker) {}
61
62 int worker;
63 const IpcIoMsg &msg;
64 int disker;
65};
66
67static std::ostream &
68operator <<(std::ostream &os, const SipcIo &sio)
69{
70 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
71 sio.msg.command << sio.disker;
72}
73
74/* IpcIo::Command */
75
76std::ostream &
77IpcIo::operator <<(std::ostream &os, const Command command)
78{
79 switch (command) {
80 case cmdNone:
81 return os << '-';
82 case cmdOpen:
83 return os << 'o';
84 case cmdRead:
85 return os << 'r';
86 case cmdWrite:
87 return os << 'w';
88 }
89 // unreachable code
90 return os << static_cast<int>(command);
91}
92
93/* IpcIoFile */
94
95IpcIoFile::IpcIoFile(char const *aDb):
96 dbName(aDb),
97 myPid(getpid()),
98 diskId(-1),
99 error_(false),
100 lastRequestId(0),
101 olderRequests(&requestMap1), newerRequests(&requestMap2),
102 timeoutCheckScheduled(false)
103{
104 assert(myPid >= 0);
105}
106
108{
110 if (diskId >= 0) {
111 const auto i = IpcIoFiles.find(diskId);
112 Must(i != IpcIoFiles.end());
113 Must(i->second == this);
114 IpcIoFiles.erase(i);
115 }
116 });
117}
118
119void
121{
123 config = cfg;
124}
125
126void
128{
129 ioRequestor = callback;
130 Must(diskId < 0); // we do not know our disker yet
131
132 if (!queue.get()) {
134 AsyncCall::Pointer call = asyncCall(79, 4, "IpcIoFile::HandleMessagesAtStart",
136 ScheduleCallHere(call);
137 }
138
139 if (IamDiskProcess()) {
140 error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
141 if (error_)
142 return;
143
145 const bool inserted =
146 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
147 Must(inserted);
148
149 queue->localRateLimit().store(config.ioRate);
150
152
154 return;
155 }
156
157 const Ipc::StrandSearchRequest request(dbName);
159 request.pack(msg);
161
162 WaitingForOpen.push_back(this);
163
164 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
165 this, Timeout, 0, false); // "this" pointer is used as id
166}
167
168void
170{
171 Must(diskId < 0); // we do not know our disker yet
172
173 if (!response) {
174 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
175 "channel establishment timeout");
176 error_ = true;
177 } else {
178 diskId = response->strand.kidId;
179 if (diskId >= 0) {
180 const bool inserted =
181 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
182 Must(inserted);
183 } else {
184 error_ = true;
185 debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
186 "responsibility for " << dbName);
187 }
188 }
189
191}
192
197void
199{
200 assert(false); // check
201 /* We use the same logic path for open */
202 open(flags, mode, callback);
203}
204
205void
207{
208 assert(ioRequestor != nullptr);
209
210 if (IamDiskProcess())
212 // XXX: else nothing to do?
213
215}
216
217bool
219{
220 return diskId >= 0 && !error_ && canWait();
221}
222
223bool
225{
226 return diskId >= 0 && !error_ && canWait();
227}
228
229bool
231{
232 return error_;
233}
234
235void
237{
238 debugs(79,3, "(disker" << diskId << ", " << readRequest->len << ", " <<
239 readRequest->offset << ")");
240
241 assert(ioRequestor != nullptr);
242 assert(readRequest->offset >= 0);
243 Must(!error_);
244
245 //assert(minOffset < 0 || minOffset <= readRequest->offset);
246 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
247
248 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
249 pending->readRequest = readRequest;
250 push(pending);
251}
252
253void
255 IpcIoMsg *const response)
256{
257 bool ioError = false;
258 if (!response) {
259 debugs(79, 3, "error: timeout");
260 ioError = true; // I/O timeout does not warrant setting error_?
261 } else {
262 if (response->xerrno) {
263 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
264 xstrerr(response->xerrno));
265 ioError = error_ = true;
266 } else if (!response->page) {
267 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
268 "out of shared memory pages");
269 ioError = true;
270 } else {
271 const char *const buf = Ipc::Mem::PagePointer(response->page);
272 memcpy(readRequest->buf, buf, response->len);
273 }
274
275 Ipc::Mem::PutPage(response->page);
276 }
277
278 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
279 const int errflag = ioError ? DISK_ERROR :DISK_OK;
280 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
281}
282
283void
285{
286 debugs(79,3, "(disker" << diskId << ", " << writeRequest->len << ", " <<
287 writeRequest->offset << ")");
288
289 assert(ioRequestor != nullptr);
290 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
291 assert(writeRequest->offset >= 0);
292 Must(!error_);
293
294 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
295 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
296
297 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
298 pending->writeRequest = writeRequest;
299 push(pending);
300}
301
302void
304 const IpcIoMsg *const response)
305{
306 bool ioError = false;
307 if (!response) {
308 debugs(79, 3, "disker " << diskId << " timeout");
309 ioError = true; // I/O timeout does not warrant setting error_?
310 } else if (response->xerrno) {
311 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
312 " error writing " << writeRequest->len << " bytes at " <<
313 writeRequest->offset << ": " << xstrerr(response->xerrno) <<
314 "; this worker will stop using " << dbName);
315 ioError = error_ = true;
316 } else if (response->len != writeRequest->len) {
317 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
318 response->len << " instead of " << writeRequest->len <<
319 " bytes (offset " << writeRequest->offset << "); " <<
320 "this worker will stop using " << dbName);
321 error_ = true;
322 }
323
324 if (writeRequest->free_func)
325 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
326
327 if (!ioError) {
328 debugs(79,5, "wrote " << writeRequest->len << " to disker" <<
329 diskId << " at " << writeRequest->offset);
330 }
331
332 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
333 const int errflag = ioError ? DISK_ERROR :DISK_OK;
334 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
335}
336
337bool
339{
340 return !olderRequests->empty() || !newerRequests->empty();
341}
342
344void
345IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
346{
347 const std::pair<RequestMap::iterator,bool> result =
348 newerRequests->insert(std::make_pair(id, pending));
349 Must(result.second); // failures means that id was not unique
352}
353
355void
357{
358 // prevent queue overflows: check for responses to earlier requests
359 // warning: this call may result in indirect push() recursion
360 CallService(nullptr, [] {
361 HandleResponses("before push");
362 });
363
364 debugs(47, 7, MYNAME);
365 Must(diskId >= 0);
366 Must(pending);
367 Must(pending->readRequest || pending->writeRequest);
368
369 IpcIoMsg ipcIo;
370 try {
371 if (++lastRequestId == 0) // don't use zero value as requestId
373 ipcIo.requestId = lastRequestId;
374 ipcIo.start = current_time;
375 ipcIo.workerPid = myPid;
376 if (pending->readRequest) {
377 ipcIo.command = IpcIo::cmdRead;
378 ipcIo.offset = pending->readRequest->offset;
379 ipcIo.len = pending->readRequest->len;
380 } else { // pending->writeRequest
381 Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
383 ipcIo.len = 0;
384 throw TexcHere("run out of shared memory pages for IPC I/O");
385 }
386 ipcIo.command = IpcIo::cmdWrite;
387 ipcIo.offset = pending->writeRequest->offset;
388 ipcIo.len = pending->writeRequest->len;
389 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
390 memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
391 }
392
393 debugs(47, 7, "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
394
395 // protect DiskerHandleRequest() from pop queue overflow
398
399 if (queue->push(diskId, ipcIo))
400 Notify(diskId); // must notify disker
401 trackPendingRequest(ipcIo.requestId, pending);
402 } catch (const Queue::Full &) {
403 debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
404 dbName << " overflow: " <<
405 SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
406 // TODO: grow queue size
407 if (ipcIo.page)
408 Ipc::Mem::PutPage(ipcIo.page);
409
410 pending->completeIo(nullptr);
411 delete pending;
412 } catch (const TextException &e) {
413 debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
414 pending->completeIo(nullptr);
415 delete pending;
416 }
417}
418
420bool
422{
423 if (!config.ioTimeout)
424 return true; // no timeout specified
425
426 IpcIoMsg oldestIo;
427 if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
428 return true; // we cannot estimate expected wait time; assume it is OK
429
430 const int oldestWait = tvSubMsec(oldestIo.start, current_time);
431
432 int rateWait = -1; // time in millisecons
433 const int ioRate = queue->rateLimit(diskId).load();
434 if (ioRate > 0) {
435 // if there are N requests pending, the new one will wait at
436 // least N/max-swap-rate seconds
437 rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
438 // adjust N/max-swap-rate value based on the queue "balance"
439 // member, in case we have been borrowing time against future
440 // I/O already
441 rateWait += queue->balance(diskId);
442 }
443
444 const int expectedWait = max(oldestWait, rateWait);
445 if (expectedWait < 0 ||
446 static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
447 return true; // expected wait time is acceptable
448
449 debugs(47,2, "cannot wait: " << expectedWait <<
450 " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
451 return false; // do not want to wait that long
452}
453
455void
457{
458 debugs(47, 7, "coordinator response to open request");
459 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
460 i != WaitingForOpen.end(); ++i) {
461 if (response.strand.tag == (*i)->dbName) {
462 (*i)->openCompleted(&response);
463 WaitingForOpen.erase(i);
464 return;
465 }
466 }
467
468 debugs(47, 4, "LATE disker response to open for " <<
469 response.strand.tag);
470 // nothing we can do about it; completeIo() has been called already
471}
472
473void
474IpcIoFile::HandleResponses(const char *const when)
475{
476 debugs(47, 4, "popping all " << when);
477 IpcIoMsg ipcIo;
478 // get all responses we can: since we are not pushing, this will stop
479 int diskId;
480 while (queue->pop(diskId, ipcIo)) {
481 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
482 if (i == IpcIoFiles.end()) {
483 debugs(47, 5, "ignoring disk response " << SipcIo(KidIdentifier, ipcIo, diskId) << ": the file is not open");
484 continue;
485 }
486 i->second->handleResponse(ipcIo);
487 }
488}
489
490void
492{
493 const int requestId = ipcIo.requestId;
494
495 Must(requestId);
496 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
497 CallBack(pending->codeContext, [&] {
498 debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
499 if (myPid == ipcIo.workerPid)
500 pending->completeIo(&ipcIo);
501 else
502 debugs(47, 5, "ignoring response meant for our predecessor PID: " << ipcIo.workerPid);
503 delete pending; // XXX: leaking if throwing
504 });
505 } else {
506 debugs(47, 4, "LATE disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
507 // nothing we can do about it; completeIo() has been called already
508 }
509}
510
511void
512IpcIoFile::Notify(const int peerId)
513{
514 // TODO: Count and report the total number of notifications, pops, pushes.
515 debugs(47, 7, "kid" << peerId);
517 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
519 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
520 Ipc::SendMessage(addr, msg);
521}
522
523void
525{
526 const int from = msg.getInt();
527 debugs(47, 7, "from " << from);
528 queue->clearReaderSignal(from);
529 if (IamDiskProcess())
531 else
532 HandleResponses("after notification");
533}
534
536void
538{
540 queue->clearAllReaderSignals();
541 if (IamDiskProcess())
543 else
544 HandleResponses("at start");
545}
546
547void
548IpcIoFile::StatQueue(std::ostream &os)
549{
550 if (queue.get()) {
551 os << "SMP disk I/O queues:\n";
552 queue->stat<IpcIoMsg>(os);
553 }
554}
555
557void
558IpcIoFile::OpenTimeout(void *const param)
559{
560 Must(param);
561 // the pointer is used for comparison only and not dereferenced
562 const IpcIoFile *const ipcIoFile =
563 reinterpret_cast<const IpcIoFile *>(param);
564 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
565 i != WaitingForOpen.end(); ++i) {
566 if (*i == ipcIoFile) {
567 (*i)->openCompleted(nullptr);
568 WaitingForOpen.erase(i);
569 break;
570 }
571 }
572}
573
575void
576IpcIoFile::CheckTimeouts(void *const param)
577{
578 Must(param);
579 const int diskId = reinterpret_cast<uintptr_t>(param);
580 debugs(47, 7, "diskId=" << diskId);
581 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
582 if (i != IpcIoFiles.end())
583 i->second->checkTimeouts();
584}
585
586void
588{
589 timeoutCheckScheduled = false;
590
591 // last chance to recover in case a notification message was lost, etc.
592 const RequestMap::size_type timeoutsBefore = olderRequests->size();
593 HandleResponses("before timeout");
594 const RequestMap::size_type timeoutsNow = olderRequests->size();
595
596 if (timeoutsBefore > timeoutsNow) { // some requests were rescued
597 // notification message lost or significantly delayed?
598 debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
599 " may be too slow or disrupted for about " <<
600 Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
601 " out of " << timeoutsBefore << " I/Os");
602 }
603
604 if (timeoutsNow) {
605 debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
606 timeoutsNow << ' ' << dbName << " I/Os after at least " <<
607 Timeout << "s timeout");
608 }
609
610 // any old request would have timed out by now
611 typedef RequestMap::const_iterator RMCI;
612 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
613 IpcIoPendingRequest *const pending = i->second;
614 CallBack(pending->codeContext, [&] {
615 const auto requestId = i->first;
616 debugs(47, 7, "disker timeout; ipcIo" << KidIdentifier << '.' << requestId);
617 pending->completeIo(nullptr); // no response
618 delete pending; // XXX: leaking if throwing
619 });
620 }
621 olderRequests->clear();
622
623 swap(olderRequests, newerRequests); // switches pointers around
624 if (!olderRequests->empty() && !timeoutCheckScheduled)
626}
627
629void
631{
632 // We may be running in an I/O requestor CodeContext, but are scheduling
633 // one-for-all CheckTimeouts() that is not specific to any request.
634 CallService(nullptr, [&] {
635 // we check all older requests at once so some may wait for 2*Timeout
636 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
637 reinterpret_cast<void *>(diskId), Timeout, 0, false);
639 });
640}
641
644IpcIoFile::dequeueRequest(const unsigned int requestId)
645{
646 Must(requestId != 0);
647
648 RequestMap *map = nullptr;
649 RequestMap::iterator i = requestMap1.find(requestId);
650
651 if (i != requestMap1.end())
652 map = &requestMap1;
653 else {
654 i = requestMap2.find(requestId);
655 if (i != requestMap2.end())
656 map = &requestMap2;
657 }
658
659 if (!map) // not found in both maps
660 return nullptr;
661
662 IpcIoPendingRequest *pending = i->second;
663 map->erase(i);
664 return pending;
665}
666
667int
669{
670 assert(false); // not supported; TODO: remove this method from API
671 return -1;
672}
673
674/* IpcIoMsg */
675
677 requestId(0),
678 offset(0),
679 len(0),
680 workerPid(-1), // Unix-like systems use process IDs starting from 0
681 command(IpcIo::cmdNone),
682 xerrno(0)
683{
684 start.tv_sec = 0;
685 start.tv_usec = 0;
686}
687
688void
689IpcIoMsg::stat(std::ostream &os)
690{
691 timeval elapsedTime;
692 tvSub(elapsedTime, start, current_time);
693 os << "id: " << requestId <<
694 ", offset: " << offset <<
695 ", size: " << len <<
696 ", workerPid: " << workerPid <<
697 ", page: " << page <<
698 ", command: " << command <<
699 ", start: " << start <<
700 ", elapsed: " << elapsedTime <<
701 ", errno: " << xerrno;
702}
703
704/* IpcIoPendingRequest */
705
707 file(aFile),
708 readRequest(nullptr),
709 writeRequest(nullptr),
710 codeContext(CodeContext::Current())
711{
712}
713
714void
716{
717 if (readRequest)
718 file->readCompleted(readRequest, response);
719 else if (writeRequest)
720 file->writeCompleted(writeRequest, response);
721 else {
722 Must(!response); // only timeouts are handled here
723 file->openCompleted(nullptr);
724 }
725}
726
727/* XXX: disker code that should probably be moved elsewhere */
728
729static SBuf DbName;
730static int TheFile = -1;
731
732static void
734{
736 ipcIo.len = 0;
737 debugs(47,2, "run out of shared memory pages for IPC I/O");
738 return;
739 }
740
741 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
742 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
744 fd_bytes(TheFile, read, FD_READ);
745
746 if (read >= 0) {
747 ipcIo.xerrno = 0;
748 const size_t len = static_cast<size_t>(read); // safe because read > 0
749 debugs(47,8, "disker" << KidIdentifier << " read " <<
750 (len == ipcIo.len ? "all " : "just ") << read);
751 ipcIo.len = len;
752 } else {
753 ipcIo.xerrno = errno;
754 ipcIo.len = 0;
755 debugs(47,5, "disker" << KidIdentifier << " read error: " <<
756 ipcIo.xerrno);
757 }
758}
759
762static void
764{
765 const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
766 size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
767 size_t wroteSoFar = 0;
768 off_t offset = ipcIo.offset;
769 // Partial writes to disk do happen. It is unlikely that the caller can
770 // handle partial writes by doing something other than writing leftovers
771 // again, so we try to write them ourselves to minimize overheads.
772 const int attemptLimit = 10;
773 for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
774 const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
776 fd_bytes(TheFile, result, FD_WRITE);
777
778 if (result < 0) {
779 ipcIo.xerrno = errno;
780 assert(ipcIo.xerrno);
781 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
782 " writing " << toWrite << '/' << ipcIo.len <<
783 " at " << ipcIo.offset << '+' << wroteSoFar <<
784 " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
785 ipcIo.len = wroteSoFar;
786 return; // bail on error
787 }
788
789 const size_t wroteNow = static_cast<size_t>(result); // result >= 0
790 ipcIo.xerrno = 0;
791
792 debugs(47,3, "disker" << KidIdentifier << " wrote " <<
793 (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
794 " out of " << toWrite << '/' << ipcIo.len << " at " <<
795 ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
796 " try");
797
798 wroteSoFar += wroteNow;
799
800 if (wroteNow >= toWrite) {
801 ipcIo.xerrno = 0;
802 ipcIo.len = wroteSoFar;
803 return; // wrote everything there was to write
804 }
805
806 buf += wroteNow;
807 offset += wroteNow;
808 toWrite -= wroteNow;
809 }
810
811 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
812 attemptLimit << " attempts while writing " <<
813 toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
814 wroteSoFar);
815 return; // not a fatal I/O error, unless the caller treats it as such
816}
817
818static void
820{
821 diskerWriteAttempts(ipcIo); // may fail
822 Ipc::Mem::PutPage(ipcIo.page);
823}
824
825void
827{
828 debugs(47, 7, "resuming handling requests after " <<
829 static_cast<const char *>(source));
832}
833
834bool
836{
837 const int ioRate = queue->localRateLimit().load();
838 const double maxRate = ioRate/1e3; // req/ms
839
840 // do we need to enforce configured I/O rate?
841 if (maxRate <= 0)
842 return false;
843
844 // is there an I/O request we could potentially delay?
845 int kidId;
846 IpcIoMsg ipcIo;
847 if (!queue->peek(kidId, ipcIo)) {
848 // unlike pop(), peek() is not reliable and does not block reader
849 // so we must proceed with pop() even if it is likely to fail
850 return false;
851 }
852
853 static timeval LastIo = current_time;
854
855 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
856 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
857 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
858
859 const double credit = ioDuration; // what the last I/O should have cost us
860 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
861 LastIo = current_time;
862
863 Ipc::QueueReader::Balance &balance = queue->localBalance();
864 balance += static_cast<int64_t>(credit - debit);
865
866 debugs(47, 7, "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
867
868 if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
869 // if the next request is (likely) write and we accumulated
870 // too much time for future slow I/Os, then shed accumulated
871 // time to keep just half of the excess
872 const int64_t toSpend = balance - maxImbalance/2;
873
874 if (toSpend/1e3 > Timeout)
875 debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
876 "I/O requests for " << (toSpend/1e3) << " seconds " <<
877 "to obey " << ioRate << "/sec rate limit");
878
879 debugs(47, 3, "rate limiting by " << toSpend << " ms to get" <<
880 (1e3*maxRate) << "/sec rate");
881 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
883 const_cast<char*>("rate limiting"),
884 toSpend/1e3, 0, false);
886 return true;
887 } else if (balance < -maxImbalance) {
888 // do not owe "too much" to avoid "too large" bursts of I/O
889 balance = -maxImbalance;
890 }
891
892 return false;
893}
894
895void
897{
898 // Balance our desire to maximize the number of concurrent I/O requests
899 // (reordered by OS to minimize seek time) with a requirement to
900 // send 1st-I/O notification messages, process Coordinator events, etc.
901 const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
902 const timeval loopStart = current_time;
903
904 int popped = 0;
905 int workerId = 0;
906 IpcIoMsg ipcIo;
907 while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
908 ++popped;
909
910 // at least one I/O per call is guaranteed if the queue is not empty
911 DiskerHandleRequest(workerId, ipcIo);
912
914 const double elapsedMsec = tvSubMsec(loopStart, current_time);
915 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
917 // the gap must be positive for select(2) to be given a chance
918 const double minBreakSecs = 0.001;
919 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
921 const_cast<char*>("long I/O loop"),
922 minBreakSecs, 0, false);
924 }
925 debugs(47, 3, "pausing after " << popped << " I/Os in " <<
926 elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
927 break;
928 }
929 }
930
931 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
932 // requests first, then reorder the popped requests to optimize seek time,
933 // then do I/O, then take a break, and come back for the next set of I/O
934 // requests.
935}
936
938void
939IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
940{
941 if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
942 debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
943 " should not receive " << ipcIo.command <<
944 " ipcIo" << workerId << '.' << ipcIo.requestId);
945 return;
946 }
947
948 debugs(47,5, "disker" << KidIdentifier <<
949 (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
950 ipcIo.len << " at " << ipcIo.offset <<
951 " ipcIo" << workerId << '.' << ipcIo.requestId);
952
953 const auto workerPid = ipcIo.workerPid;
954 assert(workerPid >= 0);
955
956 if (ipcIo.command == IpcIo::cmdRead)
957 diskerRead(ipcIo);
958 else // ipcIo.command == IpcIo::cmdWrite
959 diskerWrite(ipcIo);
960
961 assert(ipcIo.workerPid == workerPid);
962
963 debugs(47, 7, "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
964
965 try {
966 if (queue->push(workerId, ipcIo))
967 Notify(workerId); // must notify worker
968 } catch (const Queue::Full &) {
969 // The worker pop queue should not overflow because the worker can
970 // push only if pendingRequests() is less than QueueCapacity.
971 debugs(47, DBG_IMPORTANT, "ERROR: Squid BUG: Worker I/O pop queue for " <<
972 DbName << " overflow: " <<
973 SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
974
975 // the I/O request we could not push will timeout
976 }
977}
978
979static bool
980DiskerOpen(const SBuf &path, int flags, mode_t)
981{
982 assert(TheFile < 0);
983
984 DbName = path;
985 TheFile = file_open(DbName.c_str(), flags);
986
987 if (TheFile < 0) {
988 const int xerrno = errno;
989 debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
990 xstrerr(xerrno));
991 return false;
992 }
993
995 debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
996 return true;
997}
998
999static void
1000DiskerClose(const SBuf &path)
1001{
1002 if (TheFile >= 0) {
1004 debugs(79,3, "rock db closed " << path << ": FD " << TheFile);
1005 TheFile = -1;
1007 }
1008 DbName.clear();
1009}
1010
1014{
1015public:
1016 /* RegisteredRunner API */
1017 IpcIoRr(): owner(nullptr) {}
1018 ~IpcIoRr() override;
1019 void claimMemoryNeeds() override;
1020
1021protected:
1022 /* Ipc::Mem::RegisteredRunner API */
1023 void create() override;
1024
1025private:
1027};
1028
1030
1031void
1033{
1034 const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
1035 ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
1036 // the maximum number of shared I/O pages is approximately the
1037 // number of queue slots, we add a fudge factor to that to account
1038 // for corner cases where I/O pages are created before queue
1039 // limits are checked or destroyed long after the I/O is dequeued
1041 static_cast<int>(itemsCount * 1.1));
1042}
1043
1044void
1046{
1047 if (Config.cacheSwap.n_strands <= 0)
1048 return;
1049
1050 Must(!owner);
1053 1 + Config.workers, sizeof(IpcIoMsg),
1055}
1056
1058{
1059 delete owner;
1060}
1061
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
void CallService(const CodeContext::Pointer &serviceContext, Fun &&service)
Definition: CodeContext.h:133
void CallBack(const CodeContext::Pointer &callbackContext, Fun &&callback)
Definition: CodeContext.h:116
static const int QueueCapacity
Definition: IpcIoFile.cc:45
static const char *const ShmLabel
shared memory segment path to use for IpcIoFile maps
Definition: IpcIoFile.cc:40
static void diskerWrite(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:819
static void diskerRead(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:733
DefineRunnerRegistrator(IpcIoRr)
static bool DiskerOpen(const SBuf &path, int flags, mode_t mode)
Definition: IpcIoFile.cc:980
static void DiskerClose(const SBuf &path)
Definition: IpcIoFile.cc:1000
static int TheFile
db file descriptor
Definition: IpcIoFile.cc:730
static void diskerWriteAttempts(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:763
static std::ostream & operator<<(std::ostream &os, const SipcIo &sio)
Definition: IpcIoFile.cc:68
CBDATA_CLASS_INIT(IpcIoFile)
static SBuf DbName
full db file name
Definition: IpcIoFile.cc:729
class SquidConfig Config
Definition: SquidConfig.cc:12
StatCounters statCounter
Definition: StatCounters.cc:12
#define TexcHere(msg)
legacy convenience macro; it is not difficult to type Here() now
Definition: TextException.h:63
#define SWALLOW_EXCEPTIONS(code)
Definition: TextException.h:79
#define Must(condition)
Definition: TextException.h:75
#define assert(EX)
Definition: assert.h:17
generally useful configuration options supported by some children
Definition: DiskFile.h:28
time_msec_t ioTimeout
canRead/Write should return false if expected I/O delay exceeds it
Definition: DiskFile.h:33
int ioRate
shape I/O request stream to approach that many per second
Definition: DiskFile.h:36
virtual void configure(const Config &)
notes supported configuration options; kids must call this first
Definition: DiskFile.h:42
virtual void closeCompleted()=0
virtual void readCompleted(const char *buf, int len, int errflag, RefCount< ReadRequest >)=0
virtual void ioCompletedNotification()=0
virtual void writeCompleted(int errflag, size_t len, RefCount< WriteRequest >)=0
RequestMap * newerRequests
newer requests (map2 or map1)
Definition: IpcIoFile.h:148
RequestMap requestMap1
older (or newer) pending requests
Definition: IpcIoFile.h:145
static void DiskerHandleMoreRequests(void *)
Definition: IpcIoFile.cc:826
void create(int flags, mode_t mode, RefCount< IORequestor > callback) override
Definition: IpcIoFile.cc:198
void scheduleTimeoutCheck()
prepare to check for timeouts in a little while
Definition: IpcIoFile.cc:630
static bool DiskerHandleMoreRequestsScheduled
whether we are waiting for an event to handle still queued I/O requests
Definition: IpcIoFile.h:164
friend class IpcIoPendingRequest
Definition: IpcIoFile.h:101
bool canWait() const
whether we think there is enough time to complete the I/O
Definition: IpcIoFile.cc:421
static IpcIoFileList WaitingForOpen
pending open requests
Definition: IpcIoFile.h:154
void read(ReadRequest *) override
Definition: IpcIoFile.cc:236
static const double Timeout
timeout value in seconds
Definition: IpcIoFile.h:151
~IpcIoFile() override
Definition: IpcIoFile.cc:107
bool canRead() const override
Definition: IpcIoFile.cc:218
static void CheckTimeouts(void *const param)
IpcIoFile::checkTimeouts wrapper.
Definition: IpcIoFile.cc:576
static IpcIoFilesMap IpcIoFiles
Definition: IpcIoFile.h:158
RequestMap * olderRequests
older requests (map1 or map2)
Definition: IpcIoFile.h:147
void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
track a new pending request
Definition: IpcIoFile.cc:345
void checkTimeouts()
Definition: IpcIoFile.cc:587
static void DiskerHandleRequests()
Definition: IpcIoFile.cc:896
void open(int flags, mode_t mode, RefCount< IORequestor > callback) override
Definition: IpcIoFile.cc:127
static void Notify(const int peerId)
Definition: IpcIoFile.cc:512
static void HandleOpenResponse(const Ipc::StrandMessage &)
handle open response from coordinator
Definition: IpcIoFile.cc:456
void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response)
Definition: IpcIoFile.cc:303
RefCount< IORequestor > ioRequestor
Definition: IpcIoFile.h:137
bool ioInProgress() const override
Definition: IpcIoFile.cc:338
int getFD() const override
Definition: IpcIoFile.cc:668
void configure(const Config &cfg) override
notes supported configuration options; kids must call this first
Definition: IpcIoFile.cc:120
static void OpenTimeout(void *const param)
handles open request timeout
Definition: IpcIoFile.cc:558
void handleResponse(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:491
bool error() const override
Definition: IpcIoFile.cc:230
void close() override
Definition: IpcIoFile.cc:206
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
called when disker receives an I/O request
Definition: IpcIoFile.cc:939
void openCompleted(const Ipc::StrandMessage *)
Definition: IpcIoFile.cc:169
IpcIoFile(char const *aDb)
Definition: IpcIoFile.cc:95
int diskId
the kid ID of the disker we talk to
Definition: IpcIoFile.h:136
void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response)
Definition: IpcIoFile.cc:254
size_t pendingRequests() const
Definition: IpcIoFile.h:114
std::map< int, IpcIoFile * > IpcIoFilesMap
Definition: IpcIoFile.h:157
DiskFile::Config config
supported configuration options
Definition: IpcIoFile.h:98
void push(IpcIoPendingRequest *const pending)
push an I/O request to disker
Definition: IpcIoFile.cc:356
static void StatQueue(std::ostream &)
prints IPC message queue state; suitable for cache manager reports
Definition: IpcIoFile.cc:548
void write(WriteRequest *) override
Definition: IpcIoFile.cc:284
bool timeoutCheckScheduled
we expect a CheckTimeouts() call
Definition: IpcIoFile.h:149
unsigned int lastRequestId
last requestId used
Definition: IpcIoFile.h:141
Ipc::FewToFewBiQueue Queue
Definition: IpcIoFile.h:160
RequestMap requestMap2
newer (or older) pending requests
Definition: IpcIoFile.h:146
static std::unique_ptr< Queue > queue
IPC queue.
Definition: IpcIoFile.h:161
IpcIoPendingRequest * dequeueRequest(const unsigned int requestId)
returns and forgets the right IpcIoFile pending request
Definition: IpcIoFile.cc:644
const pid_t myPid
optimization: cached process ID of our process
Definition: IpcIoFile.h:135
bool error_
whether we have seen at least one I/O error (XXX)
Definition: IpcIoFile.h:139
bool canWrite() const override
Definition: IpcIoFile.cc:224
static void HandleMessagesAtStart()
Definition: IpcIoFile.cc:537
static void HandleResponses(const char *const when)
Definition: IpcIoFile.cc:474
const String dbName
the name of the file we are managing
Definition: IpcIoFile.h:134
std::map< unsigned int, IpcIoPendingRequest * > RequestMap
maps requestId to the handleResponse callback
Definition: IpcIoFile.h:144
static bool WaitBeforePop()
Definition: IpcIoFile.cc:835
static void HandleNotification(const Ipc::TypedMsgHdr &msg)
handle queue push notifications from worker or disker
Definition: IpcIoFile.cc:524
std::list< Pointer > IpcIoFileList
Definition: IpcIoFile.h:153
converts DiskIO requests to IPC queue messages
Definition: IpcIoFile.h:41
size_t len
Definition: IpcIoFile.h:52
pid_t workerPid
the process ID of the I/O requestor
Definition: IpcIoFile.h:54
IpcIo::Command command
what disker is supposed to do or did
Definition: IpcIoFile.h:56
unsigned int requestId
unique for requestor; matches request w/ response
Definition: IpcIoFile.h:49
void stat(std::ostream &)
prints message parameters; suitable for cache manager reports
Definition: IpcIoFile.cc:689
struct timeval start
when the I/O request was converted to IpcIoMsg
Definition: IpcIoFile.h:57
int xerrno
I/O error code or zero.
Definition: IpcIoFile.h:59
Ipc::Mem::PageId page
Definition: IpcIoFile.h:53
off_t offset
Definition: IpcIoFile.h:51
keeps original I/O request parameters while disker is handling the request
Definition: IpcIoFile.h:169
CodeContext::Pointer codeContext
requestor's context
Definition: IpcIoFile.h:181
const IpcIoFile::Pointer file
the file object waiting for the response
Definition: IpcIoFile.h:177
WriteRequest * writeRequest
set if this is a write request
Definition: IpcIoFile.h:179
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile)
Definition: IpcIoFile.cc:706
void completeIo(IpcIoMsg *const response)
called when response is received and, with a nil response, on timeouts
Definition: IpcIoFile.cc:715
ReadRequest * readRequest
set if this is a read request
Definition: IpcIoFile.h:178
~IpcIoRr() override
Definition: IpcIoFile.cc:1057
Ipc::FewToFewBiQueue::Owner * owner
Definition: IpcIoFile.cc:1026
void create() override
called when the runner should create a new memory segment
Definition: IpcIoFile.cc:1045
void claimMemoryNeeds() override
Definition: IpcIoFile.cc:1032
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:247
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:228
static String CoordinatorAddr()
get the IPC message address for coordinator process
Definition: Port.cc:65
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:52
AtomicSignedMsec Balance
Definition: Queue.h:62
String tag
optional unique well-known key (e.g., cache_dir path)
Definition: StrandCoord.h:34
int kidId
internal Squid process number
Definition: StrandCoord.h:31
an IPC message carrying StrandCoord
Definition: StrandCoord.h:39
static void NotifyCoordinator(MessageType, const char *tag)
creates and sends StrandMessage to Coordinator
Definition: StrandCoord.cc:62
StrandCoord strand
messageType-specific coordinates (e.g., sender)
Definition: StrandCoord.h:52
asynchronous strand search request
Definition: StrandSearch.h:22
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandSearch.cc:33
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:35
void putInt(int n)
store an integer
Definition: TypedMsgHdr.cc:119
void setType(int aType)
sets message type; use MessageType enum
Definition: TypedMsgHdr.cc:100
int getInt() const
load an integer
Definition: TypedMsgHdr.cc:111
Calls a function without arguments. See also: NullaryMemFunT.
Definition: AsyncFunCalls.h:18
size_t len
Definition: ReadRequest.h:26
off_t offset
Definition: ReadRequest.h:25
char * buf
Definition: ReadRequest.h:24
Definition: SBuf.h:94
const char * c_str()
Definition: SBuf.cc:516
void clear()
Definition: SBuf.cc:175
Store::DiskConfig cacheSwap
Definition: SquidConfig.h:423
struct StatCounters::@130 syscalls
struct StatCounters::@130::@134 disk
int n_strands
number of disk processes required to support all cache_dirs
Definition: SquidConfig.h:72
char const * termedBuf() const
Definition: SquidString.h:92
an std::runtime_error with thrower location info
Definition: TextException.h:21
const char * what() const override
FREE * free_func
Definition: WriteRequest.h:28
char const * buf
Definition: WriteRequest.h:25
A const & max(A const &lhs, A const &rhs)
A const & min(A const &lhs, A const &rhs)
#define MYNAME
Definition: Stream.h:236
#define DBG_IMPORTANT
Definition: Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
#define DBG_CRITICAL
Definition: Stream.h:37
#define DISK_ERROR
Definition: defines.h:28
#define DISK_OK
Definition: defines.h:27
@ FD_WRITE
Definition: enums.h:24
@ FD_READ
Definition: enums.h:23
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:107
void fd_bytes(int fd, int len, unsigned int type)
Definition: fd.cc:226
int file_open(const char *path, int mode)
Definition: fs_io.cc:45
void file_close(int fd)
Definition: fs_io.cc:73
int store_open_disk_fd
int KidIdentifier
Command
what kind of I/O the disker needs to do or have done
Definition: IpcIoFile.h:33
@ cmdNone
Definition: IpcIoFile.h:33
@ cmdRead
Definition: IpcIoFile.h:33
@ cmdWrite
Definition: IpcIoFile.h:33
@ cmdOpen
Definition: IpcIoFile.h:33
std::ostream & operator<<(std::ostream &, Command)
Definition: IpcIoFile.cc:77
bool GetPage(const PageId::Purpose purpose, PageId &page)
sets page ID and returns true unless no free pages are found
Definition: Pages.cc:34
size_t PageSize()
returns page size in bytes; all pages are assumed to be the same size
Definition: Pages.cc:28
void NotePageNeed(const int purpose, const int count)
claim the need for a number of pages for a given purpose
Definition: Pages.cc:72
void PutPage(PageId &page)
makes identified page available as a free page to future GetPage() callers
Definition: Pages.cc:41
char * PagePointer(const PageId &page)
converts page handle into a temporary writeable shared memory pointer
Definition: Pages.cc:48
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:23
@ mtRegisterStrand
notifies about our strand existence
Definition: Messages.h:22
@ mtIpcIoNotification
Definition: Messages.h:31
IpcIo wrapper for debugs() streams; XXX: find a better class name.
Definition: IpcIoFile.cc:58
int worker
Definition: IpcIoFile.cc:62
SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker)
Definition: IpcIoFile.cc:59
const IpcIoMsg & msg
Definition: IpcIoFile.cc:63
int disker
Definition: IpcIoFile.cc:64
time_t getCurrentTime()
bool IamWorkerProcess()
whether the current process handles HTTP transactions and such
Definition: stub_tools.cc:47
bool IamDiskProcess()
whether the current process is dedicated to managing a cache_dir
void tvSub(struct timeval &res, struct timeval const &t1, struct timeval const &t2)
Definition: gadgets.cc:58
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
Definition: gadgets.cc:17
int tvSubMsec(struct timeval t1, struct timeval t2)
Definition: gadgets.cc:51
uint64_t time_msec_t
Definition: gadgets.h:16
unsigned short mode_t
Definition: types.h:129
const char * xstrerr(int error)
Definition: xstrerror.cc:83

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors