RockRebuild.cc
/*
 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/AsyncJobCalls.h"
#include "debug/Messages.h"
#include "fs/rock/RockDbCell.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "fs_io.h"
#include "globals.h"
#include "md5.h"
#include "sbuf/Stream.h"
#include "Store.h"
#include "tools.h"

#include <array>
#include <cerrno>
#include <cstring>

CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
namespace Rock
{

static bool
DoneLoading(const int64_t loadingPos, const int64_t dbSlotLimit)
{
    return loadingPos >= dbSlotLimit;
}

static bool
DoneValidating(const int64_t validationPos, const int64_t dbSlotLimit, const int64_t dbEntryLimit)
{
    // paranoid slot checking is only enabled with squid -S
    const auto extraWork = opt_store_doublecheck ? dbSlotLimit : 0;
    return validationPos >= (dbEntryLimit + extraWork);
}
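
// Validation-cursor arithmetic, with illustrative numbers: positions
// [0, dbEntryLimit) validate entries, and, only under squid -S
// (opt_store_doublecheck), positions [dbEntryLimit, dbEntryLimit+dbSlotLimit)
// validate individual slots. For example, with dbEntryLimit = 100 and
// dbSlotLimit = 400, validation normally ends at position 100, but only at
// position 500 when running with -S.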

/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
class LoadingFlags
{
public:
    LoadingFlags(): state(0), anchored(0), mapped(0), finalized(0), freed(0) {}

    /* for LoadingEntry */
    uint8_t state:3; ///< current entry state (one of the LoadingEntry::State values)
    uint8_t anchored:1; ///< whether we loaded the inode slot for this entry

    /* for LoadingSlot */
    uint8_t mapped:1; ///< whether the slot was added to a mapped entry
    uint8_t finalized:1; ///< whether finalizeOrThrow() has scanned the slot
    uint8_t freed:1; ///< whether the slot was given to the map as free space
};
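
// The five bit-fields above share a single byte, so sizeof(LoadingFlags) is
// one, and the flags array below costs roughly one byte per db slot instead
// of several bytes for separate bool members.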

/// smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingEntry
{
public:
    LoadingEntry(const sfileno fileNo, LoadingParts &source);

    uint64_t &size; ///< payload seen so far
    uint32_t &version; ///< DbCellHeader::version to distinguish same-URL chains

    /// possible store entry states during index rebuild
    typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State;

    /* LoadingFlags::state */
    State state() const { return static_cast<State>(flags.state); }
    void state(State aState) const { flags.state = aState; }

    /* LoadingFlags::anchored */
    bool anchored() const { return flags.anchored; }
    void anchored(const bool beAnchored) { flags.anchored = beAnchored; }

private:
    LoadingFlags &flags; ///< entry flags (see the above accessors) are ours
};

/// smart db slot-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingSlot
{
public:
    LoadingSlot(const SlotId slotId, LoadingParts &source);

    /// another slot in some chain belonging to the same entry (unordered!)
    Ipc::StoreMapSliceId &more;

    /* LoadingFlags::mapped */
    bool mapped() const { return flags.mapped; }
    void mapped(const bool beMapped) { flags.mapped = beMapped; }

    /* LoadingFlags::finalized */
    bool finalized() const { return flags.finalized; }
    void finalized(const bool beFinalized) { flags.finalized = beFinalized; }

    /* LoadingFlags::freed */
    bool freed() const { return flags.freed; }
    void freed(const bool beFreed) { flags.freed = beFreed; }

    bool used() const { return freed() || mapped() || more != -1; }

private:
    LoadingFlags &flags; ///< slot flags (see the above accessors) are ours
};

/// information about store entries being loaded from disk (and their slots)
/// used for identifying partially stored/loaded entries
class LoadingParts
{
public:
    using Sizes = Ipc::StoreMapItems<uint64_t>;
    using Versions = Ipc::StoreMapItems<uint32_t>;
    using Mores = Ipc::StoreMapItems<Ipc::StoreMapSliceId>;
    using Flags = Ipc::StoreMapItems<LoadingFlags>;

    LoadingParts(const SwapDir &dir, const bool resuming);
    ~LoadingParts();

    // lacking copying/moving code and often too huge to copy
    LoadingParts(LoadingParts&&) = delete;

    Sizes &sizes() const { return *sizesOwner->object(); }
    Versions &versions() const { return *versionsOwner->object(); }
    Mores &mores() const { return *moresOwner->object(); }
    Flags &flags() const { return *flagsOwner->object(); }

private:
    /* Anti-padding storage. With millions of entries, padding matters! */

    /* indexed by sfileno */
    Sizes::Owner *sizesOwner; ///< LoadingEntry::size for all entries
    Versions::Owner *versionsOwner; ///< LoadingEntry::version for all entries

    /* indexed by SlotId */
    Mores::Owner *moresOwner; ///< LoadingSlot::more for all slots

    /* entry flags are indexed by sfileno; slot flags -- by SlotId */
    Flags::Owner *flagsOwner; ///< all LoadingEntry and LoadingSlot flags
};
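
// Illustrative padding math for the layout above: a single struct combining
// { uint64_t size; uint32_t version; StoreMapSliceId more; LoadingFlags flags; }
// would typically be padded to 24 bytes, while the four parallel arrays store
// the same 17 bytes of data with no padding -- a difference of tens of
// megabytes for a db with millions of slots.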

} /* namespace Rock */

/* LoadingEntry */

Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source):
    size(source.sizes().at(fileNo)),
    version(source.versions().at(fileNo)),
    flags(source.flags().at(fileNo))
{
}

/* LoadingSlot */

Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source):
    more(source.mores().at(slotId)),
    flags(source.flags().at(slotId))
{
}

/* LoadingParts */

template <class T>
inline typename T::Owner *
createOwner(const char *dirPath, const char *sfx, const int64_t limit, const bool resuming)
{
    auto id = Ipc::Mem::Segment::Name(SBuf(dirPath), sfx);
    return resuming ? Ipc::Mem::Owner<T>::Old(id.c_str()) : shm_new(T)(id.c_str(), limit);
}

Rock::LoadingParts::LoadingParts(const SwapDir &dir, const bool resuming):
    sizesOwner(createOwner<Sizes>(dir.path, "rebuild_sizes", dir.entryLimitActual(), resuming)),
    versionsOwner(createOwner<Versions>(dir.path, "rebuild_versions", dir.entryLimitActual(), resuming)),
    moresOwner(createOwner<Mores>(dir.path, "rebuild_mores", dir.slotLimitActual(), resuming)),
    flagsOwner(createOwner<Flags>(dir.path, "rebuild_flags", dir.slotLimitActual(), resuming))
{
    assert(sizes().capacity == versions().capacity); // every entry has both fields
    assert(sizes().capacity <= mores().capacity); // every entry needs slot(s)
    assert(mores().capacity == flags().capacity); // every slot needs a set of flags

    if (!resuming) {
        // other parts rely on shared memory segments being zero-initialized
        // TODO: refactor the next slot pointer to use 0 for nil values
        mores().fill(-1);
    }
}

Rock::LoadingParts::~LoadingParts()
{
    delete sizesOwner;
    delete versionsOwner;
    delete moresOwner;
    delete flagsOwner;
}

/* Rock::Rebuild::Stats */

SBuf
Rock::Rebuild::Stats::Path(const char *dirPath)
{
    return Ipc::Mem::Segment::Name(SBuf(dirPath), "rebuild_stats");
}

Ipc::Mem::Owner<Rock::Rebuild::Stats>*
Rock::Rebuild::Stats::Init(const SwapDir &dir)
{
    return shm_new(Stats)(Path(dir.path).c_str());
}

bool
Rock::Rebuild::Stats::completed(const SwapDir &dir) const
{
    return DoneLoading(counts.scancount, dir.slotLimitActual()) &&
           DoneValidating(counts.validations, dir.slotLimitActual(), dir.entryLimitActual());
}
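
// Restart note: completed() applies the same DoneLoading()/DoneValidating()
// predicates to the shared counters that the rebuild job itself uses, so a
// fully indexed cache_dir is recognized across same-kid process restarts
// without rescanning the db file.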

/* Rebuild */

bool
Rock::Rebuild::IsResponsible(const SwapDir &)
{
    // in SMP mode, only the disker is responsible for populating the map
    return !UsingSmp() || IamDiskProcess();
}

bool
Rock::Rebuild::Start(SwapDir &dir)
{
    if (!IsResponsible(dir)) {
        debugs(47, 2, "not responsible for indexing cache_dir #" <<
               dir.index << " from " << dir.filePath);
        return false;
    }

    const auto stats = shm_old(Rebuild::Stats)(Stats::Path(dir.path).c_str());
    if (stats->completed(dir)) {
        debugs(47, 2, "already indexed cache_dir #" <<
               dir.index << " from " << dir.filePath);
        return false;
    }

    AsyncJob::Start(new Rebuild(&dir, stats));
    return true;
}
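
// Kick-off sequence, as a simplified sketch: a starting cache_dir calls
// Rebuild::Start(); if this kid is responsible and the shared Stats segment
// says indexing is incomplete, a Rebuild job is created, and the AsyncJob
// machinery eventually invokes its start() method below.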

Rock::Rebuild::Rebuild(SwapDir *dir, const Ipc::Mem::Pointer<Stats> &s): AsyncJob("Rock::Rebuild"),
    sd(dir),
    parts(nullptr),
    stats(s),
    dbSize(0),
    dbSlotSize(0),
    dbSlotLimit(0),
    dbEntryLimit(0),
    fd(-1),
    dbOffset(0),
    loadingPos(stats->counts.scancount),
    validationPos(stats->counts.validations),
    counts(stats->counts),
    resuming(stats->counts.started())
{
    assert(sd);
    dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
    dbSlotSize = sd->slotSize;
    dbEntryLimit = sd->entryLimitActual();
    dbSlotLimit = sd->slotLimitActual();
    assert(dbEntryLimit <= dbSlotLimit);
    registerRunner();
}
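
// Resumption note: loadingPos and validationPos start from the shared
// stats->counts values above, so after a kid restart the rebuild continues
// from the last recorded slot/entry rather than from the beginning.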

Rock::Rebuild::~Rebuild()
{
    if (fd >= 0)
        file_close(fd);
    // normally, segments are used until the Squid instance quits,
    // but these indexing-only segments are no longer needed
    delete parts;
}

void
Rock::Rebuild::startShutdown()
{
    mustStop("startShutdown");
}

/// prepares and initiates entry loading sequence
void
Rock::Rebuild::start()
{
    assert(IsResponsible(*sd));

    if (!resuming) {
        debugs(47, Important(18), "Loading cache_dir #" << sd->index <<
               " from " << sd->filePath);
    } else {
        debugs(47, Important(63), "Resuming indexing cache_dir #" << sd->index <<
               " from " << sd->filePath << ':' << progressDescription());
    }

    fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
    if (fd < 0)
        failure("cannot open db", errno);

    char hdrBuf[SwapDir::HeaderSize];
    if (read(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize)
        failure("cannot read db header", errno);

    // slot prefix of SM_PAGE_SIZE should fit both core entry header and ours
    assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);

    dbOffset = SwapDir::HeaderSize + loadingPos * dbSlotSize;

    assert(!parts);
    parts = new LoadingParts(*sd, resuming);

    stats->counts.updateStartTime(current_time);

    checkpoint();
}

/// continues after a pause if not done
void
Rock::Rebuild::checkpoint()
{
    if (!done())
        eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
}
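
// Scheduling note: every checkpoint() arms a one-shot 0.01-second timer, so
// the rebuild advances one bounded step per event-loop visit instead of
// monopolizing the process; see loadingSteps() for the per-step time budget.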

bool
Rock::Rebuild::doneLoading() const
{
    return DoneLoading(loadingPos, dbSlotLimit);
}

bool
Rock::Rebuild::doneValidating() const
{
    return DoneValidating(validationPos, dbSlotLimit, dbEntryLimit);
}

bool
Rock::Rebuild::doneAll() const
{
    return doneLoading() && doneValidating() && AsyncJob::doneAll();
}

void
Rock::Rebuild::Steps(void *data)
{
    // use async call to enable job call protection that time events lack
    CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
}

void
Rock::Rebuild::steps()
{
    if (!doneLoading())
        loadingSteps();
    else
        validationSteps();

    checkpoint();
}

void
Rock::Rebuild::loadingSteps()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    // Balance our desire to maximize the number of entries processed at once
    // (and, hence, minimize overheads and total rebuild time) with a
    // requirement to also process Coordinator events, disk I/Os, etc.
    const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
    const timeval loopStart = current_time;

    int64_t loaded = 0;
    while (!doneLoading()) {
        loadOneSlot();
        dbOffset += dbSlotSize;
        ++loadingPos;
        ++loaded;

        if (counts.scancount % 1000 == 0)
            storeRebuildProgress(sd->index, dbSlotLimit, counts.scancount);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, "pausing after " << loaded << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per entry");
            break;
        }
    }
}
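
// Illustrative budget math: with maxSpentMsec = 50 and an assumed (OS-cached)
// read cost of, say, 0.01ms per slot, one step loads about 5000 slots before
// pausing; with opt_foreground_rebuild, the elapsed-time check is skipped and
// everything is loaded in a single step.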

Rock::LoadingEntry
Rock::Rebuild::loadingEntry(const sfileno fileNo)
{
    Must(0 <= fileNo && fileNo < dbEntryLimit);
    return LoadingEntry(fileNo, *parts);
}

Rock::LoadingSlot
Rock::Rebuild::loadingSlot(const SlotId slotId)
{
    Must(0 <= slotId && slotId < dbSlotLimit);
    Must(slotId <= loadingPos); // cannot look ahead
    return LoadingSlot(slotId, *parts);
}

void
Rock::Rebuild::loadOneSlot()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    // increment before loadingPos to avoid getting stuck at a slot
    // in a case of crash
    ++counts.scancount;

    if (lseek(fd, dbOffset, SEEK_SET) < 0)
        failure("cannot seek to db entry", errno);

    buf.reset();

    if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
        return;

    const SlotId slotId = loadingPos;

    // get our header
    DbCellHeader header;
    if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring truncated " << buf.contentSize() << "-byte " <<
               "cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    memcpy(&header, buf.content(), sizeof(header));
    if (header.empty()) {
        freeUnusedSlot(slotId, false);
        return;
    }
    if (!header.sane(dbSlotSize, dbSlotLimit)) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring malformed cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    buf.consume(sizeof(header)); // optimize to avoid memmove()

    useNewSlot(slotId, header);
}

/// whether the given slot buffer is likely to have nothing but zeros, as is
/// common to slots in pre-initialized (with zeros) db files
static bool
ZeroedSlot(const MemBuf &buf)
{
    // We could memcmp the entire buffer, but it is probably safe enough to test
    // a few bytes because even if we do not detect a corrupted entry, it is not
    // a big deal: Store::UnpackPrefix() rejects all-0s metadata prefix.
    static const std::array<char, 10> zeros = {};

    if (static_cast<size_t>(buf.contentSize()) < zeros.size())
        return false; // cannot be sure enough

    return memcmp(buf.content(), zeros.data(), zeros.size()) == 0;
}

/// parse StoreEntry basics and add them to the map, returning true on success
bool
Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    cache_key key[SQUID_MD5_DIGEST_LENGTH];
    StoreEntry loadedE;
    const uint64_t knownSize = header.entrySize > 0 ?
                               header.entrySize : anchor.basics.swap_file_sz.load();

    if (ZeroedSlot(buf))
        return false;

    if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize))
        return false;

    // the entry size may be unknown, but if it is known, it is authoritative

    debugs(47, 8, "importing basics for entry " << fileno <<
           " inode.entrySize: " << header.entrySize <<
           " swap_file_sz: " << loadedE.swap_file_sz);
    anchor.set(loadedE);

    // we have not validated whether all db cells for this entry were loaded
    EBIT_CLR(anchor.basics.flags, ENTRY_VALIDATED);

    // loadedE->dump(5);

    return true;
}
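
// Size-recovery note: knownSize prefers the inode's entrySize and falls back
// on any swap_file_sz already recorded in the anchor; when both are zero, the
// size stays unknown here, and finalizeOrThrow() later derives it from the
// sum of the mapped slice sizes.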

void
Rock::Rebuild::validationSteps()
{
    debugs(47, 5, sd->index << " validating from " << validationPos);

    // see loadingSteps() for the rationale; TODO: avoid duplication
    const int maxSpentMsec = 50; // keep small: validation does not do I/O
    const timeval loopStart = current_time;

    int64_t validated = 0;
    while (!doneValidating()) {
        // increment before validationPos to avoid getting stuck at a slot
        // in a case of crash
        if (validationPos < dbEntryLimit)
            validateOneEntry(validationPos);
        else
            validateOneSlot(validationPos - dbEntryLimit);
        ++validationPos;
        ++validated;

        if (validationPos % 1000 == 0)
            debugs(20, 2, "validated: " << validationPos);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, "pausing after " << validated << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry");
            break;
        }
    }
}

/// Either make the entry accessible to all or throw.
/// This method assumes it is called only when the entry is fully loaded.
void
Rock::Rebuild::finalizeOrThrow(const sfileno fileNo, LoadingEntry &le)
{
    // walk all map-linked slots, starting from inode, and mark each
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileNo);
    Must(le.size > 0); // paranoid
    uint64_t mappedSize = 0;
    SlotId slotId = anchor.start;
    while (slotId >= 0 && mappedSize < le.size) {
        LoadingSlot slot = loadingSlot(slotId); // throws if we have not loaded that slot
        Must(!slot.finalized()); // no loops or stealing from other entries
        Must(slot.mapped()); // all our slots should be in the sd->map
        Must(!slot.freed()); // all our slots should still be present
        slot.finalized(true);

        Ipc::StoreMapSlice &mapSlice = sd->map->writeableSlice(fileNo, slotId);
        Must(mapSlice.size > 0); // paranoid
        mappedSize += mapSlice.size;
        slotId = mapSlice.next;
    }
    /* no hodgepodge entries: one entry - one full chain and no leftovers */
    Must(slotId < 0);
    Must(mappedSize == le.size);

    if (!anchor.basics.swap_file_sz)
        anchor.basics.swap_file_sz = le.size;
    EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
    le.state(LoadingEntry::leLoaded);
    sd->map->closeForWriting(fileNo);
    ++counts.objcount;
}
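
// Invariants enforced by the walk above: every slice reachable from
// anchor.start must be loaded, mapped, not freed, and not yet finalized (no
// loops or cross-entry sharing), and the slice sizes must add up to exactly
// le.size; any violation throws, so finalizeOrFree() frees the entry instead.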

/// Either make the entry accessible to all or free it.
/// This method must be called only when the entry is fully loaded.
void
Rock::Rebuild::finalizeOrFree(const sfileno fileNo, LoadingEntry &le)
{
    try {
        finalizeOrThrow(fileNo, le);
    } catch (const std::exception &ex) {
        freeBadEntry(fileNo, ex.what());
    }
}

void
Rock::Rebuild::validateOneEntry(const sfileno fileNo)
{
    LoadingEntry entry = loadingEntry(fileNo);
    switch (entry.state()) {

    case LoadingEntry::leLoading:
        finalizeOrFree(fileNo, entry);
        break;

    case LoadingEntry::leEmpty: // no entry hashed to this position
    case LoadingEntry::leLoaded: // we have already unlocked this entry
    case LoadingEntry::leCorrupted: // we have already removed this entry
    case LoadingEntry::leIgnored: // we have already discarded this entry
        break;
    }
}

void
Rock::Rebuild::validateOneSlot(const SlotId slotId)
{
    const LoadingSlot slot = loadingSlot(slotId);
    // there should not be any unprocessed slots left
    Must(slot.freed() || (slot.mapped() && slot.finalized()));
}

/// Marks remaining bad entry slots as free and unlocks the entry. The map
/// cannot do this because Loading entries may have holes in the slots chain.
void
Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
{
    debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
           " entry " << fileno << " is ignored during rebuild");

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leCorrupted);

    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
    assert(anchor.start < 0 || le.size > 0);
    for (SlotId slotId = anchor.start; slotId >= 0;) {
        const SlotId next = loadingSlot(slotId).more;
        freeSlot(slotId, true);
        slotId = next;
    }

    sd->map->forgetWritingEntry(fileno);
}

void
Rock::Rebuild::swanSong()
{
    debugs(47,3, "cache_dir #" << sd->index << " rebuild level: " <<
           StoreController::store_dirs_rebuilding);
    storeRebuildComplete(&counts);
}

void
Rock::Rebuild::failure(const char *msg, int errNo)
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    if (errNo)
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir rebuild failure: " << xstrerr(errNo));
    debugs(47, DBG_CRITICAL, "Do you need to run 'squid -z' to initialize storage?");

    assert(sd);
    fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.",
           sd->index, sd->filePath, msg);
}

/// adds slot to the free slot index
void
Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
{
    debugs(47,5, sd->index << " frees slot " << slotId);
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.freed());
    slot.freed(true);

    if (invalid) {
        ++counts.invalid;
        //sd->unlink(fileno); leave garbage on disk, it should not hurt
    }

    Ipc::Mem::PageId pageId;
    pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(sd->index);
    pageId.number = slotId+1;
    sd->freeSlots->push(pageId);
}

/// freeSlot() for never-been-mapped slots
void
Rock::Rebuild::freeUnusedSlot(const SlotId slotId, const bool invalid)
{
    LoadingSlot slot = loadingSlot(slotId);
    // mapped slots must be freed via freeBadEntry() to keep the map in sync
    assert(!slot.mapped());
    freeSlot(slotId, invalid);
}

/// adds slot to the entry chain in the map
void
Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
{
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.mapped());
    assert(!slot.freed());
    slot.mapped(true);

    Ipc::StoreMapSlice slice;
    slice.next = header.nextSlot;
    slice.size = header.payloadSize;
    sd->map->importSlice(slotId, slice);
}

template <class SlotIdType> // accommodates atomic and simple SlotIds.
void
Rock::Rebuild::chainSlots(SlotIdType &from, const SlotId to)
{
    LoadingSlot slot = loadingSlot(to);
    assert(slot.more < 0);
    slot.more = from; // may still be unset
    from = to;
}
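
// Illustrative chaining order: chainSlots() prepends, so if an entry's slots
// are discovered in disk order 7, 3, 9, the resulting "more" chain is
// 9 -> 3 -> 7. That is acceptable because this chain is explicitly unordered;
// the ordered slice chain lives in the map (StoreMapSlice::next).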

void
Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    LoadingEntry le = loadingEntry(fileno);
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);

    debugs(47,9, "adding " << slotId << " to entry " << fileno);
    // we do not need to preserve the order
    if (le.anchored()) {
        LoadingSlot inode = loadingSlot(anchor.start);
        chainSlots(inode.more, slotId);
    } else {
        chainSlots(anchor.start, slotId);
    }

    le.size += header.payloadSize; // must precede freeBadEntry() calls

    if (header.firstSlot == slotId) {
        debugs(47,5, "added inode");

        if (le.anchored()) { // we have already added another inode slot
            freeBadEntry(fileno, "inode conflict");
            ++counts.clashcount;
            return;
        }

        le.anchored(true);

        if (!importEntry(anchor, fileno, header)) {
            freeBadEntry(fileno, "corrupted metainfo");
            return;
        }

        // set total entry size and/or check it for consistency
        if (const uint64_t totalSize = header.entrySize) {
            assert(totalSize != static_cast<uint64_t>(-1));
            if (!anchor.basics.swap_file_sz) {
                anchor.basics.swap_file_sz = totalSize;
                assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
            } else if (totalSize != anchor.basics.swap_file_sz) {
                freeBadEntry(fileno, "size mismatch");
                return;
            }
        }
    }

    const uint64_t totalSize = anchor.basics.swap_file_sz; // may be 0/unknown

    if (totalSize > 0 && le.size > totalSize) { // overflow
        debugs(47, 8, "overflow: " << le.size << " > " << totalSize);
        freeBadEntry(fileno, "overflowing");
        return;
    }

    mapSlot(slotId, header);
    if (totalSize > 0 && le.size == totalSize)
        finalizeOrFree(fileno, le); // entry is probably fully loaded now
}

/// initialize housekeeping information for a newly accepted entry
void
Rock::Rebuild::primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
    assert(header.firstSlot >= 0);
    anchor.start = -1; // addSlotToEntry() will set it

    assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leLoading);
    le.version = header.version;
    le.size = 0;
}

/// handle a slot from an entry that we have not seen before
void
Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    // A miss may have been stored at our fileno while we were loading other
    // slots from disk. We ought to preserve that entry because it is fresher.
    const bool overwriteExisting = false;
    if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) {
        primeNewEntry(*anchor, fileno, header);
        addSlotToEntry(fileno, slotId, header); // may fail
        assert(anchor->basics.swap_file_sz != static_cast<uint64_t>(-1));
    } else {
        // A new from-network entry is occupying our map slot; let it be, but
        // save us from the trouble of going through the above motions again.
        LoadingEntry le = loadingEntry(fileno);
        le.state(LoadingEntry::leIgnored);
        freeUnusedSlot(slotId, false);
    }
}

/// does the header belong to the fileno entry being loaded?
bool
Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
{
    // Header updates always result in multi-start chains and often
    // result in multi-version chains so we can only compare the keys.
    const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
    return anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
}

/// handle freshly loaded (and validated) db slot header
void
Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
{
    const cache_key *const key =
        reinterpret_cast<const cache_key*>(header.key);
    const sfileno fileno = sd->map->fileNoByKey(key);
    assert(0 <= fileno && fileno < dbEntryLimit);

    LoadingEntry le = loadingEntry(fileno);
    debugs(47,9, "entry " << fileno << " state: " << le.state() << ", inode: " <<
           header.firstSlot << ", size: " << header.payloadSize);

    switch (le.state()) {

    case LoadingEntry::leEmpty: {
        startNewEntry(fileno, slotId, header);
        break;
    }

    case LoadingEntry::leLoading: {
        if (sameEntry(fileno, header)) {
            addSlotToEntry(fileno, slotId, header); // may fail
        } else {
            // either the loading chain or this slot is stale;
            // be conservative and ignore both (and any future ones)
            freeBadEntry(fileno, "duplicated");
            freeUnusedSlot(slotId, true);
            ++counts.dupcount;
        }
        break;
    }

    case LoadingEntry::leLoaded: {
        // either the previously loaded chain or this slot is stale;
        // be conservative and ignore both (and any future ones)
        le.state(LoadingEntry::leCorrupted);
        sd->map->freeEntry(fileno); // may not be immediately successful
        freeUnusedSlot(slotId, true);
        ++counts.dupcount;
        break;
    }

    case LoadingEntry::leCorrupted: {
        // previously seen slots messed things up so we must ignore this one
        freeUnusedSlot(slotId, true);
        break;
    }

    case LoadingEntry::leIgnored: {
        // already replaced by a fresher or colliding from-network entry
        freeUnusedSlot(slotId, false);
        break;
    }
    }
}
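
// State machine summary for the switch above: leEmpty starts a new entry;
// leLoading extends the entry on a key match and corrupts it on a clash;
// leLoaded receiving another slot means a stale duplicate, so the loaded
// entry is freed; leCorrupted and leIgnored simply absorb and free any
// further slots.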

SBuf
Rock::Rebuild::progressDescription() const
{
    SBufStream str;

    str << Debug::Extra << "slots loaded: " << Progress(loadingPos, dbSlotLimit);

    const auto validatingEntries = validationPos < dbEntryLimit;
    const auto entriesValidated = validatingEntries ? validationPos : dbEntryLimit;
    str << Debug::Extra << "entries validated: " << Progress(entriesValidated, dbEntryLimit);
    if (opt_store_doublecheck) {
        const auto slotsValidated = validatingEntries ? 0 : (validationPos - dbEntryLimit);
        str << Debug::Extra << "slots validated: " << Progress(slotsValidated, dbSlotLimit);
    }

    return str.buf();
}