RockIoState.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 79 Disk IO Routines */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "CollapsedForwarding.h"
14 #include "DiskIO/DiskIOModule.h"
15 #include "DiskIO/DiskIOStrategy.h"
16 #include "DiskIO/WriteRequest.h"
17 #include "fs/rock/RockIoRequests.h"
18 #include "fs/rock/RockIoState.h"
19 #include "fs/rock/RockSwapDir.h"
20 #include "globals.h"
21 #include "MemObject.h"
22 #include "Parsing.h"
23 #include "Transients.h"
24 
26  StoreEntry *anEntry,
27  StoreIOState::STFNCB *cbFile,
29  void *data) :
30  StoreIOState(cbFile, cbIo, data),
31  readableAnchor_(NULL),
32  writeableAnchor_(NULL),
33  splicingPoint(-1),
34  staleSplicingPointNext(-1),
35  dir(aDir),
36  slotSize(dir->slotSize),
37  objOffset(0),
38  sidCurrent(-1),
39  theBuf(dir->slotSize)
40 {
41  e = anEntry;
42  e->lock("rock I/O");
43  // anchor, swap_filen, and swap_dirn are set by the caller
44  ++store_open_disk_fd; // TODO: use a dedicated counter?
45  //theFile is set by SwapDir because it depends on DiskIOStrategy
46 }
47 
49 {
51 
52  // The dir map entry may still be open for reading at the point because
53  // the map entry lock is associated with StoreEntry, not IoState.
54  // assert(!readableAnchor_);
55  assert(shutting_down || !writeableAnchor_);
56 
57  if (callback_data)
58  cbdataReferenceDone(callback_data);
59  theFile = NULL;
60 
61  e->unlock("rock I/O");
62 }
63 
64 void
66 {
67  assert(!theFile);
68  assert(aFile != NULL);
69  theFile = aFile;
70 }
71 
72 const Ipc::StoreMapAnchor &
74 {
75  assert(readableAnchor_);
76  return *readableAnchor_;
77 }
78 
81 {
82  assert(writeableAnchor_);
83  return *writeableAnchor_;
84 }
85 
87 const Ipc::StoreMapSlice &
89 {
90  return dir->map->readableSlice(swap_filen, sidCurrent);
91 }
92 
/// Starts reading up to `len` bytes of entry content at `coreOff` into `buf`.
/// Locates the db slot ("slice") containing coreOff by walking the entry's
/// slice chain, then schedules an asynchronous disk read of that slice's
/// payload. On an unsatisfiable offset, reports zero bytes to the reader.
/// \param buf destination buffer owned by the caller
/// \param len maximum number of bytes to read (clamped to one slice)
/// \param coreOff entry content offset to start reading from (must be >= 0)
/// \param cb reader notification callback
/// \param data opaque cbdata-protected argument for cb
void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        sidCurrent = readAnchor().start;
        objOffset = 0;
    }

    // advance along the slice chain until we reach the slice holding coreOff
    // (or run off the end of the chain, leaving sidCurrent negative)
    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    // stash the callback now so that callReaderBack() below can use it
    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
    // clamp len to the bytes remaining in the current slice
    len = min(len,
        static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    // skip the on-disk DbCellHeader to reach the payload within the slot
    theFile->read(new ReadRequest(::ReadRequest(buf,
        diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}
132 
133 void
134 Rock::IoState::callReaderBack(const char *buf, int rlen)
135 {
136  debugs(79, 5, rlen << " bytes for " << *e);
137  splicingPoint = rlen >= 0 ? sidCurrent : -1;
138  if (splicingPoint < 0)
139  staleSplicingPointNext = -1;
140  else
141  staleSplicingPointNext = currentReadableSlice().next;
142  StoreIOState::STRCB *callb = read.callback;
143  assert(callb);
144  read.callback = NULL;
145  void *cbdata;
146  if (cbdataReferenceValidDone(read.callback_data, &cbdata))
147  callb(cbdata, buf, rlen, this);
148 }
149 
/// Wraps tryWrite() to handle deep write failures centrally and safely.
/// \param buf bytes to store; freed via dtor (if any) regardless of outcome
/// \param size number of bytes in buf
/// \param coreOff entry content offset (must be 0 for the first write or -1 for appends)
/// \param dtor optional deallocator for buf
/// \returns true if the write was buffered/scheduled; false on error
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*this);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here
    // (so we must not touch any members below; only the caller-owned buf)

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}
172 
/// Buffers incoming data in the current slot and writes out overflowing
/// slots (and, for remote readers, partial slots). Allocates the first db
/// slot on the first write. Throws on failures (e.g., entry too big or no
/// free db slots); the public write() wrapper converts exceptions into
/// write errors.
/// \param buf bytes to append to the entry
/// \param size number of bytes in buf
/// \param coreOff 0 for the first write, -1 for appends; gaps are not supported
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or append; we do not support write gaps
    assert(!coreOff || coreOff == -1);

    // throw if an accepted unknown-size entry grew too big or max-size changed
    Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));

    // allocate the first slice during the first write
    if (!coreOff) {
        assert(sidCurrent < 0);
        sidCurrent = reserveSlotForWriting(); // throws on failures
        assert(sidCurrent >= 0);
        writeAnchor().start = sidCurrent;
    }

    // buffer incoming data in slot buffer and write overflowing or final slots
    // quit when no data left or we stopped writing on reentrant error
    // (a reentrant error nullifies theFile via callBack())
    while (size > 0 && theFile != NULL) {
        assert(sidCurrent >= 0);
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we would not yet know what to set the nextSlot to.
        if (overflow) {
            // reserve the next slot first so its ID can go into this slot's header
            const SlotId sidNext = reserveSlotForWriting(); // throws
            assert(sidNext >= 0);
            writeToDisk(sidNext);
        } else if (Store::Root().transientReaders(*e)) {
            // write partial buffer for all remote hit readers to see
            writeBufToDisk(-1, false, false);
        }
    }

}
220 
223 size_t
225 {
226  // do not buffer a cell header for nothing
227  if (!size)
228  return 0;
229 
230  if (!theBuf.size) {
231  // will fill the header in writeToDisk when the next slot is known
232  theBuf.appended(sizeof(DbCellHeader));
233  }
234 
235  size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
236  theBuf.append(buf, forCurrentSlot);
237  offset_ += forCurrentSlot; // so that Core thinks we wrote it
238  return forCurrentSlot;
239 }
240 
243 void
244 Rock::IoState::writeToDisk(const SlotId sidNextProposal)
245 {
246  assert(theFile != NULL);
247  assert(theBuf.size >= sizeof(DbCellHeader));
248 
249  const bool lastWrite = sidNextProposal < 0;
250  const bool eof = lastWrite &&
251  // either not updating or the updating reader has loaded everything
252  (touchingStoreEntry() || staleSplicingPointNext < 0);
253  // approve sidNextProposal unless _updating_ the last slot
254  const SlotId sidNext = (!touchingStoreEntry() && lastWrite) ?
255  staleSplicingPointNext : sidNextProposal;
256  debugs(79, 5, "sidNext:" << sidNextProposal << "=>" << sidNext << " eof=" << eof);
257 
258  // TODO: if DiskIO module is mmap-based, we should be writing whole pages
259  // to avoid triggering read-page;new_head+old_tail;write-page overheads
260 
261  writeBufToDisk(sidNext, eof, lastWrite);
262  theBuf.clear();
263 
264  sidCurrent = sidNext;
265 }
266 
/// Finalizes the db cell header inside the slot buffer and schedules an
/// asynchronous write of the whole buffer to this slot's disk location.
/// \param sidNext slot ID to record as this cell's continuation (negative if none)
/// \param eof whether this cell ends the entry's content
/// \param lastWrite whether this is the final write request for this I/O
void
Rock::IoState::writeBufToDisk(const SlotId sidNext, const bool eof, const bool lastWrite)
{
    // no slots after the last/eof slot (but partial slots may have a nil next)
    assert(!eof || sidNext < 0);

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = writeAnchor().start;
    header.nextSlot = sidNext;
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;

    // copy finalized db cell header into buffer
    // (writeToBuffer() left sizeof(DbCellHeader) bytes of room at the start)
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);

    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this);
    r->sidCurrent = sidCurrent;
    r->sidNext = sidNext;
    r->eof = lastWrite;

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}
308 
312 {
313  Ipc::Mem::PageId pageId;
314  if (dir->useFreeSlot(pageId))
315  return pageId.number-1;
316 
317  // This may happen when the number of available db slots is close to the
318  // number of concurrent requests reading or writing those slots, which may
319  // happen when the db is "small" compared to the request traffic OR when we
320  // are rebuilding and have not loaded "many" entries or empty slots yet.
321  throw TexcHere("ran out of free db slots");
322 }
323 
324 void
326 {
327  // we incremented offset_ while accumulating data in write()
328  // we do not reset writeableAnchor_ here because we still keep the lock
329  if (touchingStoreEntry())
331  callBack(errFlag);
332 }
333 
334 void
336 {
337  debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
338  " buf: " << theBuf.size << " callback: " << callback);
339 
340  if (!theFile) {
341  debugs(79, 3, "I/O already canceled");
342  assert(!callback);
343  // We keep writeableAnchor_ after callBack() on I/O errors.
344  assert(!readableAnchor_);
345  return;
346  }
347 
348  switch (how) {
349  case wroteAll:
350  assert(theBuf.size > 0); // we never flush last bytes on our own
351  writeToDisk(-1); // flush last, yet unwritten slot to disk
352  return; // writeCompleted() will callBack()
353 
354  case writerGone:
355  dir->writeError(*this); // abort a partially stored entry
356  finishedWriting(DISK_ERROR);
357  return;
358 
359  case readerDone:
360  callBack(0);
361  return;
362  }
363 }
364 
368 {
369 public:
371  callback(NULL),
372  callback_data(NULL),
373  errflag(err),
374  sio(anSio) {
375 
376  callback = cb;
377  callback_data = cbdataReference(data);
378  }
379 
381  callback(NULL),
382  callback_data(NULL),
383  errflag(cb.errflag),
384  sio(cb.sio) {
385 
386  callback = cb.callback;
387  callback_data = cbdataReference(cb.callback_data);
388  }
389 
390  virtual ~StoreIOStateCb() {
391  cbdataReferenceDone(callback_data); // may be nil already
392  }
393 
394  void dial(AsyncCall &) {
395  void *cbd;
396  if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
397  callback(cbd, errflag, sio.getRaw());
398  }
399 
400  bool canDial(AsyncCall &) const {
401  return cbdataReferenceValid(callback_data) && callback;
402  }
403 
404  virtual void print(std::ostream &os) const {
405  os << '(' << callback_data << ", err=" << errflag << ')';
406  }
407 
408 private:
409  StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined
410 
413  int errflag;
415 };
416 
417 void
419 {
420  debugs(79,3, HERE << "errflag=" << errflag);
421  theFile = NULL;
422 
423  AsyncCall::Pointer call = asyncCall(79,3, "SomeIoStateCloseCb",
424  StoreIOStateCb(callback, callback_data, errflag, this));
425  ScheduleCallHere(call);
426 
427  callback = NULL;
428  cbdataReferenceDone(callback_data);
429 }
430 
virtual ~IoState()
Definition: RockIoState.cc:48
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:256
SlotId reserveSlotForWriting()
finds and returns a free db slot to fill or throws
Definition: RockIoState.cc:311
#define assert(EX)
Definition: assert.h:17
void const char HLPCB * callback
Definition: stub_helper.cc:16
#define cbdataReferenceDone(var)
Definition: cbdata.h:350
Definition: cbdata.cc:60
void STRCB(void *their_data, const char *buf, ssize_t len, StoreIOState::Pointer self)
Definition: StoreIOState.h:29
bool eof
whether this is the last request for the entry
const Ipc::StoreMapAnchor & readAnchor() const
Definition: RockIoState.cc:73
uint64_t entrySize
total entry content size or zero if still unknown
Definition: RockDbCell.h:42
virtual void close(int how)
finish or abort swapping per CloseHow
Definition: RockIoState.cc:335
void writeBufToDisk(const SlotId nextSlot, const bool eof, const bool lastWrite)
Definition: RockIoState.cc:270
sfileno SlotId
db cell number, starting with cell 0 (always occupied by the db header)
Definition: forward.h:30
void FREE(void *)
Definition: forward.h:36
sfileno nextSlot
slot ID of the next slot occupied by the entry
Definition: RockDbCell.h:46
Controller & Root()
safely access controller singleton
Definition: Controller.cc:619
#define DISK_ERROR
Definition: defines.h:40
void * callback_data
Definition: RockIoState.cc:412
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: old_api.cc:320
static void Broadcast(const StoreEntry &e)
notify other workers about changes in entry state (e.g., new data)
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
int shutting_down
Definition: testAddress.cc:36
uint32_t number
page number within the segment
Definition: Page.h:35
void const char HLPCB void * data
Definition: stub_helper.cc:16
void dial(AsyncCall &)
Definition: RockIoState.cc:394
StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio)
Definition: RockIoState.cc:370
IoState(Rock::SwapDir::Pointer &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
Definition: RockIoState.cc:25
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
#define cbdataReference(var)
Definition: cbdata.h:341
uint32_t payloadSize
slot contents size, always positive
Definition: RockDbCell.h:43
void STFNCB(void *their_data, int errflag, StoreIOState::Pointer self)
Definition: StoreIOState.h:41
size_t writeToBuffer(char const *buf, size_t size)
Definition: RockIoState.cc:224
uint64_t key[2]
StoreEntry key.
Definition: RockDbCell.h:41
void file(const RefCount< DiskFile > &aFile)
Definition: RockIoState.cc:65
int unsigned int const char *desc STUB void int len
Definition: stub_fd.cc:20
void const char * buf
Definition: stub_helper.cc:16
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:147
void writeToDisk(const SlotId nextSlot)
Definition: RockIoState.cc:244
#define Must(cond)
Definition: TextException.h:89
void callBack(int errflag)
Definition: RockIoState.cc:418
SlotId sidCurrent
slot being written using this write request
void STIOCB(void *their_data, int errflag, StoreIOState::Pointer self)
Definition: StoreIOState.h:51
virtual void print(std::ostream &os) const
Definition: RockIoState.cc:404
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
void finishedWriting(const int errFlag)
called by SwapDir::writeCompleted() after the last write and on error
Definition: RockIoState.cc:325
int store_open_disk_fd
void callReaderBack(const char *buf, int rlen)
forwards read data to the reader that initiated this I/O
Definition: RockIoState.cc:134
StoreEntry * e
Definition: StoreIOState.h:85
virtual ~StoreIOStateCb()
Definition: RockIoState.cc:390
virtual bool write(char const *buf, size_t size, off_t offset, FREE *free_func)
wraps tryWrite() to handle deep write failures centrally and safely
Definition: RockIoState.cc:152
Shared memory page identifier, address, or handler.
Definition: Page.h:21
StoreIOState::STIOCB * callback
Definition: RockIoState.cc:411
#define TexcHere(msg)
Definition: TextException.h:81
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:412
const Ipc::StoreMapSlice & currentReadableSlice() const
convenience wrapper returning the map slot we are reading now
Definition: RockIoState.cc:88
SlotId sidNext
allocated next slot (negative if we are writing the last slot)
StoreIOStateCb(const StoreIOStateCb &cb)
Definition: RockIoState.cc:380
Rock::IoState::Pointer sio
Definition: RockIoState.cc:414
virtual void read_(char *buf, size_t size, off_t offset, STRCB *callback, void *callback_data)
Definition: RockIoState.cc:94
uint32_t version
detects conflicts among same-key entries
Definition: RockDbCell.h:44
bool canDial(AsyncCall &) const
Definition: RockIoState.cc:400
FREE * memFreeBufFunc(size_t size)
Definition: old_api.cc:552
void tryWrite(char const *buf, size_t size, off_t offset)
Definition: RockIoState.cc:180
#define NULL
Definition: types.h:166
sfileno firstSlot
slot ID of the first slot occupied by the entry
Definition: RockDbCell.h:45
int size
Definition: ModDevPoll.cc:77
A const & min(A const &lhs, A const &rhs)
Ipc::StoreMapAnchor & writeAnchor()
Definition: RockIoState.cc:80
void lock(const char *context)
Definition: store.cc:448

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors