RockIoState.cc
/*
 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/TextException.h"
#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"
#include "Parsing.h"
#include "Transients.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data) :
    StoreIOState(cbFile, cbIo, data),
    readableAnchor_(NULL),
    writeableAnchor_(NULL),
    splicingPoint(-1),
    staleSplicingPointNext(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    sidFirst(-1),
    sidPrevious(-1),
    sidCurrent(-1),
    sidNext(-1),
    requestsSent(0),
    repliesReceived(0),
    theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    // theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(shutting_down || !writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = NULL;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != NULL);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        // readers do not need sidFirst but set it for consistency/triage's sake
        sidCurrent = sidFirst = readAnchor().start;
        objOffset = 0;
    }

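    // walk the entry's slice chain until we reach the slice covering coreOff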
    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
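    // read no further than the end of the current slice;
    // the reader will ask for the rest with subsequent read_() calls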
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
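    // on disk, each slot starts with a DbCellHeader; skip it to reach the payload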
    const auto start = diskOffset + sizeof(DbCellHeader) + coreOff - objOffset;
    const auto id = ++requestsSent;
    const auto request = new ReadRequest(::ReadRequest(buf, start, len), this, id);
    theFile->read(request);
}

void
Rock::IoState::handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
{
    if (errFlag != DISK_OK || rlen < 0) {
        debugs(79, 3, errFlag << " failure for " << *e);
        return callReaderBack(request.buf, -1);
    }

    if (!expectedReply(request.id))
        return callReaderBack(request.buf, -1);

    debugs(79, 5, '#' << request.id << " read " << rlen << " bytes at " << offset_ << " for " << *e);
    offset_ += rlen;
    callReaderBack(request.buf, rlen);
}

/// report (already sanitized/checked) I/O results to the read initiator
void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
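    // remember where reading stopped; entry updaters splice their freshly
    // written slots with the stale suffix that starts at staleSplicingPointNext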
    splicingPoint = rlen >= 0 ? sidCurrent : -1;
    if (splicingPoint < 0)
        staleSplicingPointNext = -1;
    else
        staleSplicingPointNext = currentReadableSlice().next;
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*this);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}

/**
 * Possibly send data to be written to disk:
 * We only write data when a full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing the OS to search for StoreEntry slots
 * that are not yet full (i.e., to avoid fragmentation).
 */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or an append;
    // we do not support write gaps or rewrites
    assert(!coreOff || coreOff == -1);

    // throw if an accepted unknown-size entry grew too big or max-size changed
    Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));

    // buffer incoming data in the slot buffer and write overflowing or final slots;
    // quit when no data is left or we stopped writing on a reentrant error
    while (size > 0 && theFile != NULL) {
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we do not want to risk writing a payload-free slot on EOF.
        if (overflow) {
            Must(sidNext < 0);
            sidNext = dir->reserveSlotForWriting();
            assert(sidNext >= 0);
            writeToDisk();
            Must(sidNext < 0); // short sidNext lifetime simplifies code logic
        }
    }
}

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // eventually, writeToDisk() will fill this header space
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
void
Rock::IoState::writeToDisk()
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    assert((sidFirst < 0) == (sidCurrent < 0));
    if (sidFirst < 0) // this is the first disk write
        sidCurrent = sidFirst = dir->reserveSlotForWriting();

    // negative sidNext means this is the last write request for this entry
    const bool lastWrite = sidNext < 0;
    // here, eof means that we are writing the right-most entry slot
    const bool eof = lastWrite &&
                     // either not updating or the updating reader has loaded everything
                     (touchingStoreEntry() || staleSplicingPointNext < 0);
    debugs(79, 5, "sidCurrent=" << sidCurrent << " sidNext=" << sidNext << " eof=" << eof);

    // TODO: if the DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    assert(!eof || sidNext < 0); // no slots after eof

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = sidFirst;

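    // when updating a cached entry (rather than storing a new one), the last
    // freshly written slot must link to the surviving stale suffix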
    const auto lastUpdatingWrite = lastWrite && !touchingStoreEntry();
    assert(!lastUpdatingWrite || sidNext < 0);
    header.nextSlot = lastUpdatingWrite ? staleSplicingPointNext : sidNext;

    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;

    // copy finalized db cell header into buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, swap_filen << " at " << diskOffset << '+' << theBuf.size);
    const auto id = ++requestsSent;
    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this, id);
    r->sidCurrent = sidCurrent;
    r->sidPrevious = sidPrevious;
    r->eof = lastWrite;

    sidPrevious = sidCurrent;
    sidCurrent = sidNext; // sidNext may be cleared/negative already
    sidNext = -1;

    theBuf.clear();

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}

bool
Rock::IoState::expectedReply(const IoXactionId receivedId)
{
    Must(requestsSent); // paranoid: we sent some requests
    Must(receivedId); // paranoid: the request was sent by some sio
    Must(receivedId <= requestsSent); // paranoid: within our range
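    // I/O replies arrive in the order their requests were sent; an ID
    // mismatch implies a lost or stale reply, so its result is unusable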
    ++repliesReceived;
    const auto expectedId = repliesReceived;
    if (receivedId == expectedId)
        return true;

    debugs(79, 3, "no; expected reply #" << expectedId <<
           ", but got #" << receivedId);
    return false;
}

void
Rock::IoState::finishedWriting(const int errFlag)
{
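    // return reserved but unwritten slots to the map
    // (possible when writing was aborted by an error)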
    if (sidCurrent >= 0) {
        dir->noteFreeMapSlice(sidCurrent);
        sidCurrent = -1;
    }
    if (sidNext >= 0) {
        dir->noteFreeMapSlice(sidNext);
        sidNext = -1;
    }

    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
    if (touchingStoreEntry())
        CollapsedForwarding::Broadcast(*e);
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " leftovers: " << theBuf.size <<
           " after " << requestsSent << '/' << repliesReceived <<
           " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        try {
            writeToDisk(); // flush the last, still-unwritten slot to disk
            return; // writeCompleted() will callBack()
        }
        catch (...) {
            debugs(79, 2, "db flush error: " << CurrentException);
            // TODO: Move finishedWriting() into SwapDir::writeError().
            dir->writeError(*this);
            finishedWriting(DISK_ERROR);
        }
        return;

    case writerGone:
        dir->writeError(*this); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts IoState concurrency level
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(NULL),
        callback_data(NULL),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(NULL),
        callback_data(NULL),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    virtual ~StoreIOStateCb() {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    virtual void print(std::ostream &os) const {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};

void
Rock::IoState::callBack(int errflag)
{
    debugs(79, 3, "errflag=" << errflag);
    theFile = NULL;

    AsyncCall::Pointer call = asyncCall(79, 3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = NULL;
    cbdataReferenceDone(callback_data);
}