IpcIoFile.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 47 Store Directory Routines */
10 
11 #include "squid.h"
12 #include "base/RunnersRegistry.h"
13 #include "base/TextException.h"
14 #include "DiskIO/IORequestor.h"
15 #include "DiskIO/IpcIo/IpcIoFile.h"
16 #include "DiskIO/ReadRequest.h"
17 #include "DiskIO/WriteRequest.h"
18 #include "fd.h"
19 #include "fs_io.h"
20 #include "globals.h"
21 #include "ipc/mem/Pages.h"
22 #include "ipc/Messages.h"
23 #include "ipc/Port.h"
24 #include "ipc/Queue.h"
25 #include "ipc/StrandSearch.h"
26 #include "ipc/UdsOp.h"
27 #include "sbuf/SBuf.h"
28 #include "SquidConfig.h"
29 #include "SquidTime.h"
30 #include "StatCounters.h"
31 #include "tools.h"
32 
33 #include <cerrno>
34 
36 
38 static const char *const ShmLabel = "io_file";
42 // TODO: make configurable or compute from squid.conf settings if possible
43 static const int QueueCapacity = 1024;
44 
45 const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
48 std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
49 
51 
52 static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
53 static void DiskerClose(const SBuf &path);
54 
56 struct SipcIo {
57  SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
58  worker(aWorker), msg(aMsg), disker(aDisker) {}
59 
60  int worker;
61  const IpcIoMsg &msg;
62  int disker;
63 };
64 
65 std::ostream &
66 operator <<(std::ostream &os, const SipcIo &sio)
67 {
68  return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
69  (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
70 }
71 
72 IpcIoFile::IpcIoFile(char const *aDb):
73  dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
74  olderRequests(&requestMap1), newerRequests(&requestMap2),
75  timeoutCheckScheduled(false)
76 {
77 }
78 
80 {
81  if (diskId >= 0) {
82  const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
83  // XXX: warn and continue?
84  Must(i != IpcIoFiles.end());
85  Must(i->second == this);
86  IpcIoFiles.erase(i);
87  }
88 }
89 
90 void
92 {
94  config = cfg;
95 }
96 
97 void
99 {
101  Must(diskId < 0); // we do not know our disker yet
102 
103  if (!queue.get())
105 
106  if (IamDiskProcess()) {
107  error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
108  if (error_)
109  return;
110 
112  const bool inserted =
113  IpcIoFiles.insert(std::make_pair(diskId, this)).second;
114  Must(inserted);
115 
116  queue->localRateLimit().store(config.ioRate);
117 
119  ann.strand.tag = dbName;
120  Ipc::TypedMsgHdr message;
121  ann.pack(message);
123 
125  return;
126  }
127 
129  request.requestorId = KidIdentifier;
130  request.tag = dbName;
131 
132  Ipc::TypedMsgHdr msg;
133  request.pack(msg);
135 
136  WaitingForOpen.push_back(this);
137 
138  eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
139  this, Timeout, 0, false); // "this" pointer is used as id
140 }
141 
142 void
144 {
145  Must(diskId < 0); // we do not know our disker yet
146 
147  if (!response) {
148  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
149  "channel establishment timeout");
150  error_ = true;
151  } else {
152  diskId = response->strand.kidId;
153  if (diskId >= 0) {
154  const bool inserted =
155  IpcIoFiles.insert(std::make_pair(diskId, this)).second;
156  Must(inserted);
157  } else {
158  error_ = true;
159  debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
160  "responsibility for " << dbName);
161  }
162  }
163 
165 }
166 
171 void
173 {
174  assert(false); // check
175  /* We use the same logic path for open */
176  open(flags, mode, callback);
177 }
178 
179 void
181 {
182  assert(ioRequestor != NULL);
183 
184  if (IamDiskProcess())
186  // XXX: else nothing to do?
187 
189 }
190 
191 bool
193 {
194  return diskId >= 0 && !error_ && canWait();
195 }
196 
197 bool
199 {
200  return diskId >= 0 && !error_ && canWait();
201 }
202 
203 bool
205 {
206  return error_;
207 }
208 
209 void
211 {
212  debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
213  readRequest->offset << ")");
214 
215  assert(ioRequestor != NULL);
216  assert(readRequest->offset >= 0);
217  Must(!error_);
218 
219  //assert(minOffset < 0 || minOffset <= readRequest->offset);
220  //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
221 
222  IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
223  pending->readRequest = readRequest;
224  push(pending);
225 }
226 
227 void
229  IpcIoMsg *const response)
230 {
231  bool ioError = false;
232  if (!response) {
233  debugs(79, 3, HERE << "error: timeout");
234  ioError = true; // I/O timeout does not warrant setting error_?
235  } else {
236  if (response->xerrno) {
237  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
238  xstrerr(response->xerrno));
239  ioError = error_ = true;
240  } else if (!response->page) {
241  debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
242  "out of shared memory pages");
243  ioError = true;
244  } else {
245  const char *const buf = Ipc::Mem::PagePointer(response->page);
246  memcpy(readRequest->buf, buf, response->len);
247  }
248 
249  Ipc::Mem::PutPage(response->page);
250  }
251 
252  const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
253  const int errflag = ioError ? DISK_ERROR :DISK_OK;
254  ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
255 }
256 
257 void
259 {
260  debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
261  writeRequest->offset << ")");
262 
263  assert(ioRequestor != NULL);
264  assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
265  assert(writeRequest->offset >= 0);
266  Must(!error_);
267 
268  //assert(minOffset < 0 || minOffset <= writeRequest->offset);
269  //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
270 
271  IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
272  pending->writeRequest = writeRequest;
273  push(pending);
274 }
275 
276 void
278  const IpcIoMsg *const response)
279 {
280  bool ioError = false;
281  if (!response) {
282  debugs(79, 3, "disker " << diskId << " timeout");
283  ioError = true; // I/O timeout does not warrant setting error_?
284  } else if (response->xerrno) {
285  debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
286  " error writing " << writeRequest->len << " bytes at " <<
287  writeRequest->offset << ": " << xstrerr(response->xerrno) <<
288  "; this worker will stop using " << dbName);
289  ioError = error_ = true;
290  } else if (response->len != writeRequest->len) {
291  debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
292  response->len << " instead of " << writeRequest->len <<
293  " bytes (offset " << writeRequest->offset << "); " <<
294  "this worker will stop using " << dbName);
295  error_ = true;
296  }
297 
298  if (writeRequest->free_func)
299  (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
300 
301  if (!ioError) {
302  debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
303  diskId << " at " << writeRequest->offset);
304  }
305 
306  const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
307  const int errflag = ioError ? DISK_ERROR :DISK_OK;
308  ioRequestor->writeCompleted(errflag, rlen, writeRequest);
309 }
310 
311 bool
313 {
314  return !olderRequests->empty() || !newerRequests->empty();
315 }
316 
318 void
319 IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
320 {
321  const std::pair<RequestMap::iterator,bool> result =
322  newerRequests->insert(std::make_pair(id, pending));
323  Must(result.second); // failures means that id was not unique
326 }
327 
329 void
331 {
332  // prevent queue overflows: check for responses to earlier requests
333  // warning: this call may result in indirect push() recursion
334  HandleResponses("before push");
335 
336  debugs(47, 7, HERE);
337  Must(diskId >= 0);
338  Must(pending);
339  Must(pending->readRequest || pending->writeRequest);
340 
341  IpcIoMsg ipcIo;
342  try {
343  if (++lastRequestId == 0) // don't use zero value as requestId
344  ++lastRequestId;
345  ipcIo.requestId = lastRequestId;
346  ipcIo.start = current_time;
347  if (pending->readRequest) {
348  ipcIo.command = IpcIo::cmdRead;
349  ipcIo.offset = pending->readRequest->offset;
350  ipcIo.len = pending->readRequest->len;
351  } else { // pending->writeRequest
352  Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
354  ipcIo.len = 0;
355  throw TexcHere("run out of shared memory pages for IPC I/O");
356  }
357  ipcIo.command = IpcIo::cmdWrite;
358  ipcIo.offset = pending->writeRequest->offset;
359  ipcIo.len = pending->writeRequest->len;
360  char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
361  memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
362  }
363 
364  debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
365 
366  if (queue->push(diskId, ipcIo))
367  Notify(diskId); // must notify disker
368  trackPendingRequest(ipcIo.requestId, pending);
369  } catch (const Queue::Full &) {
370  debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
371  dbName << " overflow: " <<
372  SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
373  // TODO: grow queue size
374 
375  pending->completeIo(NULL);
376  delete pending;
377  } catch (const TextException &e) {
378  debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
379  pending->completeIo(NULL);
380  delete pending;
381  }
382 }
383 
385 bool
387 {
388  if (!config.ioTimeout)
389  return true; // no timeout specified
390 
391  IpcIoMsg oldestIo;
392  if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
393  return true; // we cannot estimate expected wait time; assume it is OK
394 
395  const int oldestWait = tvSubMsec(oldestIo.start, current_time);
396 
397  int rateWait = -1; // time in millisecons
398  const int ioRate = queue->rateLimit(diskId).load();
399  if (ioRate > 0) {
400  // if there are N requests pending, the new one will wait at
401  // least N/max-swap-rate seconds
402  rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
403  // adjust N/max-swap-rate value based on the queue "balance"
404  // member, in case we have been borrowing time against future
405  // I/O already
406  rateWait += queue->balance(diskId);
407  }
408 
409  const int expectedWait = max(oldestWait, rateWait);
410  if (expectedWait < 0 ||
411  static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
412  return true; // expected wait time is acceptible
413 
414  debugs(47,2, HERE << "cannot wait: " << expectedWait <<
415  " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
416  return false; // do not want to wait that long
417 }
418 
420 void
422 {
423  debugs(47, 7, HERE << "coordinator response to open request");
424  for (IpcIoFileList::iterator i = WaitingForOpen.begin();
425  i != WaitingForOpen.end(); ++i) {
426  if (response.strand.tag == (*i)->dbName) {
427  (*i)->openCompleted(&response);
428  WaitingForOpen.erase(i);
429  return;
430  }
431  }
432 
433  debugs(47, 4, HERE << "LATE disker response to open for " <<
434  response.strand.tag);
435  // nothing we can do about it; completeIo() has been called already
436 }
437 
438 void
439 IpcIoFile::HandleResponses(const char *const when)
440 {
441  debugs(47, 4, HERE << "popping all " << when);
442  IpcIoMsg ipcIo;
443  // get all responses we can: since we are not pushing, this will stop
444  int diskId;
445  while (queue->pop(diskId, ipcIo)) {
446  const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
447  Must(i != IpcIoFiles.end()); // TODO: warn but continue
448  i->second->handleResponse(ipcIo);
449  }
450 }
451 
452 void
454 {
455  const int requestId = ipcIo.requestId;
456  debugs(47, 7, HERE << "popped disker response: " <<
457  SipcIo(KidIdentifier, ipcIo, diskId));
458 
459  Must(requestId);
460  if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
461  pending->completeIo(&ipcIo);
462  delete pending; // XXX: leaking if throwing
463  } else {
464  debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
465  "; ipcIo" << KidIdentifier << '.' << requestId);
466  // nothing we can do about it; completeIo() has been called already
467  }
468 }
469 
470 void
471 IpcIoFile::Notify(const int peerId)
472 {
473  // TODO: Count and report the total number of notifications, pops, pushes.
474  debugs(47, 7, HERE << "kid" << peerId);
475  Ipc::TypedMsgHdr msg;
476  msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
477  msg.putInt(KidIdentifier);
479  Ipc::SendMessage(addr, msg);
480 }
481 
482 void
484 {
485  const int from = msg.getInt();
486  debugs(47, 7, HERE << "from " << from);
487  queue->clearReaderSignal(from);
488  if (IamDiskProcess())
490  else
491  HandleResponses("after notification");
492 }
493 
495 void
496 IpcIoFile::OpenTimeout(void *const param)
497 {
498  Must(param);
499  // the pointer is used for comparison only and not dereferenced
500  const IpcIoFile *const ipcIoFile =
501  reinterpret_cast<const IpcIoFile *>(param);
502  for (IpcIoFileList::iterator i = WaitingForOpen.begin();
503  i != WaitingForOpen.end(); ++i) {
504  if (*i == ipcIoFile) {
505  (*i)->openCompleted(NULL);
506  WaitingForOpen.erase(i);
507  break;
508  }
509  }
510 }
511 
513 void
514 IpcIoFile::CheckTimeouts(void *const param)
515 {
516  Must(param);
517  const int diskId = reinterpret_cast<uintptr_t>(param);
518  debugs(47, 7, HERE << "diskId=" << diskId);
519  const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
520  if (i != IpcIoFiles.end())
521  i->second->checkTimeouts();
522 }
523 
524 void
526 {
527  timeoutCheckScheduled = false;
528 
529  // last chance to recover in case a notification message was lost, etc.
530  const RequestMap::size_type timeoutsBefore = olderRequests->size();
531  HandleResponses("before timeout");
532  const RequestMap::size_type timeoutsNow = olderRequests->size();
533 
534  if (timeoutsBefore > timeoutsNow) { // some requests were rescued
535  // notification message lost or significantly delayed?
536  debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
537  " may be too slow or disrupted for about " <<
538  Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
539  " out of " << timeoutsBefore << " I/Os");
540  }
541 
542  if (timeoutsNow) {
543  debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
544  timeoutsNow << ' ' << dbName << " I/Os after at least " <<
545  Timeout << "s timeout");
546  }
547 
548  // any old request would have timed out by now
549  typedef RequestMap::const_iterator RMCI;
550  for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
551  IpcIoPendingRequest *const pending = i->second;
552 
553  const unsigned int requestId = i->first;
554  debugs(47, 7, HERE << "disker timeout; ipcIo" <<
555  KidIdentifier << '.' << requestId);
556 
557  pending->completeIo(NULL); // no response
558  delete pending; // XXX: leaking if throwing
559  }
560  olderRequests->clear();
561 
562  swap(olderRequests, newerRequests); // switches pointers around
563  if (!olderRequests->empty() && !timeoutCheckScheduled)
565 }
566 
568 void
570 {
571  // we check all older requests at once so some may be wait for 2*Timeout
572  eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
573  reinterpret_cast<void *>(diskId), Timeout, 0, false);
574  timeoutCheckScheduled = true;
575 }
576 
579 IpcIoFile::dequeueRequest(const unsigned int requestId)
580 {
581  Must(requestId != 0);
582 
583  RequestMap *map = NULL;
584  RequestMap::iterator i = requestMap1.find(requestId);
585 
586  if (i != requestMap1.end())
587  map = &requestMap1;
588  else {
589  i = requestMap2.find(requestId);
590  if (i != requestMap2.end())
591  map = &requestMap2;
592  }
593 
594  if (!map) // not found in both maps
595  return NULL;
596 
597  IpcIoPendingRequest *pending = i->second;
598  map->erase(i);
599  return pending;
600 }
601 
602 int
604 {
605  assert(false); // not supported; TODO: remove this method from API
606  return -1;
607 }
608 
609 /* IpcIoMsg */
610 
612  requestId(0),
613  offset(0),
614  len(0),
615  command(IpcIo::cmdNone),
616  xerrno(0)
617 {
618  start.tv_sec = 0;
619  start.tv_usec = 0;
620 }
621 
622 /* IpcIoPendingRequest */
623 
625  file(aFile), readRequest(NULL), writeRequest(NULL)
626 {
627 }
628 
629 void
631 {
632  if (readRequest)
633  file->readCompleted(readRequest, response);
634  else if (writeRequest)
635  file->writeCompleted(writeRequest, response);
636  else {
637  Must(!response); // only timeouts are handled here
638  file->openCompleted(NULL);
639  }
640 }
641 
642 /* XXX: disker code that should probably be moved elsewhere */
643 
644 static SBuf DbName;
645 static int TheFile = -1;
646 
647 static void
649 {
651  ipcIo.len = 0;
652  debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
653  return;
654  }
655 
656  char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
657  const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
658  ++statCounter.syscalls.disk.reads;
659  fd_bytes(TheFile, read, FD_READ);
660 
661  if (read >= 0) {
662  ipcIo.xerrno = 0;
663  const size_t len = static_cast<size_t>(read); // safe because read > 0
664  debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
665  (len == ipcIo.len ? "all " : "just ") << read);
666  ipcIo.len = len;
667  } else {
668  ipcIo.xerrno = errno;
669  ipcIo.len = 0;
670  debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
671  ipcIo.xerrno);
672  }
673 }
674 
677 static void
679 {
680  const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
681  size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
682  size_t wroteSoFar = 0;
683  off_t offset = ipcIo.offset;
684  // Partial writes to disk do happen. It is unlikely that the caller can
685  // handle partial writes by doing something other than writing leftovers
686  // again, so we try to write them ourselves to minimize overheads.
687  const int attemptLimit = 10;
688  for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
689  const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
690  ++statCounter.syscalls.disk.writes;
691  fd_bytes(TheFile, result, FD_WRITE);
692 
693  if (result < 0) {
694  ipcIo.xerrno = errno;
695  assert(ipcIo.xerrno);
696  debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
697  " writing " << toWrite << '/' << ipcIo.len <<
698  " at " << ipcIo.offset << '+' << wroteSoFar <<
699  " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
700  ipcIo.len = wroteSoFar;
701  return; // bail on error
702  }
703 
704  const size_t wroteNow = static_cast<size_t>(result); // result >= 0
705  ipcIo.xerrno = 0;
706 
707  debugs(47,3, "disker" << KidIdentifier << " wrote " <<
708  (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
709  " out of " << toWrite << '/' << ipcIo.len << " at " <<
710  ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
711  " try");
712 
713  wroteSoFar += wroteNow;
714 
715  if (wroteNow >= toWrite) {
716  ipcIo.xerrno = 0;
717  ipcIo.len = wroteSoFar;
718  return; // wrote everything there was to write
719  }
720 
721  buf += wroteNow;
722  offset += wroteNow;
723  toWrite -= wroteNow;
724  }
725 
726  debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
727  attemptLimit << " attempts while writing " <<
728  toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
729  wroteSoFar);
730  return; // not a fatal I/O error, unless the caller treats it as such
731 }
732 
733 static void
735 {
736  diskerWriteAttempts(ipcIo); // may fail
737  Ipc::Mem::PutPage(ipcIo.page);
738 }
739 
740 void
742 {
743  debugs(47, 7, HERE << "resuming handling requests after " <<
744  static_cast<const char *>(source));
747 }
748 
749 bool
751 {
752  const int ioRate = queue->localRateLimit().load();
753  const double maxRate = ioRate/1e3; // req/ms
754 
755  // do we need to enforce configured I/O rate?
756  if (maxRate <= 0)
757  return false;
758 
759  // is there an I/O request we could potentially delay?
760  int processId;
761  IpcIoMsg ipcIo;
762  if (!queue->peek(processId, ipcIo)) {
763  // unlike pop(), peek() is not reliable and does not block reader
764  // so we must proceed with pop() even if it is likely to fail
765  return false;
766  }
767 
768  static timeval LastIo = current_time;
769 
770  const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
771  // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
772  const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
773 
774  const double credit = ioDuration; // what the last I/O should have cost us
775  const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
776  LastIo = current_time;
777 
778  Ipc::QueueReader::Balance &balance = queue->localBalance();
779  balance += static_cast<int64_t>(credit - debit);
780 
781  debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
782 
783  if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
784  // if the next request is (likely) write and we accumulated
785  // too much time for future slow I/Os, then shed accumulated
786  // time to keep just half of the excess
787  const int64_t toSpend = balance - maxImbalance/2;
788 
789  if (toSpend/1e3 > Timeout)
790  debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
791  "I/O requests for " << (toSpend/1e3) << " seconds " <<
792  "to obey " << ioRate << "/sec rate limit");
793 
794  debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
795  (1e3*maxRate) << "/sec rate");
796  eventAdd("IpcIoFile::DiskerHandleMoreRequests",
798  const_cast<char*>("rate limiting"),
799  toSpend/1e3, 0, false);
801  return true;
802  } else if (balance < -maxImbalance) {
803  // do not owe "too much" to avoid "too large" bursts of I/O
804  balance = -maxImbalance;
805  }
806 
807  return false;
808 }
809 
810 void
812 {
813  // Balance our desire to maximize the number of concurrent I/O requests
814  // (reordred by OS to minimize seek time) with a requirement to
815  // send 1st-I/O notification messages, process Coordinator events, etc.
816  const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
817  const timeval loopStart = current_time;
818 
819  int popped = 0;
820  int workerId = 0;
821  IpcIoMsg ipcIo;
822  while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
823  ++popped;
824 
825  // at least one I/O per call is guaranteed if the queue is not empty
826  DiskerHandleRequest(workerId, ipcIo);
827 
828  getCurrentTime();
829  const double elapsedMsec = tvSubMsec(loopStart, current_time);
830  if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
832  // the gap must be positive for select(2) to be given a chance
833  const double minBreakSecs = 0.001;
834  eventAdd("IpcIoFile::DiskerHandleMoreRequests",
836  const_cast<char*>("long I/O loop"),
837  minBreakSecs, 0, false);
839  }
840  debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
841  elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
842  break;
843  }
844  }
845 
846  // TODO: consider using O_DIRECT with "elevator" optimization where we pop
847  // requests first, then reorder the popped requests to optimize seek time,
848  // then do I/O, then take a break, and come back for the next set of I/O
849  // requests.
850 }
851 
853 void
854 IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
855 {
856  if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
857  debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
858  " should not receive " << ipcIo.command <<
859  " ipcIo" << workerId << '.' << ipcIo.requestId);
860  return;
861  }
862 
863  debugs(47,5, HERE << "disker" << KidIdentifier <<
864  (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
865  ipcIo.len << " at " << ipcIo.offset <<
866  " ipcIo" << workerId << '.' << ipcIo.requestId);
867 
868  if (ipcIo.command == IpcIo::cmdRead)
869  diskerRead(ipcIo);
870  else // ipcIo.command == IpcIo::cmdWrite
871  diskerWrite(ipcIo);
872 
873  debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
874 
875  try {
876  if (queue->push(workerId, ipcIo))
877  Notify(workerId); // must notify worker
878  } catch (const Queue::Full &) {
879  // The worker queue should not overflow because the worker should pop()
880  // before push()ing and because if disker pops N requests at a time,
881  // we should make sure the worker pop() queue length is the worker
882  // push queue length plus N+1. XXX: implement the N+1 difference.
883  debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
884  DbName << " overflow: " <<
885  SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
886 
887  // the I/O request we could not push will timeout
888  }
889 }
890 
891 static bool
892 DiskerOpen(const SBuf &path, int flags, mode_t)
893 {
894  assert(TheFile < 0);
895 
896  DbName = path;
897  TheFile = file_open(DbName.c_str(), flags);
898 
899  if (TheFile < 0) {
900  const int xerrno = errno;
901  debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
902  xstrerr(xerrno));
903  return false;
904  }
905 
907  debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
908  return true;
909 }
910 
911 static void
912 DiskerClose(const SBuf &path)
913 {
914  if (TheFile >= 0) {
916  debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
917  TheFile = -1;
919  }
920  DbName.clear();
921 }
922 
926 {
927 public:
928  /* RegisteredRunner API */
930  virtual ~IpcIoRr();
931  virtual void claimMemoryNeeds();
932 
933 protected:
934  /* Ipc::Mem::RegisteredRunner API */
935  virtual void create();
936 
937 private:
939 };
940 
942 
943 void
945 {
946  const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
948  // the maximum number of shared I/O pages is approximately the
949  // number of queue slots, we add a fudge factor to that to account
950  // for corner cases where I/O pages are created before queue
951  // limits are checked or destroyed long after the I/O is dequeued
953  static_cast<int>(itemsCount * 1.1));
954 }
955 
956 void
958 {
959  if (Config.cacheSwap.n_strands <= 0)
960  return;
961 
962  Must(!owner);
965  1 + Config.workers, sizeof(IpcIoMsg),
966  QueueCapacity);
967 }
968 
970 {
971  delete owner;
972 }
973 
virtual bool canWrite() const
Definition: IpcIoFile.cc:198
virtual void create()
called when the runner should create a new memory segment
Definition: IpcIoFile.cc:957
strand registration with Coordinator (also used as an ACK)
Definition: StrandCoord.h:36
int diskId
the process ID of the disker we talk to
Definition: IpcIoFile.h:118
generally useful configuration options supported by some children
Definition: DiskFile.h:27
bool canWait() const
whether we think there is enough time to complete the I/O
Definition: IpcIoFile.cc:386
RequestMap * olderRequests
older requests (map1 or map2)
Definition: IpcIoFile.h:129
virtual void configure(const Config &)
notes supported configuration options; kids must call this first
Definition: DiskFile.h:42
StatCounters statCounter
Definition: StatCounters.cc:12
#define assert(EX)
Definition: assert.h:17
FREE * free_func
Definition: WriteRequest.h:28
static void HandleOpenResponse(const Ipc::StrandSearchResponse &response)
handle open response from coordinator
Definition: IpcIoFile.cc:421
void const char HLPCB * callback
Definition: stub_helper.cc:16
virtual void open(int flags, mode_t mode, RefCount< IORequestor > callback)
Definition: IpcIoFile.cc:98
StrandCoord strand
registrant coordinates and related details
Definition: StrandCoord.h:44
keeps original I/O request parameters while disker is handling the request
Definition: IpcIoFile.h:150
int requestorId
sender-provided return address
Definition: StrandSearch.h:28
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile)
Definition: IpcIoFile.cc:624
bool error_
whether we have seen at least one I/O error (XXX)
Definition: IpcIoFile.h:121
String tag
set when looking for a matching StrandCoord::tag
Definition: StrandSearch.h:29
std::list< Pointer > IpcIoFileList
Definition: IpcIoFile.h:135
static void diskerRead(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:648
virtual void create(int flags, mode_t mode, RefCount< IORequestor > callback)
Definition: IpcIoFile.cc:172
IpcIo::Command command
what disker is supposed to do or did
Definition: IpcIoFile.h:50
Definition: SBuf.h:87
Strand location details.
Definition: StrandCoord.h:19
virtual ~IpcIoRr()
Definition: IpcIoFile.cc:969
RefCount< IORequestor > ioRequestor
Definition: IpcIoFile.h:119
char * PagePointer(const PageId &page)
converts page handler into a temporary writeable shared memory pointer
Definition: Pages.cc:48
void putInt(int n)
store an integer
Definition: TypedMsgHdr.cc:109
size_t len
Definition: ReadRequest.h:26
struct _request * request(char *urlin)
Definition: tcp-banger2.c:291
int i
Definition: membanger.c:49
static void CheckTimeouts(void *const param)
IpcIoFile::checkTimeouts wrapper.
Definition: IpcIoFile.cc:514
static void Notify(const int peerId)
Definition: IpcIoFile.cc:471
std::ostream & operator<<(std::ostream &os, const SipcIo &sio)
Definition: IpcIoFile.cc:66
virtual void write(WriteRequest *)
Definition: IpcIoFile.cc:258
static void HandleResponses(const char *const when)
Definition: IpcIoFile.cc:439
StrandCoord strand
answer matching StrandSearchRequest criteria
Definition: StrandSearch.h:41
void scheduleTimeoutCheck()
prepare to check for timeouts in a little while
Definition: IpcIoFile.cc:569
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
called when disker receives an I/O request
Definition: IpcIoFile.cc:854
int kidId
internal Squid process number
Definition: StrandCoord.h:29
int file_open(const char *path, int mode)
Definition: fs_io.cc:46
IpcIoFile(char const *aDb)
Definition: IpcIoFile.cc:72
void clear()
Definition: SBuf.cc:190
virtual int getFD() const
Definition: IpcIoFile.cc:603
virtual void configure(const Config &cfg)
notes supported configuration options; kids must call this first
Definition: IpcIoFile.cc:91
void NotePageNeed(const int purpose, const int count)
claim the need for a number of pages for a given purpose
Definition: Pages.cc:72
std::map< unsigned int, IpcIoPendingRequest * > RequestMap
maps requestId to the handleResponse callback
Definition: IpcIoFile.h:126
char * buf
Definition: ReadRequest.h:24
Definition: enums.h:24
bool GetPage(const PageId::Purpose purpose, PageId &page)
sets page ID and returns true unless no free pages are found
Definition: Pages.cc:34
void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response)
Definition: IpcIoFile.cc:277
#define DBG_CRITICAL
Definition: Debug.h:44
bool timeoutCheckScheduled
we expect a CheckTimeouts() call
Definition: IpcIoFile.h:131
virtual bool canRead() const
Definition: IpcIoFile.cc:192
static IpcIoFilesMap IpcIoFiles
Definition: IpcIoFile.h:140
struct timeval current_time
Definition: stub_time.cc:15
DiskFile::Config config
supported configuration options
Definition: IpcIoFile.h:87
A const & max(A const &lhs, A const &rhs)
Definition: enums.h:23
asynchronous strand search request
Definition: StrandSearch.h:20
int ioRate
shape I/O request stream to approach that many per second
Definition: DiskFile.h:36
bool IamWorkerProcess()
whether the current process handles HTTP transactions and such
Definition: stub_tools.cc:49
#define DISK_ERROR
Definition: defines.h:40
virtual void claimMemoryNeeds()
Definition: IpcIoFile.cc:944
RunnerRegistrationEntry(IpcIoRr)
void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response)
Definition: IpcIoFile.cc:228
static bool DiskerHandleMoreRequestsScheduled
whether we are waiting for an event to handle still queued I/O requests
Definition: IpcIoFile.h:146
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandCoord.cc:51
void fd_bytes(int fd, int len, unsigned int type)
Definition: fd.cc:261
WriteRequest * writeRequest
set if this is a write request
Definition: IpcIoFile.h:161
unsigned int lastRequestId
last requestId used
Definition: IpcIoFile.h:123
std::map< int, IpcIoFile * > IpcIoFilesMap
Definition: IpcIoFile.h:139
int disker
Definition: IpcIoFile.cc:62
virtual bool ioInProgress() const
Definition: IpcIoFile.cc:312
int tvSubMsec(struct timeval, struct timeval)
Definition: stub_time.cc:20
const char * xstrerr(int error)
Definition: xstrerror.cc:83
static void DiskerHandleRequests()
Definition: IpcIoFile.cc:811
void checkTimeouts()
Definition: IpcIoFile.cc:525
off_t offset
Definition: ReadRequest.h:25
int n_strands
number of disk processes required to support all cache_dirs
Definition: SquidConfig.h:62
static const char *const ShmLabel
shared memory segment path to use for IpcIoFile maps
Definition: IpcIoFile.cc:38
static SBuf DbName
full db file name
Definition: IpcIoFile.cc:644
static std::unique_ptr< Queue > queue
IPC queue.
Definition: IpcIoFile.h:143
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
struct StatCounters::@136 syscalls
#define DBG_IMPORTANT
Definition: Debug.h:45
friend class IpcIoPendingRequest
Definition: IpcIoFile.h:90
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:51
struct StatCounters::@136::@139 disk
RequestMap requestMap1
older (or newer) pending requests
Definition: IpcIoFile.h:127
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:202
static void HandleNotification(const Ipc::TypedMsgHdr &msg)
handle queue push notifications from worker or disker
Definition: IpcIoFile.cc:483
virtual void closeCompleted()=0
static void DiskerClose(const SBuf &path)
Definition: IpcIoFile.cc:912
char const * termedBuf() const
Definition: SquidString.h:90
void * addr
Definition: membanger.c:46
int xerrno
I/O error code or zero.
Definition: IpcIoFile.h:53
char const * buf
Definition: WriteRequest.h:25
void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
track a new pending request
Definition: IpcIoFile.cc:319
const char * c_str()
Definition: SBuf.cc:546
virtual void read(ReadRequest *)
Definition: IpcIoFile.cc:210
int getInt() const
load an integer
Definition: TypedMsgHdr.cc:101
static void diskerWrite(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:734
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
int unsigned int const char *desc STUB void int len
Definition: stub_fd.cc:20
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: StrandSearch.cc:28
asynchronous strand search response
Definition: StrandSearch.h:33
static bool WaitBeforePop()
Definition: IpcIoFile.cc:750
void const char * buf
Definition: stub_helper.cc:16
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:147
static void DiskerHandleMoreRequests(void *)
Definition: IpcIoFile.cc:741
ReadRequest * readRequest
set if this is a read requests
Definition: IpcIoFile.h:160
static const int QueueCapacity
Definition: IpcIoFile.cc:43
SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker)
Definition: IpcIoFile.cc:57
unsigned short mode_t
Definition: types.h:150
#define Must(cond)
Definition: TextException.h:89
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:22
Ipc::Mem::PageId page
Definition: IpcIoFile.h:48
const IpcIoFile::Pointer file
the file object waiting for the response
Definition: IpcIoFile.h:159
size_t PageSize()
returns page size in bytes; all pages are assumed to be the same size
Definition: Pages.cc:28
time_t getCurrentTime(void)
Get current time.
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:109
virtual bool error() const
Definition: IpcIoFile.cc:204
int KidIdentifier
virtual ~IpcIoFile()
Definition: IpcIoFile.cc:79
void completeIo(IpcIoMsg *const response)
called when response is received and, with a nil response, on timeouts
Definition: IpcIoFile.cc:630
virtual void readCompleted(const char *buf, int len, int errflag, RefCount< ReadRequest >)=0
RequestMap requestMap2
newer (or older) pending requests
Definition: IpcIoFile.h:128
Ipc::FewToFewBiQueue::Owner * owner
Definition: IpcIoFile.cc:938
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:31
Store::DiskConfig cacheSwap
Definition: SquidConfig.h:418
IpcIo wrapper for debugs() streams; XXX: find a better class name.
Definition: IpcIoFile.cc:56
int store_open_disk_fd
RequestMap * newerRequests
newer requests (map2 or map1)
Definition: IpcIoFile.h:130
void push(IpcIoPendingRequest *const pending)
push an I/O request to disker
Definition: IpcIoFile.cc:330
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:221
size_t len
Definition: IpcIoFile.h:47
#define TexcHere(msg)
Definition: TextException.h:81
void handleResponse(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:453
void PutPage(PageId &page)
makes identified page available as a free page to future GetPage() callers
Definition: Pages.cc:41
virtual const char * what() const
time_msec_t ioTimeout
canRead/Write should return false if expected I/O delay exceeds it
Definition: DiskFile.h:33
int worker
Definition: IpcIoFile.cc:60
static int TheFile
db file descriptor
Definition: IpcIoFile.cc:645
String tag
optional unique well-known key (e.g., cache_dir path)
Definition: StrandCoord.h:32
static const double Timeout
timeout value in seconds
Definition: IpcIoFile.h:133
const String dbName
the name of the file we are managing
Definition: IpcIoFile.h:117
void setType(int aType)
sets message type; use MessageType enum
Definition: TypedMsgHdr.cc:90
unsigned int requestId
unique for requestor; matches request w/ response
Definition: IpcIoFile.h:44
bool IamDiskProcess() STUB_RETVAL_NOP(false) bool InDaemonMode() STUB_RETVAL_NOP(false) bool UsingSmp() STUB_RETVAL_NOP(false) bool IamCoordinatorProcess() STUB_RETVAL(false) bool IamPrimaryProcess() STUB_RETVAL(false) int NumberOfKids() STUB_RETVAL(0) void setMaxFD(void) STUB void setSystemLimits(void) STUB void squid_signal(int
whether the current process is dedicated to managing a cache_dir
#define DISK_OK
Definition: defines.h:39
void openCompleted(const Ipc::StrandSearchResponse *const response)
Definition: IpcIoFile.cc:143
static IpcIoFileList WaitingForOpen
pending open requests
Definition: IpcIoFile.h:136
const IpcIoMsg & msg
Definition: IpcIoFile.cc:61
off_t offset
Definition: IpcIoFile.h:46
AtomicSignedMsec Balance
Definition: Queue.h:58
virtual void ioCompletedNotification()=0
void file_close(int fd)
Definition: fs_io.cc:76
IpcIoPendingRequest * dequeueRequest(const unsigned int requestId)
returns and forgets the right IpcIoFile pending request
Definition: IpcIoFile.cc:579
CBDATA_CLASS_INIT(IpcIoFile)
virtual void writeCompleted(int errflag, size_t len, RefCount< WriteRequest >)=0
static void OpenTimeout(void *const param)
handles open request timeout
Definition: IpcIoFile.cc:496
class SquidConfig Config
Definition: SquidConfig.cc:12
static String CoordinatorAddr()
get the IPC message address for coordinator process
Definition: Port.cc:64
#define NULL
Definition: types.h:166
A const & min(A const &lhs, A const &rhs)
virtual void close()
Definition: IpcIoFile.cc:180
#define false
Definition: GnuRegex.c:233
converts DiskIO requests to IPC queue messages
Definition: IpcIoFile.h:38
struct timeval start
when the I/O request was converted to IpcIoMsg
Definition: IpcIoFile.h:51
static bool DiskerOpen(const SBuf &path, int flags, mode_t mode)
Definition: IpcIoFile.cc:892
static void diskerWriteAttempts(IpcIoMsg &ipcIo)
Definition: IpcIoFile.cc:678

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors