IpcIoFile.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 47 Store Directory Routines */
10 
11 #include "squid.h"
12 #include "base/RunnersRegistry.h"
13 #include "base/TextException.h"
14 #include "DiskIO/IORequestor.h"
15 #include "DiskIO/IpcIo/IpcIoFile.h"
16 #include "DiskIO/ReadRequest.h"
17 #include "DiskIO/WriteRequest.h"
18 #include "fd.h"
19 #include "fs_io.h"
20 #include "globals.h"
21 #include "ipc/mem/Pages.h"
22 #include "ipc/Messages.h"
23 #include "ipc/Port.h"
24 #include "ipc/Queue.h"
25 #include "ipc/StrandSearch.h"
26 #include "ipc/UdsOp.h"
27 #include "sbuf/SBuf.h"
28 #include "SquidConfig.h"
29 #include "SquidTime.h"
30 #include "StatCounters.h"
31 #include "tools.h"
32 
33 #include <cerrno>
34 
36 
38 static const char *const ShmLabel = "io_file";
42 // TODO: make configurable or compute from squid.conf settings if possible
43 static const int QueueCapacity = 1024;
44 
45 const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
48 std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
49 
51 
52 static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
53 static void DiskerClose(const SBuf &path);
54 
56 struct SipcIo {
57  SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
58  worker(aWorker), msg(aMsg), disker(aDisker) {}
59 
60  int worker;
61  const IpcIoMsg &msg;
62  int disker;
63 };
64 
65 std::ostream &
66 operator <<(std::ostream &os, const SipcIo &sio)
67 {
68  return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
69  (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
70 }
71 
72 IpcIoFile::IpcIoFile(char const *aDb):
73  dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
74  olderRequests(&requestMap1), newerRequests(&requestMap2),
75  timeoutCheckScheduled(false)
76 {
77 }
78 
80 {
82  if (diskId >= 0) {
83  const auto i = IpcIoFiles.find(diskId);
84  Must(i != IpcIoFiles.end());
85  Must(i->second == this);
86  IpcIoFiles.erase(i);
87  }
88  });
89 }
90 
91 void
93 {
95  config = cfg;
96 }
97 
98 void
100 {
102  Must(diskId < 0); // we do not know our disker yet
103 
104  if (!queue.get())
106 
107  if (IamDiskProcess()) {
108  error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
109  if (error_)
110  return;
111 
113  const bool inserted =
114  IpcIoFiles.insert(std::make_pair(diskId, this)).second;
115  Must(inserted);
116 
117  queue->localRateLimit().store(config.ioRate);
118 
120  ann.strand.tag = dbName;
121  Ipc::TypedMsgHdr message;
122  ann.pack(message);
124 
126  return;
127  }
128 
130  request.requestorId = KidIdentifier;
131  request.tag = dbName;
132 
133  Ipc::TypedMsgHdr msg;
134  request.pack(msg);
136 
137  WaitingForOpen.push_back(this);
138 
139  eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
140  this, Timeout, 0, false); // "this" pointer is used as id
141 }
142 
143 void
145 {
146  Must(diskId < 0); // we do not know our disker yet
147 
148  if (!response) {
149  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
150  "channel establishment timeout");
151  error_ = true;
152  } else {
153  diskId = response->strand.kidId;
154  if (diskId >= 0) {
155  const bool inserted =
156  IpcIoFiles.insert(std::make_pair(diskId, this)).second;
157  Must(inserted);
158  } else {
159  error_ = true;
160  debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
161  "responsibility for " << dbName);
162  }
163  }
164 
166 }
167 
172 void
174 {
175  assert(false); // check
176  /* We use the same logic path for open */
177  open(flags, mode, callback);
178 }
179 
180 void
182 {
183  assert(ioRequestor != NULL);
184 
185  if (IamDiskProcess())
187  // XXX: else nothing to do?
188 
190 }
191 
192 bool
194 {
195  return diskId >= 0 && !error_ && canWait();
196 }
197 
198 bool
200 {
201  return diskId >= 0 && !error_ && canWait();
202 }
203 
204 bool
206 {
207  return error_;
208 }
209 
210 void
212 {
213  debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
214  readRequest->offset << ")");
215 
216  assert(ioRequestor != NULL);
217  assert(readRequest->offset >= 0);
218  Must(!error_);
219 
220  //assert(minOffset < 0 || minOffset <= readRequest->offset);
221  //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
222 
223  IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
224  pending->readRequest = readRequest;
225  push(pending);
226 }
227 
228 void
230  IpcIoMsg *const response)
231 {
232  bool ioError = false;
233  if (!response) {
234  debugs(79, 3, HERE << "error: timeout");
235  ioError = true; // I/O timeout does not warrant setting error_?
236  } else {
237  if (response->xerrno) {
238  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
239  xstrerr(response->xerrno));
240  ioError = error_ = true;
241  } else if (!response->page) {
242  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
243  "out of shared memory pages");
244  ioError = true;
245  } else {
246  const char *const buf = Ipc::Mem::PagePointer(response->page);
247  memcpy(readRequest->buf, buf, response->len);
248  }
249 
250  Ipc::Mem::PutPage(response->page);
251  }
252 
253  const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
254  const int errflag = ioError ? DISK_ERROR :DISK_OK;
255  ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
256 }
257 
258 void
260 {
261  debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
262  writeRequest->offset << ")");
263 
264  assert(ioRequestor != NULL);
265  assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
266  assert(writeRequest->offset >= 0);
267  Must(!error_);
268 
269  //assert(minOffset < 0 || minOffset <= writeRequest->offset);
270  //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
271 
272  IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
273  pending->writeRequest = writeRequest;
274  push(pending);
275 }
276 
277 void
279  const IpcIoMsg *const response)
280 {
281  bool ioError = false;
282  if (!response) {
283  debugs(79, 3, "disker " << diskId << " timeout");
284  ioError = true; // I/O timeout does not warrant setting error_?
285  } else if (response->xerrno) {
286  debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
287  " error writing " << writeRequest->len << " bytes at " <<
288  writeRequest->offset << ": " << xstrerr(response->xerrno) <<
289  "; this worker will stop using " << dbName);
290  ioError = error_ = true;
291  } else if (response->len != writeRequest->len) {
292  debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
293  response->len << " instead of " << writeRequest->len <<
294  " bytes (offset " << writeRequest->offset << "); " <<
295  "this worker will stop using " << dbName);
296  error_ = true;
297  }
298 
299  if (writeRequest->free_func)
300  (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
301 
302  if (!ioError) {
303  debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
304  diskId << " at " << writeRequest->offset);
305  }
306 
307  const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
308  const int errflag = ioError ? DISK_ERROR :DISK_OK;
309  ioRequestor->writeCompleted(errflag, rlen, writeRequest);
310 }
311 
312 bool
314 {
315  return !olderRequests->empty() || !newerRequests->empty();
316 }
317 
319 void
320 IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
321 {
322  const std::pair<RequestMap::iterator,bool> result =
323  newerRequests->insert(std::make_pair(id, pending));
324  Must(result.second); // failures means that id was not unique
327 }
328 
330 void
332 {
333  // prevent queue overflows: check for responses to earlier requests
334  // warning: this call may result in indirect push() recursion
335  HandleResponses("before push");
336 
337  debugs(47, 7, HERE);
338  Must(diskId >= 0);
339  Must(pending);
340  Must(pending->readRequest || pending->writeRequest);
341 
342  IpcIoMsg ipcIo;
343  try {
344  if (++lastRequestId == 0) // don't use zero value as requestId
345  ++lastRequestId;
346  ipcIo.requestId = lastRequestId;
347  ipcIo.start = current_time;
348  if (pending->readRequest) {
349  ipcIo.command = IpcIo::cmdRead;
350  ipcIo.offset = pending->readRequest->offset;
351  ipcIo.len = pending->readRequest->len;
352  } else { // pending->writeRequest
353  Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
355  ipcIo.len = 0;
356  throw TexcHere("run out of shared memory pages for IPC I/O");
357  }
358  ipcIo.command = IpcIo::cmdWrite;
359  ipcIo.offset = pending->writeRequest->offset;
360  ipcIo.len = pending->writeRequest->len;
361  char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
362  memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
363  }
364 
365  debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
366 
367  if (queue->push(diskId, ipcIo))
368  Notify(diskId); // must notify disker
369  trackPendingRequest(ipcIo.requestId, pending);
370  } catch (const Queue::Full &) {
371  debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
372  dbName << " overflow: " <<
373  SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
374  // TODO: grow queue size
375  if (ipcIo.page)
376  Ipc::Mem::PutPage(ipcIo.page);
377 
378  pending->completeIo(NULL);
379  delete pending;
380  } catch (const TextException &e) {
381  debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
382  pending->completeIo(NULL);
383  delete pending;
384  }
385 }
386 
388 bool
390 {
391  if (!config.ioTimeout)
392  return true; // no timeout specified
393 
394  IpcIoMsg oldestIo;
395  if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
396  return true; // we cannot estimate expected wait time; assume it is OK
397 
398  const int oldestWait = tvSubMsec(oldestIo.start, current_time);
399 
400  int rateWait = -1; // time in millisecons
401  const int ioRate = queue->rateLimit(diskId).load();
402  if (ioRate > 0) {
403  // if there are N requests pending, the new one will wait at
404  // least N/max-swap-rate seconds
405  rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
406  // adjust N/max-swap-rate value based on the queue "balance"
407  // member, in case we have been borrowing time against future
408  // I/O already
409  rateWait += queue->balance(diskId);
410  }
411 
412  const int expectedWait = max(oldestWait, rateWait);
413  if (expectedWait < 0 ||
414  static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
415  return true; // expected wait time is acceptible
416 
417  debugs(47,2, HERE << "cannot wait: " << expectedWait <<
418  " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
419  return false; // do not want to wait that long
420 }
421 
423 void
425 {
426  debugs(47, 7, HERE << "coordinator response to open request");
427  for (IpcIoFileList::iterator i = WaitingForOpen.begin();
428  i != WaitingForOpen.end(); ++i) {
429  if (response.strand.tag == (*i)->dbName) {
430  (*i)->openCompleted(&response);
431  WaitingForOpen.erase(i);
432  return;
433  }
434  }
435 
436  debugs(47, 4, HERE << "LATE disker response to open for " <<
437  response.strand.tag);
438  // nothing we can do about it; completeIo() has been called already
439 }
440 
441 void
442 IpcIoFile::HandleResponses(const char *const when)
443 {
444  debugs(47, 4, HERE << "popping all " << when);
445  IpcIoMsg ipcIo;
446  // get all responses we can: since we are not pushing, this will stop
447  int diskId;
448  while (queue->pop(diskId, ipcIo)) {
449  const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
450  Must(i != IpcIoFiles.end()); // TODO: warn but continue
451  i->second->handleResponse(ipcIo);
452  }
453 }
454 
455 void
457 {
458  const int requestId = ipcIo.requestId;
459  debugs(47, 7, HERE << "popped disker response: " <<
460  SipcIo(KidIdentifier, ipcIo, diskId));
461 
462  Must(requestId);
463  if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
464  pending->completeIo(&ipcIo);
465  delete pending; // XXX: leaking if throwing
466  } else {
467  debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
468  "; ipcIo" << KidIdentifier << '.' << requestId);
469  // nothing we can do about it; completeIo() has been called already
470  }
471 }
472 
473 void
474 IpcIoFile::Notify(const int peerId)
475 {
476  // TODO: Count and report the total number of notifications, pops, pushes.
477  debugs(47, 7, HERE << "kid" << peerId);
478  Ipc::TypedMsgHdr msg;
479  msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
480  msg.putInt(KidIdentifier);
482  Ipc::SendMessage(addr, msg);
483 }
484 
485 void
487 {
488  const int from = msg.getInt();
489  debugs(47, 7, HERE << "from " << from);
490  queue->clearReaderSignal(from);
491  if (IamDiskProcess())
493  else
494  HandleResponses("after notification");
495 }
496 
498 void
499 IpcIoFile::OpenTimeout(void *const param)
500 {
501  Must(param);
502  // the pointer is used for comparison only and not dereferenced
503  const IpcIoFile *const ipcIoFile =
504  reinterpret_cast<const IpcIoFile *>(param);
505  for (IpcIoFileList::iterator i = WaitingForOpen.begin();
506  i != WaitingForOpen.end(); ++i) {
507  if (*i == ipcIoFile) {
508  (*i)->openCompleted(NULL);
509  WaitingForOpen.erase(i);
510  break;
511  }
512  }
513 }
514 
516 void
517 IpcIoFile::CheckTimeouts(void *const param)
518 {
519  Must(param);
520  const int diskId = reinterpret_cast<uintptr_t>(param);
521  debugs(47, 7, HERE << "diskId=" << diskId);
522  const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
523  if (i != IpcIoFiles.end())
524  i->second->checkTimeouts();
525 }
526 
527 void
529 {
530  timeoutCheckScheduled = false;
531 
532  // last chance to recover in case a notification message was lost, etc.
533  const RequestMap::size_type timeoutsBefore = olderRequests->size();
534  HandleResponses("before timeout");
535  const RequestMap::size_type timeoutsNow = olderRequests->size();
536 
537  if (timeoutsBefore > timeoutsNow) { // some requests were rescued
538  // notification message lost or significantly delayed?
539  debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
540  " may be too slow or disrupted for about " <<
541  Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
542  " out of " << timeoutsBefore << " I/Os");
543  }
544 
545  if (timeoutsNow) {
546  debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
547  timeoutsNow << ' ' << dbName << " I/Os after at least " <<
548  Timeout << "s timeout");
549  }
550 
551  // any old request would have timed out by now
552  typedef RequestMap::const_iterator RMCI;
553  for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
554  IpcIoPendingRequest *const pending = i->second;
555 
556  const unsigned int requestId = i->first;
557  debugs(47, 7, HERE << "disker timeout; ipcIo" <<
558  KidIdentifier << '.' << requestId);
559 
560  pending->completeIo(NULL); // no response
561  delete pending; // XXX: leaking if throwing
562  }
563  olderRequests->clear();
564 
565  swap(olderRequests, newerRequests); // switches pointers around
566  if (!olderRequests->empty() && !timeoutCheckScheduled)
568 }
569 
571 void
573 {
574  // we check all older requests at once so some may wait for 2*Timeout
575  eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
576  reinterpret_cast<void *>(diskId), Timeout, 0, false);
577  timeoutCheckScheduled = true;
578 }
579 
582 IpcIoFile::dequeueRequest(const unsigned int requestId)
583 {
584  Must(requestId != 0);
585 
586  RequestMap *map = NULL;
587  RequestMap::iterator i = requestMap1.find(requestId);
588 
589  if (i != requestMap1.end())
590  map = &requestMap1;
591  else {
592  i = requestMap2.find(requestId);
593  if (i != requestMap2.end())
594  map = &requestMap2;
595  }
596 
597  if (!map) // not found in both maps
598  return NULL;
599 
600  IpcIoPendingRequest *pending = i->second;
601  map->erase(i);
602  return pending;
603 }
604 
605 int
607 {
608  assert(false); // not supported; TODO: remove this method from API
609  return -1;
610 }
611 
612 /* IpcIoMsg */
613 
615  requestId(0),
616  offset(0),
617  len(0),
618  command(IpcIo::cmdNone),
619  xerrno(0)
620 {
621  start.tv_sec = 0;
622  start.tv_usec = 0;
623 }
624 
625 /* IpcIoPendingRequest */
626 
628  file(aFile), readRequest(NULL), writeRequest(NULL)
629 {
630 }
631 
632 void
634 {
635  if (readRequest)
636  file->readCompleted(readRequest, response);
637  else if (writeRequest)
638  file->writeCompleted(writeRequest, response);
639  else {
640  Must(!response); // only timeouts are handled here
641  file->openCompleted(NULL);
642  }
643 }
644 
645 /* XXX: disker code that should probably be moved elsewhere */
646 
647 static SBuf DbName;
648 static int TheFile = -1;
649 
650 static void
652 {
654  ipcIo.len = 0;
655  debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
656  return;
657  }
658 
659  char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
660  const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
661  ++statCounter.syscalls.disk.reads;
662  fd_bytes(TheFile, read, FD_READ);
663 
664  if (read >= 0) {
665  ipcIo.xerrno = 0;
666  const size_t len = static_cast<size_t>(read); // safe because read > 0
667  debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
668  (len == ipcIo.len ? "all " : "just ") << read);
669  ipcIo.len = len;
670  } else {
671  ipcIo.xerrno = errno;
672  ipcIo.len = 0;
673  debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
674  ipcIo.xerrno);
675  }
676 }
677 
680 static void
682 {
683  const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
684  size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
685  size_t wroteSoFar = 0;
686  off_t offset = ipcIo.offset;
687  // Partial writes to disk do happen. It is unlikely that the caller can
688  // handle partial writes by doing something other than writing leftovers
689  // again, so we try to write them ourselves to minimize overheads.
690  const int attemptLimit = 10;
691  for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
692  const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
693  ++statCounter.syscalls.disk.writes;
694  fd_bytes(TheFile, result, FD_WRITE);
695 
696  if (result < 0) {
697  ipcIo.xerrno = errno;
698  assert(ipcIo.xerrno);
699  debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
700  " writing " << toWrite << '/' << ipcIo.len <<
701  " at " << ipcIo.offset << '+' << wroteSoFar <<
702  " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
703  ipcIo.len = wroteSoFar;
704  return; // bail on error
705  }
706 
707  const size_t wroteNow = static_cast<size_t>(result); // result >= 0
708  ipcIo.xerrno = 0;
709 
710  debugs(47,3, "disker" << KidIdentifier << " wrote " <<
711  (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
712  " out of " << toWrite << '/' << ipcIo.len << " at " <<
713  ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
714  " try");
715 
716  wroteSoFar += wroteNow;
717 
718  if (wroteNow >= toWrite) {
719  ipcIo.xerrno = 0;
720  ipcIo.len = wroteSoFar;
721  return; // wrote everything there was to write
722  }
723 
724  buf += wroteNow;
725  offset += wroteNow;
726  toWrite -= wroteNow;
727  }
728 
729  debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
730  attemptLimit << " attempts while writing " <<
731  toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
732  wroteSoFar);
733  return; // not a fatal I/O error, unless the caller treats it as such
734 }
735 
736 static void
738 {
739  diskerWriteAttempts(ipcIo); // may fail
740  Ipc::Mem::PutPage(ipcIo.page);
741 }
742 
743 void
745 {
746  debugs(47, 7, HERE << "resuming handling requests after " <<
747  static_cast<const char *>(source));
750 }
751 
752 bool
754 {
755  const int ioRate = queue->localRateLimit().load();
756  const double maxRate = ioRate/1e3; // req/ms
757 
758  // do we need to enforce configured I/O rate?
759  if (maxRate <= 0)
760  return false;
761 
762  // is there an I/O request we could potentially delay?
763  int processId;
764  IpcIoMsg ipcIo;
765  if (!queue->peek(processId, ipcIo)) {
766  // unlike pop(), peek() is not reliable and does not block reader
767  // so we must proceed with pop() even if it is likely to fail
768  return false;
769  }
770 
771  static timeval LastIo = current_time;
772 
773  const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
774  // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
775  const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
776 
777  const double credit = ioDuration; // what the last I/O should have cost us
778  const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
779  LastIo = current_time;
780 
781  Ipc::QueueReader::Balance &balance = queue->localBalance();
782  balance += static_cast<int64_t>(credit - debit);
783 
784  debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
785 
786  if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
787  // if the next request is (likely) write and we accumulated
788  // too much time for future slow I/Os, then shed accumulated
789  // time to keep just half of the excess
790  const int64_t toSpend = balance - maxImbalance/2;
791 
792  if (toSpend/1e3 > Timeout)
793  debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
794  "I/O requests for " << (toSpend/1e3) << " seconds " <<
795  "to obey " << ioRate << "/sec rate limit");
796 
797  debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
798  (1e3*maxRate) << "/sec rate");
799  eventAdd("IpcIoFile::DiskerHandleMoreRequests",
801  const_cast<char*>("rate limiting"),
802  toSpend/1e3, 0, false);
804  return true;
805  } else if (balance < -maxImbalance) {
806  // do not owe "too much" to avoid "too large" bursts of I/O
807  balance = -maxImbalance;
808  }
809 
810  return false;
811 }
812 
813 void
815 {
816  // Balance our desire to maximize the number of concurrent I/O requests
817  // (reordered by OS to minimize seek time) with a requirement to
818  // send 1st-I/O notification messages, process Coordinator events, etc.
819  const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
820  const timeval loopStart = current_time;
821 
822  int popped = 0;
823  int workerId = 0;
824  IpcIoMsg ipcIo;
825  while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
826  ++popped;
827 
828  // at least one I/O per call is guaranteed if the queue is not empty
829  DiskerHandleRequest(workerId, ipcIo);
830 
831  getCurrentTime();
832  const double elapsedMsec = tvSubMsec(loopStart, current_time);
833  if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
835  // the gap must be positive for select(2) to be given a chance
836  const double minBreakSecs = 0.001;
837  eventAdd("IpcIoFile::DiskerHandleMoreRequests",
839  const_cast<char*>("long I/O loop"),
840  minBreakSecs, 0, false);
842  }
843  debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
844  elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
845  break;
846  }
847  }
848 
849  // TODO: consider using O_DIRECT with "elevator" optimization where we pop
850  // requests first, then reorder the popped requests to optimize seek time,
851  // then do I/O, then take a break, and come back for the next set of I/O
852  // requests.
853 }
854 
856 void
857 IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
858 {
859  if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
860  debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
861  " should not receive " << ipcIo.command <<
862  " ipcIo" << workerId << '.' << ipcIo.requestId);
863  return;
864  }
865 
866  debugs(47,5, HERE << "disker" << KidIdentifier <<
867  (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
868  ipcIo.len << " at " << ipcIo.offset <<
869  " ipcIo" << workerId << '.' << ipcIo.requestId);
870 
871  if (ipcIo.command == IpcIo::cmdRead)
872  diskerRead(ipcIo);
873  else // ipcIo.command == IpcIo::cmdWrite
874  diskerWrite(ipcIo);
875 
876  debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
877 
878  try {
879  if (queue->push(workerId, ipcIo))
880  Notify(workerId); // must notify worker
881  } catch (const Queue::Full &) {
882  // The worker queue should not overflow because the worker should pop()
883  // before push()ing and because if disker pops N requests at a time,
884  // we should make sure the worker pop() queue length is the worker
885  // push queue length plus N+1. XXX: implement the N+1 difference.
886  debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
887  DbName << " overflow: " <<
888  SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
889 
890  // the I/O request we could not push will timeout
891  }
892 }
893 
894 static bool
895 DiskerOpen(const SBuf &path, int flags, mode_t)
896 {
897  assert(TheFile < 0);
898 
899  DbName = path;
900  TheFile = file_open(DbName.c_str(), flags);
901 
902  if (TheFile < 0) {
903  const int xerrno = errno;
904  debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
905  xstrerr(xerrno));
906  return false;
907  }
908 
910  debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
911  return true;
912 }
913 
914 static void
915 DiskerClose(const SBuf &path)
916 {
917  if (TheFile >= 0) {
919  debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
920  TheFile = -1;
922  }
923  DbName.clear();
924 }
925 
929 {
930 public:
931  /* RegisteredRunner API */
933  virtual ~IpcIoRr();
934  virtual void claimMemoryNeeds();
935 
936 protected:
937  /* Ipc::Mem::RegisteredRunner API */
938  virtual void create();
939 
940 private:
942 };
943 
945 
946 void
948 {
949  const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
951  // the maximum number of shared I/O pages is approximately the
952  // number of queue slots, we add a fudge factor to that to account
953  // for corner cases where I/O pages are created before queue
954  // limits are checked or destroyed long after the I/O is dequeued
956  static_cast<int>(itemsCount * 1.1));
957 }
958 
959 void
961 {
962  if (Config.cacheSwap.n_strands <= 0)
963  return;
964 
965  Must(!owner);
968  1 + Config.workers, sizeof(IpcIoMsg),
969  QueueCapacity);
970 }
971 
973 {
974  delete owner;
975 }
976 
virtual bool canWrite() const
Definition: IpcIoFile.cc:199
virtual void create()
called when the runner should create a new memory segment
Definition: IpcIoFile.cc:960
strand registration with Coordinator (also used as an ACK)
Definition: StrandCoord.h:36
int diskId
the process ID of the disker we talk to
Definition: IpcIoFile.h:118
generally useful configuration options supported by some children
Definition: DiskFile.h:27
bool canWait() const
whether we think there is enough time to complete the I/O
Definition: IpcIoFile.cc:389
RequestMap * olderRequests
older requests (map1 or map2)
Definition: IpcIoFile.h:129
virtual void configure(const Config &)
notes supported configuration options; kids must call this first
Definition: DiskFile.h:42
StatCounters statCounter
Definition: StatCounters.cc:12
#define assert(EX)
Definition: assert.h:17
FREE * free_func
Definition: WriteRequest.h:28
static void HandleOpenResponse(const Ipc::StrandSearchResponse &response)
handle open response from coordinator
Definition: IpcIoFile.cc:424
void const char HLPCB * callback
Definition: stub_helper.cc:16
virtual void open(int flags, mode_t mode, RefCount< IORequestor > callback)
Definition: IpcIoFile.cc:99
StrandCoord strand
registrant coordinates and related details
Definition: StrandCoord.h:44
keeps original I/O request parameters while disker is handling the request
Definition: IpcIoFile.h:150
int requestorId
sender-provided return address
Definition: StrandSearch.h:28
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile)
Definition: IpcIoFile.cc:627
bool error_
whether we have seen at least one I/O error (XXX)
Definition: IpcIoFile.h:121
String tag
set when looking for a matching StrandCoord::tag
Definition: StrandSearch.h:29
std::list< Pointer > IpcIoFileList
Definition: IpcIoFile.h:135
static void diskerRead(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:651
virtual void create(int flags, mode_t mode, RefCount< IORequestor > callback)
Definition: IpcIoFile.cc:173
IpcIo::Command command
what disker is supposed to do or did
Definition: IpcIoFile.h:50
Definition: SBuf.h:86
Strand location details.
Definition: StrandCoord.h:19
virtual ~IpcIoRr()
Definition: IpcIoFile.cc:972
RefCount< IORequestor > ioRequestor
Definition: IpcIoFile.h:119
char * PagePointer(const PageId &page)
converts page handler into a temporary writeable shared memory pointer
Definition: Pages.cc:48
void putInt(int n)
store an integer
Definition: TypedMsgHdr.cc:109
size_t len
Definition: ReadRequest.h:26
struct _request * request(char *urlin)
Definition: tcp-banger2.c:291
int i
Definition: membanger.c:49
static void CheckTimeouts(void *const param)
IpcIoFile::checkTimeouts wrapper.
Definition: IpcIoFile.cc:517
static void Notify(const int peerId)
Definition: IpcIoFile.cc:474
std::ostream & operator<<(std::ostream &os, const SipcIo &sio)
Definition: IpcIoFile.cc:66
virtual void write(WriteRequest *)
Definition: IpcIoFile.cc:259
static void HandleResponses(const char *const when)
Definition: IpcIoFile.cc:442
StrandCoord strand
answer matching StrandSearchRequest criteria
Definition: StrandSearch.h:41
struct StatCounters::@137::@140 disk
void scheduleTimeoutCheck()
prepare to check for timeouts in a little while
Definition: IpcIoFile.cc:572
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
called when disker receives an I/O request
Definition: IpcIoFile.cc:857
int kidId
internal Squid process number
Definition: StrandCoord.h:29
int file_open(const char *path, int mode)
Definition: fs_io.cc:46
IpcIoFile(char const *aDb)
Definition: IpcIoFile.cc:72
void clear()
Definition: SBuf.cc:178
virtual int getFD() const
Definition: IpcIoFile.cc:606
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:69
virtual void configure(const Config &cfg)
notes supported configuration options; kids must call this first
Definition: IpcIoFile.cc:92
void NotePageNeed(const int purpose, const int count)
claim the need for a number of pages for a given purpose
Definition: Pages.cc:72
std::map< unsigned int, IpcIoPendingRequest * > RequestMap
maps requestId to the handleResponse callback
Definition: IpcIoFile.h:126
char * buf
Definition: ReadRequest.h:24
Definition: enums.h:24
bool GetPage(const PageId::Purpose purpose, PageId &page)
sets page ID and returns true unless no free pages are found
Definition: Pages.cc:34
void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response)
Definition: IpcIoFile.cc:278
#define DBG_CRITICAL
Definition: Debug.h:45
bool timeoutCheckScheduled
we expect a CheckTimeouts() call
Definition: IpcIoFile.h:131
virtual bool canRead() const
Definition: IpcIoFile.cc:193
static IpcIoFilesMap IpcIoFiles
Definition: IpcIoFile.h:140
struct timeval current_time
Definition: stub_time.cc:15
DiskFile::Config config
supported configuration options
Definition: IpcIoFile.h:87
A const & max(A const &lhs, A const &rhs)
Definition: enums.h:23
asynchronous strand search request
Definition: StrandSearch.h:20
int ioRate
shape I/O request stream to approach that many per second
Definition: DiskFile.h:36
bool IamWorkerProcess()
whether the current process handles HTTP transactions and such
Definition: stub_tools.cc:49
#define DISK_ERROR
Definition: defines.h:40
virtual const char * what() const override
virtual void claimMemoryNeeds()
Definition: IpcIoFile.cc:947
RunnerRegistrationEntry(IpcIoRr)
void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response)
Definition: IpcIoFile.cc:229
#define SWALLOW_EXCEPTIONS(code)
Definition: TextException.h:73
static bool DiskerHandleMoreRequestsScheduled
whether we are waiting for an event to handle still queued I/O requests
Definition: IpcIoFile.h:146
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandCoord.cc:51
void fd_bytes(int fd, int len, unsigned int type)
Definition: fd.cc:261
WriteRequest * writeRequest
set if this is a write request
Definition: IpcIoFile.h:161
unsigned int lastRequestId
last requestId used
Definition: IpcIoFile.h:123
std::map< int, IpcIoFile * > IpcIoFilesMap
Definition: IpcIoFile.h:139
int disker
Definition: IpcIoFile.cc:62
virtual bool ioInProgress() const
Definition: IpcIoFile.cc:313
int tvSubMsec(struct timeval, struct timeval)
Definition: stub_time.cc:20
const char * xstrerr(int error)
Definition: xstrerror.cc:83
static void DiskerHandleRequests()
Definition: IpcIoFile.cc:814
void checkTimeouts()
Definition: IpcIoFile.cc:528
off_t offset
Definition: ReadRequest.h:25
int n_strands
number of disk processes required to support all cache_dirs
Definition: SquidConfig.h:62
static const char *const ShmLabel
shared memory segment path to use for IpcIoFile maps
Definition: IpcIoFile.cc:38
static SBuf DbName
full db file name
Definition: IpcIoFile.cc:647
static std::unique_ptr< Queue > queue
IPC queue.
Definition: IpcIoFile.h:143
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:124
#define DBG_IMPORTANT
Definition: Debug.h:46
friend class IpcIoPendingRequest
Definition: IpcIoFile.h:90
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:51
RequestMap requestMap1
older (or newer) pending requests
Definition: IpcIoFile.h:127
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:202
static void HandleNotification(const Ipc::TypedMsgHdr &msg)
handle queue push notifications from worker or disker
Definition: IpcIoFile.cc:486
virtual void closeCompleted()=0
static void DiskerClose(const SBuf &path)
Definition: IpcIoFile.cc:915
char const * termedBuf() const
Definition: SquidString.h:91
void * addr
Definition: membanger.c:46
int xerrno
I/O error code or zero.
Definition: IpcIoFile.h:53
char const * buf
Definition: WriteRequest.h:25
void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
track a new pending request
Definition: IpcIoFile.cc:320
const char * c_str()
Definition: SBuf.cc:526
virtual void read(ReadRequest *)
Definition: IpcIoFile.cc:211
int getInt() const
load an integer
Definition: TypedMsgHdr.cc:101
static void diskerWrite(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:737
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
int unsigned int const char *desc STUB void int len
Definition: stub_fd.cc:20
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandSearch.cc:28
asynchronous strand search response
Definition: StrandSearch.h:33
static bool WaitBeforePop()
Definition: IpcIoFile.cc:753
void const char * buf
Definition: stub_helper.cc:16
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:153
static void DiskerHandleMoreRequests(void *)
Definition: IpcIoFile.cc:744
ReadRequest * readRequest
set if this is a read request
Definition: IpcIoFile.h:160
static const int QueueCapacity
Definition: IpcIoFile.cc:43
SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker)
Definition: IpcIoFile.cc:57
an std::runtime_error with thrower location info
Definition: TextException.h:19
unsigned short mode_t
Definition: types.h:150
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:22
Ipc::Mem::PageId page
Definition: IpcIoFile.h:48
const IpcIoFile::Pointer file
the file object waiting for the response
Definition: IpcIoFile.h:159
size_t PageSize()
returns page size in bytes; all pages are assumed to be the same size
Definition: Pages.cc:28
time_t getCurrentTime(void)
Get current time.
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:109
virtual bool error() const
Definition: IpcIoFile.cc:205
int KidIdentifier
virtual ~IpcIoFile()
Definition: IpcIoFile.cc:79
void completeIo(IpcIoMsg *const response)
called when response is received and, with a nil response, on timeouts
Definition: IpcIoFile.cc:633
virtual void readCompleted(const char *buf, int len, int errflag, RefCount< ReadRequest >)=0
RequestMap requestMap2
newer (or older) pending requests
Definition: IpcIoFile.h:128
Ipc::FewToFewBiQueue::Owner * owner
Definition: IpcIoFile.cc:941
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:31
Store::DiskConfig cacheSwap
Definition: SquidConfig.h:419
IpcIo wrapper for debugs() streams; XXX: find a better class name.
Definition: IpcIoFile.cc:56
int store_open_disk_fd
RequestMap * newerRequests
newer requests (map2 or map1)
Definition: IpcIoFile.h:130
void push(IpcIoPendingRequest *const pending)
push an I/O request to disker
Definition: IpcIoFile.cc:331
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:221
size_t len
Definition: IpcIoFile.h:47
#define TexcHere(msg)
legacy convenience macro; it is not difficult to type Here() now
Definition: TextException.h:55
void handleResponse(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:456
void PutPage(PageId &page)
makes identified page available as a free page to future GetPage() callers
Definition: Pages.cc:41
time_msec_t ioTimeout
canRead/Write should return false if expected I/O delay exceeds it
Definition: DiskFile.h:33
int worker
Definition: IpcIoFile.cc:60
static int TheFile
db file descriptor
Definition: IpcIoFile.cc:648
String tag
optional unique well-known key (e.g., cache_dir path)
Definition: StrandCoord.h:32
static const double Timeout
timeout value in seconds
Definition: IpcIoFile.h:133
const String dbName
the name of the file we are managing
Definition: IpcIoFile.h:117
void setType(int aType)
sets message type; use MessageType enum
Definition: TypedMsgHdr.cc:90
unsigned int requestId
unique for requestor; matches request w/ response
Definition: IpcIoFile.h:44
bool IamDiskProcess()
whether the current process is dedicated to managing a cache_dir
#define DISK_OK
Definition: defines.h:39
void openCompleted(const Ipc::StrandSearchResponse *const response)
Definition: IpcIoFile.cc:144
static IpcIoFileList WaitingForOpen
pending open requests
Definition: IpcIoFile.h:136
const IpcIoMsg & msg
Definition: IpcIoFile.cc:61
off_t offset
Definition: IpcIoFile.h:46
AtomicSignedMsec Balance
Definition: Queue.h:58
virtual void ioCompletedNotification()=0
void file_close(int fd)
Definition: fs_io.cc:76
IpcIoPendingRequest * dequeueRequest(const unsigned int requestId)
returns and forgets the right IpcIoFile pending request
Definition: IpcIoFile.cc:582
CBDATA_CLASS_INIT(IpcIoFile)
virtual void writeCompleted(int errflag, size_t len, RefCount< WriteRequest >)=0
static void OpenTimeout(void *const param)
handles open request timeout
Definition: IpcIoFile.cc:499
class SquidConfig Config
Definition: SquidConfig.cc:12
static String CoordinatorAddr()
get the IPC message address for coordinator process
Definition: Port.cc:64
#define NULL
Definition: types.h:166
A const & min(A const &lhs, A const &rhs)
virtual void close()
Definition: IpcIoFile.cc:181
struct StatCounters::@137 syscalls
#define false
Definition: GnuRegex.c:233
converts DiskIO requests to IPC queue messages
Definition: IpcIoFile.h:38
struct timeval start
when the I/O request was converted to IpcIoMsg
Definition: IpcIoFile.h:51
static bool DiskerOpen(const SBuf &path, int flags, mode_t mode)
Definition: IpcIoFile.cc:895
static void diskerWriteAttempts(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:681

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors