Transients.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 20 Storage Manager */
10 
11 #include "squid.h"
12 #include "base/RunnersRegistry.h"
13 #include "CollapsedForwarding.h"
14 #include "HttpReply.h"
15 #include "ipc/mem/Page.h"
16 #include "ipc/mem/Pages.h"
17 #include "MemObject.h"
18 #include "mime_header.h"
19 #include "SquidConfig.h"
20 #include "SquidMath.h"
21 #include "StoreStats.h"
22 #include "tools.h"
23 #include "Transients.h"
24 
25 #include <limits>
26 
28 static const SBuf MapLabel("transients_map");
29 
31 {
32 }
33 
35 {
36  delete map;
37  delete locals;
38 }
39 
40 void
42 {
43  assert(Enabled());
44  const int64_t entryLimit = EntryLimit();
45  assert(entryLimit > 0);
46 
47  Must(!map);
48  map = new TransientsMap(MapLabel);
49  map->cleaner = this;
50  map->disableHitValidation(); // Transients lacks slices to validate
51 
52  locals = new Locals(entryLimit, 0);
53 }
54 
55 void
57 {
58 #if TRANSIENT_STATS_SUPPORTED
59  const size_t pageSize = Ipc::Mem::PageSize();
60 
61  stats.mem.shared = true;
62  stats.mem.capacity =
64  stats.mem.size =
66  stats.mem.count = currentCount();
67 #else
68  (void)stats;
69 #endif
70 }
71 
72 void
74 {
75  storeAppendPrintf(&e, "\n\nTransient Objects\n");
76 
77  storeAppendPrintf(&e, "Maximum Size: %.0f KB\n", maxSize()/1024.0);
78  storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
79  currentSize() / 1024.0,
81 
82  if (map) {
83  const int limit = map->entryLimit();
84  storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
85  if (limit > 0) {
86  storeAppendPrintf(&e, "Current entries: %" PRId64 " %.2f%%\n",
87  currentCount(), (100.0 * currentCount() / limit));
88  }
89  }
90 }
91 
92 void
94 {
95  // no lazy garbage collection needed
96 }
97 
98 uint64_t
100 {
101  return 0; // XXX: irrelevant, but Store parent forces us to implement this
102 }
103 
104 uint64_t
106 {
107  // Squid currently does not limit the total size of all transient objects
109 }
110 
111 uint64_t
113 {
114  // TODO: we do not get enough information to calculate this
115  // StoreEntry should update associated stores when its size changes
116  return 0;
117 }
118 
119 uint64_t
121 {
122  return map ? map->entryCount() : 0;
123 }
124 
125 int64_t
127 {
128  // Squid currently does not limit the size of a transient object
130 }
131 
132 void
134 {
135  // no replacement policy (but the cache(s) storing the entry may have one)
136 }
137 
138 bool
140 {
141  // no need to keep e in the global store_table for us; we have our own map
142  return false;
143 }
144 
145 StoreEntry *
147 {
148  if (!map)
149  return NULL;
150 
151  sfileno index;
152  const Ipc::StoreMapAnchor *anchor = map->openForReading(key, index);
153  if (!anchor)
154  return NULL;
155 
156  // If we already have a local entry, the store_table should have found it.
157  // Since it did not, the local entry key must have changed from public to
158  // private. We still need to keep the private entry around for syncing as
159  // its clients depend on it, but we should not allow new clients to join.
160  if (StoreEntry *oldE = locals->at(index)) {
161  debugs(20, 3, "not joining private " << *oldE);
162  assert(EBIT_TEST(oldE->flags, KEY_PRIVATE));
164  return nullptr;
165  }
166 
167  // store hadWriter before checking ENTRY_REQUIRES_COLLAPSING to avoid racing
168  // the writer that clears that flag and then leaves
169  const auto hadWriter = map->peekAtWriter(index);
170  if (!hadWriter && EBIT_TEST(anchor->basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
171  debugs(20, 3, "not joining abandoned entry " << index);
173  return nullptr;
174  }
175 
176  StoreEntry *e = new StoreEntry();
177  e->createMemObject();
178  anchorEntry(*e, index, *anchor);
179 
180  // keep read lock to receive updates from others
181  return e;
182 }
183 
184 StoreEntry *
186 {
187  if (!map)
188  return NULL;
189 
190  if (StoreEntry *oldE = locals->at(index)) {
191  debugs(20, 5, "found " << *oldE << " at " << index << " in " << MapLabel);
192  assert(oldE->mem_obj && oldE->mem_obj->xitTable.index == index);
193  return oldE;
194  }
195 
196  debugs(20, 3, "no entry at " << index << " in " << MapLabel);
197  return NULL;
198 }
199 
200 void
202 {
203  assert(map);
204  assert(e.hasTransients());
205  assert(isWriter(e));
206  const auto idx = e.mem_obj->xitTable.index;
207  auto &anchor = map->writeableEntry(idx);
208  if (EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
209  EBIT_CLR(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
211  }
212 }
213 
214 void
216 {
217  if (!e->hasTransients()) {
218  addEntry(e, key, direction);
219  assert(e->hasTransients());
220  }
221 
222  const auto index = e->mem_obj->xitTable.index;
223  if (const auto old = locals->at(index)) {
224  assert(old == e);
225  } else {
226  // We do not lock e because we do not want to prevent its destruction;
227  // e is tied to us via mem_obj so we will know when it is destructed.
228  locals->at(index) = e;
229  }
230 }
231 
233 void
235 {
236  assert(e);
237  assert(e->mem_obj);
238  assert(!e->hasTransients());
239 
240  Must(map); // configured to track transients
241 
242  if (direction == Store::ioWriting)
243  return addWriterEntry(*e, key);
244 
245  assert(direction == Store::ioReading);
246  addReaderEntry(*e, key);
247 }
248 
250 void
252 {
253  sfileno index = 0;
254  const auto anchor = map->openForWriting(key, index);
255  if (!anchor)
256  throw TextException("writer collision", Here());
257 
258  // set ASAP in hope to unlock the slot if something throws
259  // and to provide index to such methods as hasWriter()
260  auto &xitTable = e.mem_obj->xitTable;
261  xitTable.index = index;
262  xitTable.io = Store::ioWriting;
263 
264  anchor->set(e, key);
265  // allow reading and receive remote DELETE events, but do not switch to
266  // the reading lock because transientReaders() callers want true readers
267  map->startAppending(index);
268 }
269 
272 void
274 {
275  sfileno index = 0;
276  const auto anchor = map->openOrCreateForReading(key, index, e);
277  if (!anchor)
278  throw TextException("reader collision", Here());
279 
280  anchorEntry(e, index, *anchor);
281  // keep the entry locked (for reading) to receive remote DELETE events
282 }
283 
285 void
287 {
288  // set ASAP in hope to unlock the slot if something throws
289  // and to provide index to such methods as hasWriter()
290  auto &xitTable = e.mem_obj->xitTable;
291  xitTable.index = index;
292  xitTable.io = Store::ioReading;
293 
294  const auto hadWriter = hasWriter(e); // before computing collapsingRequired
295  anchor.exportInto(e);
296  const bool collapsingRequired = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
297  assert(!collapsingRequired || hadWriter);
298  e.setCollapsingRequirement(collapsingRequired);
299 }
300 
301 bool
303 {
304  if (!e.hasTransients())
305  return false;
306  return map->peekAtWriter(e.mem_obj->xitTable.index);
307 }
308 
309 void
311 {
312  // TODO: we should probably find the entry being deleted and abort it
313 }
314 
315 void
316 Transients::status(const StoreEntry &entry, Transients::EntryStatus &entryStatus) const
317 {
318  assert(map);
319  assert(entry.hasTransients());
320  const auto idx = entry.mem_obj->xitTable.index;
321  const auto &anchor = isWriter(entry) ?
322  map->writeableEntry(idx) : map->readableEntry(idx);
323  entryStatus.abortedByWriter = anchor.writerHalted;
324  entryStatus.waitingToBeFreed = anchor.waitingToBeFreed;
325  entryStatus.collapsed = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
326 }
327 
328 void
330 {
331  assert(e.hasTransients());
332  assert(isWriter(e));
335 }
336 
337 int
339 {
340  if (e.hasTransients()) {
341  assert(map);
343  }
344  return 0;
345 }
346 
347 void
349 {
350  debugs(20, 5, e);
351  if (e.hasTransients()) {
352  const auto index = e.mem_obj->xitTable.index;
353  if (map->freeEntry(index)) {
354  // Delay syncCollapsed(index) which may end `e` wait for updates.
355  // Calling it directly/here creates complex reentrant call chains.
357  }
358  } // else nothing to do because e must be private
359 }
360 
361 void
363 {
364  if (!map)
365  return;
366 
367  const sfileno index = map->fileNoByKey(key);
368  if (map->freeEntry(index))
369  CollapsedForwarding::Broadcast(index, true);
370 }
371 
372 void
374 {
375  debugs(20, 5, entry);
376  if (entry.hasTransients()) {
377  auto &xitTable = entry.mem_obj->xitTable;
378  assert(map);
379  if (isWriter(entry)) {
380  map->abortWriting(xitTable.index);
381  } else {
382  assert(isReader(entry));
383  map->closeForReadingAndFreeIdle(xitTable.index);
384  }
385  locals->at(xitTable.index) = nullptr;
386  xitTable.index = -1;
387  xitTable.io = Store::ioDone;
388  }
389 }
390 
392 int64_t
394 {
395  return (UsingSmp() && Store::Controller::SmpAware()) ?
397 }
398 
399 bool
401 {
402  assert(map);
403  return map->markedForDeletion(key);
404 }
405 
406 bool
408 {
409  return e.mem_obj && e.mem_obj->xitTable.io == Store::ioReading;
410 }
411 
412 bool
414 {
415  return e.mem_obj && e.mem_obj->xitTable.io == Store::ioWriting;
416 }
417 
420 {
421 public:
422  /* RegisteredRunner API */
423  virtual void useConfig();
424  virtual ~TransientsRr();
425 
426 protected:
427  virtual void create();
428 
429 private:
431 };
432 
434 
435 void
437 {
440 }
441 
442 void
444 {
445  const int64_t entryLimit = Transients::EntryLimit();
446  if (entryLimit <= 0)
447  return; // no SMP configured or a misconfiguration
448 
449  Must(!mapOwner);
450  mapOwner = TransientsMap::Init(MapLabel, entryLimit);
451 }
452 
454 {
455  delete mapOwner;
456 }
457 
#define EBIT_CLR(flag, bit)
Definition: defines.h:68
Anchor & writeableEntry(const AnchorId anchorId)
writeable anchor for the entry created by openForWriting()
Definition: StoreMap.cc:237
class Ping::pingStats_ stats
void monitorIo(StoreEntry *, const cache_key *, const Store::IoStatus)
Definition: Transients.cc:215
#define Here()
source code location of the caller
Definition: Here.h:15
initializes shared memory segment used by Transients
Definition: Transients.cc:420
IoStatus
cache "I/O" direction and status
Definition: forward.h:40
void startAppending(const sfileno fileno)
restrict opened for writing entry to appending operations; allow reads
Definition: StoreMap.cc:191
virtual void getStats(StoreInfoStats &stats) const override
collect statistics
Definition: Transients.cc:56
TransientsMap * map
shared packed info indexed by Store keys, for creating new StoreEntries
Definition: Transients.h:107
unsigned char cache_key
Store key.
Definition: forward.h:29
Locals * locals
Definition: Transients.h:112
MemObject * mem_obj
Definition: Store.h:219
bool waitingToBeFreed
whether the entry was marked for deletion
Definition: Transients.h:34
static bool SmpAware()
whether there are any SMP-aware storages
Definition: Controller.cc:916
void createMemObject()
Definition: store.cc:1553
@ KEY_PRIVATE
Definition: enums.h:102
virtual uint64_t currentSize() const override
current size
Definition: Transients.cc:112
void anchorEntry(StoreEntry &, const sfileno, const Ipc::StoreMapAnchor &)
fills (recently created) StoreEntry with information currently in Transients
Definition: Transients.cc:286
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:830
void disconnect(StoreEntry &)
the caller is done writing or reading the given entry
Definition: Transients.cc:373
Definition: SBuf.h:94
int64_t shared_transient_entries_limit
Definition: SquidConfig.h:351
Io io
current I/O state
Definition: MemObject.h:173
virtual ~TransientsRr()
Definition: Transients.cc:453
ReadWriteLock lock
protects slot data below
Definition: StoreMap.h:80
virtual uint64_t maxSize() const override
Definition: Transients.cc:105
std::atomic< uint32_t > readers
number of reading users
Definition: ReadWriteLock.h:49
virtual uint64_t currentCount() const override
the total number of objects stored right now
Definition: Transients.cc:120
static Owner * Init(const SBuf &path, const int slotLimit)
initialize shared memory
Definition: StoreMap.cc:42
A const & max(A const &lhs, A const &rhs)
bool abortedByWriter
whether the entry was aborted
Definition: Transients.h:33
shared entry metadata, used for synchronization
Definition: Transients.h:31
virtual void init() override
Definition: Transients.cc:41
double doublePercent(const double, const double)
Definition: SquidMath.cc:25
const Anchor * openForReading(const cache_key *const key, sfileno &fileno)
opens entry (identified by key) for reading, increments read level
Definition: StoreMap.cc:440
void status(const StoreEntry &e, EntryStatus &entryStatus) const
copies current shared entry metadata into entryStatus
Definition: Transients.cc:316
struct Ipc::StoreMapAnchor::Basics basics
static bool Enabled()
Can we create and initialize Transients?
Definition: Transients.h:94
#define NULL
Definition: types.h:166
const Anchor & readableEntry(const AnchorId anchorId) const
readable anchor for the entry created by openForReading()
Definition: StoreMap.cc:244
virtual ~Transients()
Definition: Transients.cc:34
int entryLimit() const
maximum entryCount() possible
Definition: StoreMap.cc:733
int32_t StoreMapSliceId
Definition: StoreMap.h:24
bool isWriter(const StoreEntry &) const
whether the entry is in "writing to Transients" I/O state
Definition: Transients.cc:413
void addWriterEntry(StoreEntry &, const cache_key *)
addEntry() helper used for cache entry creators/writers
Definition: Transients.cc:251
#define EBIT_TEST(flag, bit)
Definition: defines.h:69
void setCollapsingRequirement(const bool required)
allow or forbid collapsed requests feeding
Definition: store.cc:1975
std::vector< StoreEntry * > Locals
Definition: Transients.h:109
virtual void noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId) override
adjust slice-linked state before a locked Readable slice is erased
Definition: Transients.cc:310
@ ENTRY_REQUIRES_COLLAPSING
Definition: enums.h:118
int32_t index
entry position inside the in-transit table
Definition: MemObject.h:172
@ ioDone
Definition: forward.h:40
XitTable xitTable
current [shared] memory caching state for the entry
Definition: MemObject.h:175
virtual StoreEntry * get(const cache_key *) override
Definition: Transients.cc:146
bool hasWriter(const StoreEntry &)
whether we or somebody else is in the "writing to Transients" I/O state
Definition: Transients.cc:302
#define assert(EX)
Definition: assert.h:19
YesNoNone memShared
whether the memory cache is shared among workers
Definition: SquidConfig.h:87
size_t PageLevel()
approximate total number of shared memory pages used now
Definition: Pages.cc:80
int entryCount() const
number of writeable and readable entries
Definition: StoreMap.cc:739
void addEntry(StoreEntry *, const cache_key *, const Store::IoStatus)
creates a new Transients entry
Definition: Transients.cc:234
bool configured() const
Definition: YesNoNone.h:67
StoreEntry * findCollapsed(const sfileno xitIndex)
return a local, previously collapsed entry
Definition: Transients.cc:185
virtual void reference(StoreEntry &e) override
somebody needs this entry (many cache replacement policies need to know)
Definition: Transients.cc:133
Anchor * openForWriting(const cache_key *const key, sfileno &fileno)
Definition: StoreMap.cc:140
signed_int32_t sfileno
Definition: forward.h:22
bool freeEntry(const sfileno)
Definition: StoreMap.cc:313
virtual uint64_t minSize() const override
the minimum size the store will shrink to via normal housekeeping
Definition: Transients.cc:99
bool markedForDeletion(const cache_key *) const
Definition: Transients.cc:400
void completeWriting(const StoreEntry &e)
called when the in-transit entry has been successfully cached
Definition: Transients.cc:329
const Anchor * openOrCreateForReading(const cache_key *, sfileno &, const StoreEntry &)
openForReading() but creates a new entry if there is no old one
Definition: StoreMap.cc:103
void exportInto(StoreEntry &) const
load StoreEntry basics that were previously stored with set()
Definition: StoreMap.cc:979
bool isReader(const StoreEntry &) const
whether the entry is in "reading from Transients" I/O state
Definition: Transients.cc:407
const Anchor * peekAtWriter(const sfileno fileno) const
Definition: StoreMap.cc:297
size_t PageLimit()
the total number of shared memory pages that can be in use at any time
Definition: Pages.cc:55
virtual bool dereference(StoreEntry &e) override
Definition: Transients.cc:139
void addReaderEntry(StoreEntry &, const cache_key *)
Definition: Transients.cc:273
StoreMapCleaner * cleaner
notified before a readable entry is freed
Definition: StoreMap.h:361
an std::runtime_error with thrower location info
Definition: TextException.h:21
void disableHitValidation()
Definition: StoreMap.h:346
bool markedForDeletion(const cache_key *const)
Definition: StoreMap.cc:355
const Anchor & peekAtEntry(const sfileno fileno) const
Definition: StoreMap.cc:307
virtual void maintain() override
perform regular periodic maintenance; TODO: move to UFSSwapDir::Maintain
Definition: Transients.cc:93
#define Must(condition)
Definition: TextException.h:71
void closeForReadingAndFreeIdle(const sfileno fileno)
same as closeForReading() but also frees the entry if it is unlocked
Definition: StoreMap.cc:506
bool hasTransients() const
whether there is a corresponding locked transients table entry
Definition: Store.h:209
bool collapsed
whether the entry allows collapsing
Definition: Transients.h:35
static int64_t EntryLimit()
calculates maximum number of entries we need to store and map
Definition: Transients.cc:393
#define PRId64
Definition: types.h:110
TransientsMap::Owner * mapOwner
Definition: Transients.cc:430
void abortWriting(const sfileno fileno)
stop writing the entry, freeing its slot for others to use if possible
Definition: StoreMap.cc:251
RunnerRegistrationEntry(TransientsRr)
size_t PageSize()
returns page size in bytes; all pages are assumed to be the same size
Definition: Pages.cc:28
static void Broadcast(const StoreEntry &e, const bool includingThisWorker=false)
notify other workers about changes in entry state (e.g., new data)
static const SBuf MapLabel("transients_map")
shared memory segment path to use for Transients map
Ipc::StoreMap TransientsMap
Definition: Transients.h:20
@ ioWriting
Definition: forward.h:40
bool UsingSmp()
Whether there should be more than one worker process running.
Definition: tools.cc:693
virtual void useConfig()
Definition: Segment.cc:377
virtual void evictCached(StoreEntry &) override
Definition: Transients.cc:348
virtual int64_t maxObjectSize() const override
the maximum size of a storable object; -1 if unlimited
Definition: Transients.cc:126
virtual void create()
called when the runner should create a new memory segment
Definition: Transients.cc:443
@ ioReading
Definition: forward.h:40
virtual void stat(StoreEntry &e) const override
Definition: Transients.cc:73
void clearCollapsingRequirement(const StoreEntry &e)
removes collapsing requirement (for future hits)
Definition: Transients.cc:201
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:196
sfileno fileNoByKey(const cache_key *const key) const
computes map entry anchor position for a given entry key
Definition: StoreMap.cc:912
aggregates anchor and slice owners for Init() caller convenience
Definition: StoreMap.h:233
High-level store statistics used by mgr:info action. Used inside PODs!
Definition: StoreStats.h:14
virtual void useConfig()
Definition: Transients.cc:436
void switchWritingToReading(const sfileno fileno)
stop writing (or updating) the locked entry and start reading it
Definition: StoreMap.cc:211
class SquidConfig Config
Definition: SquidConfig.cc:12
virtual void evictIfFound(const cache_key *) override
Definition: Transients.cc:362
int readers(const StoreEntry &e) const
number of entry readers some time ago
Definition: Transients.cc:338

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors