IpcIoFile.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2019 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 47 Store Directory Routines */
10 
11 #include "squid.h"
12 #include "base/RunnersRegistry.h"
13 #include "base/TextException.h"
14 #include "DiskIO/IORequestor.h"
15 #include "DiskIO/IpcIo/IpcIoFile.h"
16 #include "DiskIO/ReadRequest.h"
17 #include "DiskIO/WriteRequest.h"
18 #include "fd.h"
19 #include "fs_io.h"
20 #include "globals.h"
21 #include "ipc/mem/Pages.h"
22 #include "ipc/Messages.h"
23 #include "ipc/Port.h"
24 #include "ipc/Queue.h"
25 #include "ipc/StrandSearch.h"
26 #include "ipc/UdsOp.h"
27 #include "sbuf/SBuf.h"
28 #include "SquidConfig.h"
29 #include "SquidTime.h"
30 #include "StatCounters.h"
31 #include "tools.h"
32 
33 #include <cerrno>
34 
36 
38 static const char *const ShmLabel = "io_file";
42 // TODO: make configurable or compute from squid.conf settings if possible
43 static const int QueueCapacity = 1024;
44 
45 const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
48 std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
49 
51 
52 static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
53 static void DiskerClose(const SBuf &path);
54 
56 struct SipcIo {
57  SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
58  worker(aWorker), msg(aMsg), disker(aDisker) {}
59 
60  int worker;
61  const IpcIoMsg &msg;
62  int disker;
63 };
64 
65 std::ostream &
66 operator <<(std::ostream &os, const SipcIo &sio)
67 {
68  return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
69  (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
70 }
71 
72 IpcIoFile::IpcIoFile(char const *aDb):
73  dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
74  olderRequests(&requestMap1), newerRequests(&requestMap2),
75  timeoutCheckScheduled(false)
76 {
77 }
78 
80 {
82  if (diskId >= 0) {
83  const auto i = IpcIoFiles.find(diskId);
84  Must(i != IpcIoFiles.end());
85  Must(i->second == this);
86  IpcIoFiles.erase(i);
87  }
88  });
89 }
90 
91 void
93 {
95  config = cfg;
96 }
97 
98 void
100 {
102  Must(diskId < 0); // we do not know our disker yet
103 
104  if (!queue.get())
106 
107  if (IamDiskProcess()) {
108  error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
109  if (error_)
110  return;
111 
113  const bool inserted =
114  IpcIoFiles.insert(std::make_pair(diskId, this)).second;
115  Must(inserted);
116 
117  queue->localRateLimit().store(config.ioRate);
118 
120  ann.strand.tag = dbName;
121  Ipc::TypedMsgHdr message;
122  ann.pack(message);
124 
126  return;
127  }
128 
130  request.requestorId = KidIdentifier;
131  request.tag = dbName;
132 
133  Ipc::TypedMsgHdr msg;
134  request.pack(msg);
136 
137  WaitingForOpen.push_back(this);
138 
139  eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
140  this, Timeout, 0, false); // "this" pointer is used as id
141 }
142 
143 void
145 {
146  Must(diskId < 0); // we do not know our disker yet
147 
148  if (!response) {
149  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
150  "channel establishment timeout");
151  error_ = true;
152  } else {
153  diskId = response->strand.kidId;
154  if (diskId >= 0) {
155  const bool inserted =
156  IpcIoFiles.insert(std::make_pair(diskId, this)).second;
157  Must(inserted);
158  } else {
159  error_ = true;
160  debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
161  "responsibility for " << dbName);
162  }
163  }
164 
166 }
167 
172 void
174 {
175  assert(false); // check
176  /* We use the same logic path for open */
177  open(flags, mode, callback);
178 }
179 
180 void
182 {
183  assert(ioRequestor != NULL);
184 
185  if (IamDiskProcess())
187  // XXX: else nothing to do?
188 
190 }
191 
192 bool
194 {
195  return diskId >= 0 && !error_ && canWait();
196 }
197 
198 bool
200 {
201  return diskId >= 0 && !error_ && canWait();
202 }
203 
204 bool
206 {
207  return error_;
208 }
209 
210 void
212 {
213  debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
214  readRequest->offset << ")");
215 
216  assert(ioRequestor != NULL);
217  assert(readRequest->offset >= 0);
218  Must(!error_);
219 
220  //assert(minOffset < 0 || minOffset <= readRequest->offset);
221  //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
222 
223  IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
224  pending->readRequest = readRequest;
225  push(pending);
226 }
227 
228 void
230  IpcIoMsg *const response)
231 {
232  bool ioError = false;
233  if (!response) {
234  debugs(79, 3, HERE << "error: timeout");
235  ioError = true; // I/O timeout does not warrant setting error_?
236  } else {
237  if (response->xerrno) {
238  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
239  xstrerr(response->xerrno));
240  ioError = error_ = true;
241  } else if (!response->page) {
242  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
243  "out of shared memory pages");
244  ioError = true;
245  } else {
246  const char *const buf = Ipc::Mem::PagePointer(response->page);
247  memcpy(readRequest->buf, buf, response->len);
248  }
249 
250  Ipc::Mem::PutPage(response->page);
251  }
252 
253  const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
254  const int errflag = ioError ? DISK_ERROR :DISK_OK;
255  ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
256 }
257 
258 void
260 {
261  debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
262  writeRequest->offset << ")");
263 
264  assert(ioRequestor != NULL);
265  assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
266  assert(writeRequest->offset >= 0);
267  Must(!error_);
268 
269  //assert(minOffset < 0 || minOffset <= writeRequest->offset);
270  //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
271 
272  IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
273  pending->writeRequest = writeRequest;
274  push(pending);
275 }
276 
277 void
279  const IpcIoMsg *const response)
280 {
281  bool ioError = false;
282  if (!response) {
283  debugs(79, 3, "disker " << diskId << " timeout");
284  ioError = true; // I/O timeout does not warrant setting error_?
285  } else if (response->xerrno) {
286  debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
287  " error writing " << writeRequest->len << " bytes at " <<
288  writeRequest->offset << ": " << xstrerr(response->xerrno) <<
289  "; this worker will stop using " << dbName);
290  ioError = error_ = true;
291  } else if (response->len != writeRequest->len) {
292  debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
293  response->len << " instead of " << writeRequest->len <<
294  " bytes (offset " << writeRequest->offset << "); " <<
295  "this worker will stop using " << dbName);
296  error_ = true;
297  }
298 
299  if (writeRequest->free_func)
300  (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
301 
302  if (!ioError) {
303  debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
304  diskId << " at " << writeRequest->offset);
305  }
306 
307  const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
308  const int errflag = ioError ? DISK_ERROR :DISK_OK;
309  ioRequestor->writeCompleted(errflag, rlen, writeRequest);
310 }
311 
312 bool
314 {
315  return !olderRequests->empty() || !newerRequests->empty();
316 }
317 
319 void
320 IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
321 {
322  const std::pair<RequestMap::iterator,bool> result =
323  newerRequests->insert(std::make_pair(id, pending));
324  Must(result.second); // failures means that id was not unique
327 }
328 
330 void
332 {
333  // prevent queue overflows: check for responses to earlier requests
334  // warning: this call may result in indirect push() recursion
335  HandleResponses("before push");
336 
337  debugs(47, 7, HERE);
338  Must(diskId >= 0);
339  Must(pending);
340  Must(pending->readRequest || pending->writeRequest);
341 
342  IpcIoMsg ipcIo;
343  try {
344  if (++lastRequestId == 0) // don't use zero value as requestId
345  ++lastRequestId;
346  ipcIo.requestId = lastRequestId;
347  ipcIo.start = current_time;
348  if (pending->readRequest) {
349  ipcIo.command = IpcIo::cmdRead;
350  ipcIo.offset = pending->readRequest->offset;
351  ipcIo.len = pending->readRequest->len;
352  } else { // pending->writeRequest
353  Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
355  ipcIo.len = 0;
356  throw TexcHere("run out of shared memory pages for IPC I/O");
357  }
358  ipcIo.command = IpcIo::cmdWrite;
359  ipcIo.offset = pending->writeRequest->offset;
360  ipcIo.len = pending->writeRequest->len;
361  char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
362  memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
363  }
364 
365  debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
366 
367  // protect DiskerHandleRequest() from pop queue overflow
370 
371  if (queue->push(diskId, ipcIo))
372  Notify(diskId); // must notify disker
373  trackPendingRequest(ipcIo.requestId, pending);
374  } catch (const Queue::Full &) {
375  debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
376  dbName << " overflow: " <<
377  SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
378  // TODO: grow queue size
379  if (ipcIo.page)
380  Ipc::Mem::PutPage(ipcIo.page);
381 
382  pending->completeIo(NULL);
383  delete pending;
384  } catch (const TextException &e) {
385  debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
386  pending->completeIo(NULL);
387  delete pending;
388  }
389 }
390 
392 bool
394 {
395  if (!config.ioTimeout)
396  return true; // no timeout specified
397 
398  IpcIoMsg oldestIo;
399  if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
400  return true; // we cannot estimate expected wait time; assume it is OK
401 
402  const int oldestWait = tvSubMsec(oldestIo.start, current_time);
403 
404  int rateWait = -1; // time in millisecons
405  const int ioRate = queue->rateLimit(diskId).load();
406  if (ioRate > 0) {
407  // if there are N requests pending, the new one will wait at
408  // least N/max-swap-rate seconds
409  rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
410  // adjust N/max-swap-rate value based on the queue "balance"
411  // member, in case we have been borrowing time against future
412  // I/O already
413  rateWait += queue->balance(diskId);
414  }
415 
416  const int expectedWait = max(oldestWait, rateWait);
417  if (expectedWait < 0 ||
418  static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
419  return true; // expected wait time is acceptible
420 
421  debugs(47,2, HERE << "cannot wait: " << expectedWait <<
422  " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
423  return false; // do not want to wait that long
424 }
425 
427 void
429 {
430  debugs(47, 7, HERE << "coordinator response to open request");
431  for (IpcIoFileList::iterator i = WaitingForOpen.begin();
432  i != WaitingForOpen.end(); ++i) {
433  if (response.strand.tag == (*i)->dbName) {
434  (*i)->openCompleted(&response);
435  WaitingForOpen.erase(i);
436  return;
437  }
438  }
439 
440  debugs(47, 4, HERE << "LATE disker response to open for " <<
441  response.strand.tag);
442  // nothing we can do about it; completeIo() has been called already
443 }
444 
445 void
446 IpcIoFile::HandleResponses(const char *const when)
447 {
448  debugs(47, 4, HERE << "popping all " << when);
449  IpcIoMsg ipcIo;
450  // get all responses we can: since we are not pushing, this will stop
451  int diskId;
452  while (queue->pop(diskId, ipcIo)) {
453  const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
454  Must(i != IpcIoFiles.end()); // TODO: warn but continue
455  i->second->handleResponse(ipcIo);
456  }
457 }
458 
459 void
461 {
462  const int requestId = ipcIo.requestId;
463  debugs(47, 7, HERE << "popped disker response: " <<
464  SipcIo(KidIdentifier, ipcIo, diskId));
465 
466  Must(requestId);
467  if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
468  pending->completeIo(&ipcIo);
469  delete pending; // XXX: leaking if throwing
470  } else {
471  debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
472  "; ipcIo" << KidIdentifier << '.' << requestId);
473  // nothing we can do about it; completeIo() has been called already
474  }
475 }
476 
477 void
478 IpcIoFile::Notify(const int peerId)
479 {
480  // TODO: Count and report the total number of notifications, pops, pushes.
481  debugs(47, 7, HERE << "kid" << peerId);
482  Ipc::TypedMsgHdr msg;
483  msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
484  msg.putInt(KidIdentifier);
486  Ipc::SendMessage(addr, msg);
487 }
488 
489 void
491 {
492  const int from = msg.getInt();
493  debugs(47, 7, HERE << "from " << from);
494  queue->clearReaderSignal(from);
495  if (IamDiskProcess())
497  else
498  HandleResponses("after notification");
499 }
500 
502 void
503 IpcIoFile::OpenTimeout(void *const param)
504 {
505  Must(param);
506  // the pointer is used for comparison only and not dereferenced
507  const IpcIoFile *const ipcIoFile =
508  reinterpret_cast<const IpcIoFile *>(param);
509  for (IpcIoFileList::iterator i = WaitingForOpen.begin();
510  i != WaitingForOpen.end(); ++i) {
511  if (*i == ipcIoFile) {
512  (*i)->openCompleted(NULL);
513  WaitingForOpen.erase(i);
514  break;
515  }
516  }
517 }
518 
520 void
521 IpcIoFile::CheckTimeouts(void *const param)
522 {
523  Must(param);
524  const int diskId = reinterpret_cast<uintptr_t>(param);
525  debugs(47, 7, HERE << "diskId=" << diskId);
526  const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
527  if (i != IpcIoFiles.end())
528  i->second->checkTimeouts();
529 }
530 
531 void
533 {
534  timeoutCheckScheduled = false;
535 
536  // last chance to recover in case a notification message was lost, etc.
537  const RequestMap::size_type timeoutsBefore = olderRequests->size();
538  HandleResponses("before timeout");
539  const RequestMap::size_type timeoutsNow = olderRequests->size();
540 
541  if (timeoutsBefore > timeoutsNow) { // some requests were rescued
542  // notification message lost or significantly delayed?
543  debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
544  " may be too slow or disrupted for about " <<
545  Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
546  " out of " << timeoutsBefore << " I/Os");
547  }
548 
549  if (timeoutsNow) {
550  debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
551  timeoutsNow << ' ' << dbName << " I/Os after at least " <<
552  Timeout << "s timeout");
553  }
554 
555  // any old request would have timed out by now
556  typedef RequestMap::const_iterator RMCI;
557  for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
558  IpcIoPendingRequest *const pending = i->second;
559 
560  const unsigned int requestId = i->first;
561  debugs(47, 7, HERE << "disker timeout; ipcIo" <<
562  KidIdentifier << '.' << requestId);
563 
564  pending->completeIo(NULL); // no response
565  delete pending; // XXX: leaking if throwing
566  }
567  olderRequests->clear();
568 
569  swap(olderRequests, newerRequests); // switches pointers around
570  if (!olderRequests->empty() && !timeoutCheckScheduled)
572 }
573 
575 void
577 {
578  // we check all older requests at once, so some may wait for 2*Timeout
579  eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
580  reinterpret_cast<void *>(diskId), Timeout, 0, false);
581  timeoutCheckScheduled = true;
582 }
583 
586 IpcIoFile::dequeueRequest(const unsigned int requestId)
587 {
588  Must(requestId != 0);
589 
590  RequestMap *map = NULL;
591  RequestMap::iterator i = requestMap1.find(requestId);
592 
593  if (i != requestMap1.end())
594  map = &requestMap1;
595  else {
596  i = requestMap2.find(requestId);
597  if (i != requestMap2.end())
598  map = &requestMap2;
599  }
600 
601  if (!map) // not found in both maps
602  return NULL;
603 
604  IpcIoPendingRequest *pending = i->second;
605  map->erase(i);
606  return pending;
607 }
608 
609 int
611 {
612  assert(false); // not supported; TODO: remove this method from API
613  return -1;
614 }
615 
616 /* IpcIoMsg */
617 
619  requestId(0),
620  offset(0),
621  len(0),
622  command(IpcIo::cmdNone),
623  xerrno(0)
624 {
625  start.tv_sec = 0;
626  start.tv_usec = 0;
627 }
628 
629 /* IpcIoPendingRequest */
630 
632  file(aFile), readRequest(NULL), writeRequest(NULL)
633 {
634 }
635 
636 void
638 {
639  if (readRequest)
640  file->readCompleted(readRequest, response);
641  else if (writeRequest)
642  file->writeCompleted(writeRequest, response);
643  else {
644  Must(!response); // only timeouts are handled here
645  file->openCompleted(NULL);
646  }
647 }
648 
649 /* XXX: disker code that should probably be moved elsewhere */
650 
651 static SBuf DbName;
652 static int TheFile = -1;
653 
654 static void
656 {
658  ipcIo.len = 0;
659  debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
660  return;
661  }
662 
663  char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
664  const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
665  ++statCounter.syscalls.disk.reads;
666  fd_bytes(TheFile, read, FD_READ);
667 
668  if (read >= 0) {
669  ipcIo.xerrno = 0;
670  const size_t len = static_cast<size_t>(read); // safe because read > 0
671  debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
672  (len == ipcIo.len ? "all " : "just ") << read);
673  ipcIo.len = len;
674  } else {
675  ipcIo.xerrno = errno;
676  ipcIo.len = 0;
677  debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
678  ipcIo.xerrno);
679  }
680 }
681 
684 static void
686 {
687  const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
688  size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
689  size_t wroteSoFar = 0;
690  off_t offset = ipcIo.offset;
691  // Partial writes to disk do happen. It is unlikely that the caller can
692  // handle partial writes by doing something other than writing leftovers
693  // again, so we try to write them ourselves to minimize overheads.
694  const int attemptLimit = 10;
695  for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
696  const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
697  ++statCounter.syscalls.disk.writes;
698  fd_bytes(TheFile, result, FD_WRITE);
699 
700  if (result < 0) {
701  ipcIo.xerrno = errno;
702  assert(ipcIo.xerrno);
703  debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
704  " writing " << toWrite << '/' << ipcIo.len <<
705  " at " << ipcIo.offset << '+' << wroteSoFar <<
706  " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
707  ipcIo.len = wroteSoFar;
708  return; // bail on error
709  }
710 
711  const size_t wroteNow = static_cast<size_t>(result); // result >= 0
712  ipcIo.xerrno = 0;
713 
714  debugs(47,3, "disker" << KidIdentifier << " wrote " <<
715  (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
716  " out of " << toWrite << '/' << ipcIo.len << " at " <<
717  ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
718  " try");
719 
720  wroteSoFar += wroteNow;
721 
722  if (wroteNow >= toWrite) {
723  ipcIo.xerrno = 0;
724  ipcIo.len = wroteSoFar;
725  return; // wrote everything there was to write
726  }
727 
728  buf += wroteNow;
729  offset += wroteNow;
730  toWrite -= wroteNow;
731  }
732 
733  debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
734  attemptLimit << " attempts while writing " <<
735  toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
736  wroteSoFar);
737  return; // not a fatal I/O error, unless the caller treats it as such
738 }
739 
740 static void
742 {
743  diskerWriteAttempts(ipcIo); // may fail
744  Ipc::Mem::PutPage(ipcIo.page);
745 }
746 
747 void
749 {
750  debugs(47, 7, HERE << "resuming handling requests after " <<
751  static_cast<const char *>(source));
752  DiskerHandleMoreRequestsScheduled = false;
754 }
755 
756 bool
758 {
759  const int ioRate = queue->localRateLimit().load();
760  const double maxRate = ioRate/1e3; // req/ms
761 
762  // do we need to enforce configured I/O rate?
763  if (maxRate <= 0)
764  return false;
765 
766  // is there an I/O request we could potentially delay?
767  int processId;
768  IpcIoMsg ipcIo;
769  if (!queue->peek(processId, ipcIo)) {
770  // unlike pop(), peek() is not reliable and does not block reader
771  // so we must proceed with pop() even if it is likely to fail
772  return false;
773  }
774 
775  static timeval LastIo = current_time;
776 
777  const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
778  // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
779  const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
780 
781  const double credit = ioDuration; // what the last I/O should have cost us
782  const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
783  LastIo = current_time;
784 
785  Ipc::QueueReader::Balance &balance = queue->localBalance();
786  balance += static_cast<int64_t>(credit - debit);
787 
788  debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
789 
790  if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
791  // if the next request is (likely) write and we accumulated
792  // too much time for future slow I/Os, then shed accumulated
793  // time to keep just half of the excess
794  const int64_t toSpend = balance - maxImbalance/2;
795 
796  if (toSpend/1e3 > Timeout)
797  debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
798  "I/O requests for " << (toSpend/1e3) << " seconds " <<
799  "to obey " << ioRate << "/sec rate limit");
800 
801  debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
802  (1e3*maxRate) << "/sec rate");
803  eventAdd("IpcIoFile::DiskerHandleMoreRequests",
805  const_cast<char*>("rate limiting"),
806  toSpend/1e3, 0, false);
807  DiskerHandleMoreRequestsScheduled = true;
808  return true;
809  } else if (balance < -maxImbalance) {
810  // do not owe "too much" to avoid "too large" bursts of I/O
811  balance = -maxImbalance;
812  }
813 
814  return false;
815 }
816 
817 void
819 {
820  // Balance our desire to maximize the number of concurrent I/O requests
821  // (reordered by OS to minimize seek time) with a requirement to
822  // send 1st-I/O notification messages, process Coordinator events, etc.
823  const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
824  const timeval loopStart = current_time;
825 
826  int popped = 0;
827  int workerId = 0;
828  IpcIoMsg ipcIo;
829  while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
830  ++popped;
831 
832  // at least one I/O per call is guaranteed if the queue is not empty
833  DiskerHandleRequest(workerId, ipcIo);
834 
835  getCurrentTime();
836  const double elapsedMsec = tvSubMsec(loopStart, current_time);
837  if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
838  if (!DiskerHandleMoreRequestsScheduled) {
839  // the gap must be positive for select(2) to be given a chance
840  const double minBreakSecs = 0.001;
841  eventAdd("IpcIoFile::DiskerHandleMoreRequests",
843  const_cast<char*>("long I/O loop"),
844  minBreakSecs, 0, false);
845  DiskerHandleMoreRequestsScheduled = true;
846  }
847  debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
848  elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
849  break;
850  }
851  }
852 
853  // TODO: consider using O_DIRECT with "elevator" optimization where we pop
854  // requests first, then reorder the popped requests to optimize seek time,
855  // then do I/O, then take a break, and come back for the next set of I/O
856  // requests.
857 }
858 
860 void
861 IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
862 {
863  if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
864  debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
865  " should not receive " << ipcIo.command <<
866  " ipcIo" << workerId << '.' << ipcIo.requestId);
867  return;
868  }
869 
870  debugs(47,5, HERE << "disker" << KidIdentifier <<
871  (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
872  ipcIo.len << " at " << ipcIo.offset <<
873  " ipcIo" << workerId << '.' << ipcIo.requestId);
874 
875  if (ipcIo.command == IpcIo::cmdRead)
876  diskerRead(ipcIo);
877  else // ipcIo.command == IpcIo::cmdWrite
878  diskerWrite(ipcIo);
879 
880  debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
881 
882  try {
883  if (queue->push(workerId, ipcIo))
884  Notify(workerId); // must notify worker
885  } catch (const Queue::Full &) {
886  // The worker pop queue should not overflow because the worker can
887  // push only if pendingRequests() is less than QueueCapacity.
888  debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
889  DbName << " overflow: " <<
890  SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
891 
892  // the I/O request we could not push will timeout
893  }
894 }
895 
896 static bool
897 DiskerOpen(const SBuf &path, int flags, mode_t)
898 {
899  assert(TheFile < 0);
900 
901  DbName = path;
902  TheFile = file_open(DbName.c_str(), flags);
903 
904  if (TheFile < 0) {
905  const int xerrno = errno;
906  debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
907  xstrerr(xerrno));
908  return false;
909  }
910 
912  debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
913  return true;
914 }
915 
916 static void
917 DiskerClose(const SBuf &path)
918 {
919  if (TheFile >= 0) {
921  debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
922  TheFile = -1;
924  }
925  DbName.clear();
926 }
927 
931 {
932 public:
933  /* RegisteredRunner API */
934  IpcIoRr(): owner(NULL) {}
935  virtual ~IpcIoRr();
936  virtual void claimMemoryNeeds();
937 
938 protected:
939  /* Ipc::Mem::RegisteredRunner API */
940  virtual void create();
941 
942 private:
944 };
945 
947 
948 void
950 {
951  const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
953  // the maximum number of shared I/O pages is approximately the
954  // number of queue slots, we add a fudge factor to that to account
955  // for corner cases where I/O pages are created before queue
956  // limits are checked or destroyed long after the I/O is dequeued
958  static_cast<int>(itemsCount * 1.1));
959 }
960 
961 void
963 {
964  if (Config.cacheSwap.n_strands <= 0)
965  return;
966 
967  Must(!owner);
970  1 + Config.workers, sizeof(IpcIoMsg),
971  QueueCapacity);
972 }
973 
975 {
976  delete owner;
977 }
978 
virtual void create()
called when the runner should create a new memory segment
Definition: IpcIoFile.cc:962
virtual bool canWrite() const
Definition: IpcIoFile.cc:199
strand registration with Coordinator (also used as an ACK)
Definition: StrandCoord.h:36
int diskId
the process ID of the disker we talk to
Definition: IpcIoFile.h:124
generally useful configuration options supported by some children
Definition: DiskFile.h:27
RequestMap * olderRequests
older requests (map1 or map2)
Definition: IpcIoFile.h:135
virtual void configure(const Config &)
notes supported configuration options; kids must call this first
Definition: DiskFile.h:42
StatCounters statCounter
Definition: StatCounters.cc:12
#define assert(EX)
Definition: assert.h:17
FREE * free_func
Definition: WriteRequest.h:28
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandSearch.cc:28
static void HandleOpenResponse(const Ipc::StrandSearchResponse &response)
handle open response from coordinator
Definition: IpcIoFile.cc:428
void const char HLPCB * callback
Definition: stub_helper.cc:16
virtual void open(int flags, mode_t mode, RefCount< IORequestor > callback)
Definition: IpcIoFile.cc:99
StrandCoord strand
registrant coordinates and related details
Definition: StrandCoord.h:44
keeps original I/O request parameters while disker is handling the request
Definition: IpcIoFile.h:156
int requestorId
sender-provided return address
Definition: StrandSearch.h:28
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile)
Definition: IpcIoFile.cc:631
bool error_
whether we have seen at least one I/O error (XXX)
Definition: IpcIoFile.h:127
String tag
set when looking for a matching StrandCoord::tag
Definition: StrandSearch.h:29
std::list< Pointer > IpcIoFileList
Definition: IpcIoFile.h:141
static void diskerRead(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:655
virtual void create(int flags, mode_t mode, RefCount< IORequestor > callback)
Definition: IpcIoFile.cc:173
IpcIo::Command command
what disker is supposed to do or did
Definition: IpcIoFile.h:50
Definition: SBuf.h:86
Strand location details.
Definition: StrandCoord.h:19
virtual ~IpcIoRr()
Definition: IpcIoFile.cc:974
RefCount< IORequestor > ioRequestor
Definition: IpcIoFile.h:125
char * PagePointer(const PageId &page)
converts page handler into a temporary writeable shared memory pointer
Definition: Pages.cc:48
void putInt(int n)
store an integer
Definition: TypedMsgHdr.cc:126
size_t len
Definition: ReadRequest.h:26
struct _request * request(char *urlin)
Definition: tcp-banger2.c:291
int i
Definition: membanger.c:49
static void CheckTimeouts(void *const param)
IpcIoFile::checkTimeouts wrapper.
Definition: IpcIoFile.cc:521
static void Notify(const int peerId)
Definition: IpcIoFile.cc:478
std::ostream & operator<<(std::ostream &os, const SipcIo &sio)
Definition: IpcIoFile.cc:66
virtual void write(WriteRequest *)
Definition: IpcIoFile.cc:259
static void HandleResponses(const char *const when)
Definition: IpcIoFile.cc:446
StrandCoord strand
answer matching StrandSearchRequest criteria
Definition: StrandSearch.h:41
void scheduleTimeoutCheck()
prepare to check for timeouts in a little while
Definition: IpcIoFile.cc:576
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
called when disker receives an I/O request
Definition: IpcIoFile.cc:861
int kidId
internal Squid process number
Definition: StrandCoord.h:29
int file_open(const char *path, int mode)
Definition: fs_io.cc:46
IpcIoFile(char const *aDb)
Definition: IpcIoFile.cc:72
void clear()
Definition: SBuf.cc:178
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:69
virtual void configure(const Config &cfg)
notes supported configuration options; kids must call this first
Definition: IpcIoFile.cc:92
void NotePageNeed(const int purpose, const int count)
claim the need for a number of pages for a given purpose
Definition: Pages.cc:72
std::map< unsigned int, IpcIoPendingRequest * > RequestMap
maps requestId to the handleResponse callback
Definition: IpcIoFile.h:132
char * buf
Definition: ReadRequest.h:24
Definition: enums.h:24
bool GetPage(const PageId::Purpose purpose, PageId &page)
sets page ID and returns true unless no free pages are found
Definition: Pages.cc:34
void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response)
Definition: IpcIoFile.cc:278
#define DBG_CRITICAL
Definition: Debug.h:45
bool timeoutCheckScheduled
we expect a CheckTimeouts() call
Definition: IpcIoFile.h:137
static IpcIoFilesMap IpcIoFiles
Definition: IpcIoFile.h:146
struct timeval current_time
Definition: stub_time.cc:15
DiskFile::Config config
supported configuration options
Definition: IpcIoFile.h:89
A const & max(A const &lhs, A const &rhs)
Definition: enums.h:23
asynchronous strand search request
Definition: StrandSearch.h:20
int ioRate
shape I/O request stream to approach that many per second
Definition: DiskFile.h:36
bool IamWorkerProcess()
whether the current process handles HTTP transactions and such
Definition: stub_tools.cc:49
#define DISK_ERROR
Definition: defines.h:40
virtual const char * what() const override
virtual void claimMemoryNeeds()
Definition: IpcIoFile.cc:949
RunnerRegistrationEntry(IpcIoRr)
void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response)
Definition: IpcIoFile.cc:229
#define SWALLOW_EXCEPTIONS(code)
Definition: TextException.h:73
static bool DiskerHandleMoreRequestsScheduled
whether we are waiting for an event to handle still queued I/O requests
Definition: IpcIoFile.h:152
void fd_bytes(int fd, int len, unsigned int type)
Definition: fd.cc:260
WriteRequest * writeRequest
set if this is a write request
Definition: IpcIoFile.h:167
unsigned int lastRequestId
last requestId used
Definition: IpcIoFile.h:129
std::map< int, IpcIoFile * > IpcIoFilesMap
Definition: IpcIoFile.h:145
virtual bool ioInProgress() const
Definition: IpcIoFile.cc:313
int disker
Definition: IpcIoFile.cc:62
int tvSubMsec(struct timeval, struct timeval)
Definition: stub_time.cc:20
const char * xstrerr(int error)
Definition: xstrerror.cc:83
static void DiskerHandleRequests()
Definition: IpcIoFile.cc:818
void checkTimeouts()
Definition: IpcIoFile.cc:532
off_t offset
Definition: ReadRequest.h:25
int n_strands
number of disk processes required to support all cache_dirs
Definition: SquidConfig.h:66
static const char *const ShmLabel
shared memory segment path to use for IpcIoFile maps
Definition: IpcIoFile.cc:38
static SBuf DbName
full db file name
Definition: IpcIoFile.cc:651
static std::unique_ptr< Queue > queue
IPC queue.
Definition: IpcIoFile.h:149
int getInt() const
load an integer
Definition: TypedMsgHdr.cc:118
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:124
struct StatCounters::@136 syscalls
#define DBG_IMPORTANT
Definition: Debug.h:46
friend class IpcIoPendingRequest
Definition: IpcIoFile.h:92
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:51
struct StatCounters::@136::@139 disk
RequestMap requestMap1
older (or newer) pending requests
Definition: IpcIoFile.h:133
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:202
static void HandleNotification(const Ipc::TypedMsgHdr &msg)
handle queue push notifications from worker or disker
Definition: IpcIoFile.cc:490
virtual void closeCompleted()=0
static void DiskerClose(const SBuf &path)
Definition: IpcIoFile.cc:917
void * addr
Definition: membanger.c:46
int xerrno
I/O error code or zero.
Definition: IpcIoFile.h:53
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandCoord.cc:51
char const * buf
Definition: WriteRequest.h:25
void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
track a new pending request
Definition: IpcIoFile.cc:320
const char * c_str()
Definition: SBuf.cc:526
virtual int getFD() const
Definition: IpcIoFile.cc:610
virtual void read(ReadRequest *)
Definition: IpcIoFile.cc:211
static void diskerWrite(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:741
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
char const * termedBuf() const
Definition: SquidString.h:91
int unsigned int const char *desc STUB void int len
Definition: stub_fd.cc:20
asynchronous strand search response
Definition: StrandSearch.h:33
static bool WaitBeforePop()
Definition: IpcIoFile.cc:757
void const char * buf
Definition: stub_helper.cc:16
virtual bool error() const
Definition: IpcIoFile.cc:205
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:153
virtual bool canRead() const
Definition: IpcIoFile.cc:193
static void DiskerHandleMoreRequests(void *)
Definition: IpcIoFile.cc:748
ReadRequest * readRequest
set if this is a read request
Definition: IpcIoFile.h:166
static const int QueueCapacity
Definition: IpcIoFile.cc:43
SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker)
Definition: IpcIoFile.cc:57
an std::runtime_error with thrower location info
Definition: TextException.h:19
unsigned short mode_t
Definition: types.h:150
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:22
static CTCB Timeout
Definition: Ident.cc:66
Ipc::Mem::PageId page
Definition: IpcIoFile.h:48
const IpcIoFile::Pointer file
the file object waiting for the response
Definition: IpcIoFile.h:165
size_t PageSize()
returns page size in bytes; all pages are assumed to be the same size
Definition: Pages.cc:28
time_t getCurrentTime(void)
Get current time.
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:109
int KidIdentifier
virtual ~IpcIoFile()
Definition: IpcIoFile.cc:79
void completeIo(IpcIoMsg *const response)
called when response is received and, with a nil response, on timeouts
Definition: IpcIoFile.cc:637
virtual void readCompleted(const char *buf, int len, int errflag, RefCount< ReadRequest >)=0
RequestMap requestMap2
newer (or older) pending requests
Definition: IpcIoFile.h:134
Ipc::FewToFewBiQueue::Owner * owner
Definition: IpcIoFile.cc:943
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:31
Store::DiskConfig cacheSwap
Definition: SquidConfig.h:422
IpcIo wrapper for debugs() streams; XXX: find a better class name.
Definition: IpcIoFile.cc:56
int store_open_disk_fd
RequestMap * newerRequests
newer requests (map2 or map1)
Definition: IpcIoFile.h:136
void push(IpcIoPendingRequest *const pending)
push an I/O request to disker
Definition: IpcIoFile.cc:331
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:221
size_t pendingRequests() const
Definition: IpcIoFile.h:105
bool canWait() const
whether we think there is enough time to complete the I/O
Definition: IpcIoFile.cc:393
size_t len
Definition: IpcIoFile.h:47
#define TexcHere(msg)
legacy convenience macro; it is not difficult to type Here() now
Definition: TextException.h:55
void handleResponse(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:460
void PutPage(PageId &page)
makes identified page available as a free page to future GetPage() callers
Definition: Pages.cc:41
time_msec_t ioTimeout
canRead/Write should return false if expected I/O delay exceeds it
Definition: DiskFile.h:33
int worker
Definition: IpcIoFile.cc:60
static int TheFile
db file descriptor
Definition: IpcIoFile.cc:652
String tag
optional unique well-known key (e.g., cache_dir path)
Definition: StrandCoord.h:32
static const double Timeout
timeout value in seconds
Definition: IpcIoFile.h:139
const String dbName
the name of the file we are managing
Definition: IpcIoFile.h:123
void setType(int aType)
sets message type; use MessageType enum
Definition: TypedMsgHdr.cc:107
unsigned int requestId
unique for requestor; matches request w/ response
Definition: IpcIoFile.h:44
bool IamDiskProcess() STUB_RETVAL_NOP(false) bool InDaemonMode() STUB_RETVAL_NOP(false) bool UsingSmp() STUB_RETVAL_NOP(false) bool IamCoordinatorProcess() STUB_RETVAL(false) bool IamPrimaryProcess() STUB_RETVAL(false) int NumberOfKids() STUB_RETVAL(0) void setMaxFD(void) STUB void setSystemLimits(void) STUB void squid_signal(int
whether the current process is dedicated to managing a cache_dir
#define DISK_OK
Definition: defines.h:39
void openCompleted(const Ipc::StrandSearchResponse *const response)
Definition: IpcIoFile.cc:144
static IpcIoFileList WaitingForOpen
pending open requests
Definition: IpcIoFile.h:142
const IpcIoMsg & msg
Definition: IpcIoFile.cc:61
off_t offset
Definition: IpcIoFile.h:46
AtomicSignedMsec Balance
Definition: Queue.h:58
virtual void ioCompletedNotification()=0
void file_close(int fd)
Definition: fs_io.cc:76
IpcIoPendingRequest * dequeueRequest(const unsigned int requestId)
returns and forgets the right IpcIoFile pending request
Definition: IpcIoFile.cc:586
CBDATA_CLASS_INIT(IpcIoFile)
virtual void writeCompleted(int errflag, size_t len, RefCount< WriteRequest >)=0
static void OpenTimeout(void *const param)
handles open request timeout
Definition: IpcIoFile.cc:503
class SquidConfig Config
Definition: SquidConfig.cc:12
static String CoordinatorAddr()
get the IPC message address for coordinator process
Definition: Port.cc:64
#define NULL
Definition: types.h:166
A const & min(A const &lhs, A const &rhs)
virtual void close()
Definition: IpcIoFile.cc:181
#define false
Definition: GnuRegex.c:233
converts DiskIO requests to IPC queue messages
Definition: IpcIoFile.h:38
struct timeval start
when the I/O request was converted to IpcIoMsg
Definition: IpcIoFile.h:51
static bool DiskerOpen(const SBuf &path, int flags, mode_t mode)
Definition: IpcIoFile.cc:897
static void diskerWriteAttempts(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:685

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors