RockIoState.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 79 Disk IO Routines */
10
11#include "squid.h"
12#include "base/TextException.h"
13#include "CollapsedForwarding.h"
14#include "DiskIO/DiskIOModule.h"
16#include "DiskIO/WriteRequest.h"
18#include "fs/rock/RockIoState.h"
19#include "fs/rock/RockSwapDir.h"
20#include "globals.h"
21#include "MemObject.h"
22#include "Parsing.h"
23#include "Transients.h"
24
26 StoreEntry *anEntry,
28 void *data) :
29 StoreIOState(cbIo, data),
30 readableAnchor_(nullptr),
31 writeableAnchor_(nullptr),
32 splicingPoint(-1),
33 staleSplicingPointNext(-1),
34 dir(aDir),
35 slotSize(dir->slotSize),
36 objOffset(0),
37 sidFirst(-1),
38 sidPrevious(-1),
39 sidCurrent(-1),
40 sidNext(-1),
41 requestsSent(0),
42 repliesReceived(0),
43 theBuf(dir->slotSize)
44{
45 e = anEntry;
46 e->lock("rock I/O");
47 // anchor, swap_filen, and swap_dirn are set by the caller
48 ++store_open_disk_fd; // TODO: use a dedicated counter?
49 //theFile is set by SwapDir because it depends on DiskIOStrategy
50}
51
53{
55
56 // The dir map entry may still be open for reading at the point because
57 // the map entry lock is associated with StoreEntry, not IoState.
58 // assert(!readableAnchor_);
59 assert(shutting_down || !writeableAnchor_);
60
61 if (callback_data)
62 cbdataReferenceDone(callback_data);
63 theFile = nullptr;
64
65 e->unlock("rock I/O");
66}
67
68void
70{
71 assert(!theFile);
72 assert(aFile != nullptr);
73 theFile = aFile;
74}
75
78{
79 assert(readableAnchor_);
80 return *readableAnchor_;
81}
82
85{
86 assert(writeableAnchor_);
87 return *writeableAnchor_;
88}
89
93{
94 return dir->map->readableSlice(swap_filen, sidCurrent);
95}
96
97void
98Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
99{
100 debugs(79, 7, swap_filen << " reads from " << coreOff);
101
102 assert(theFile != nullptr);
103 assert(coreOff >= 0);
104
105 bool writerLeft = readAnchor().writerHalted; // before the sidCurrent change
106
107 // if we are dealing with the first read or
108 // if the offset went backwords, start searching from the beginning
109 if (sidCurrent < 0 || coreOff < objOffset) {
110 // readers do not need sidFirst but set it for consistency/triage sake
111 sidCurrent = sidFirst = readAnchor().start;
112 objOffset = 0;
113 }
114
115 while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
116 writerLeft = readAnchor().writerHalted; // before the sidCurrent change
117 objOffset += currentReadableSlice().size;
118 sidCurrent = currentReadableSlice().next;
119 }
120
121 assert(read.callback == nullptr);
122 assert(read.callback_data == nullptr);
123 read.callback = cb;
124 read.callback_data = cbdataReference(data);
125
126 // quit if we cannot read what they want, and the writer cannot add more
127 if (sidCurrent < 0 && writerLeft) {
128 debugs(79, 5, "quitting at " << coreOff << " in " << *e);
129 callReaderBack(buf, -1);
130 return;
131 }
132
133 // punt if read offset is too big (because of client bugs or collapsing)
134 if (sidCurrent < 0) {
135 debugs(79, 5, "no " << coreOff << " in " << *e);
136 callReaderBack(buf, 0);
137 return;
138 }
139
140 offset_ = coreOff;
141 len = min(len,
142 static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
143 const uint64_t diskOffset = dir->diskOffset(sidCurrent);
144 const auto start = diskOffset + sizeof(DbCellHeader) + coreOff - objOffset;
145 const auto id = ++requestsSent;
146 const auto request = new ReadRequest(::ReadRequest(buf, start, len), this, id);
147 theFile->read(request);
148}
149
150void
151Rock::IoState::handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
152{
153 if (errFlag != DISK_OK || rlen < 0) {
154 debugs(79, 3, errFlag << " failure for " << *e);
155 return callReaderBack(request.buf, -1);
156 }
157
158 if (!expectedReply(request.id))
159 return callReaderBack(request.buf, -1);
160
161 debugs(79, 5, '#' << request.id << " read " << rlen << " bytes at " << offset_ << " for " << *e);
162 offset_ += rlen;
163 callReaderBack(request.buf, rlen);
164}
165
167void
168Rock::IoState::callReaderBack(const char *buf, int rlen)
169{
170 splicingPoint = rlen >= 0 ? sidCurrent : -1;
171 if (splicingPoint < 0)
172 staleSplicingPointNext = -1;
173 else
174 staleSplicingPointNext = currentReadableSlice().next;
175 StoreIOState::STRCB *callb = read.callback;
176 assert(callb);
177 read.callback = nullptr;
178 void *cbdata;
179 if (cbdataReferenceValidDone(read.callback_data, &cbdata))
180 callb(cbdata, buf, rlen, this);
181}
182
184bool
185Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
186{
187 bool success = false;
188 try {
189 tryWrite(buf, size, coreOff);
190 success = true;
191 } catch (const std::exception &ex) { // TODO: should we catch ... as well?
192 debugs(79, 2, "db write error: " << ex.what());
193 dir->writeError(*this);
194 finishedWriting(DISK_ERROR);
195 // 'this' might be gone beyond this point; fall through to free buf
196 }
197
198 // careful: 'this' might be gone here
199
200 if (dtor)
201 (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
202
203 return success;
204}
205
212void
213Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
214{
215 debugs(79, 7, swap_filen << " writes " << size << " more");
216
217 // either this is the first write or append;
218 // we do not support write gaps or rewrites
219 assert(!coreOff || coreOff == -1);
220
221 // throw if an accepted unknown-size entry grew too big or max-size changed
222 Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));
223
224 // buffer incoming data in slot buffer and write overflowing or final slots
225 // quit when no data left or we stopped writing on reentrant error
226 while (size > 0 && theFile != nullptr) {
227 const size_t processed = writeToBuffer(buf, size);
228 buf += processed;
229 size -= processed;
230 const bool overflow = size > 0;
231
232 // We do not write a full buffer without overflow because
233 // we do not want to risk writing a payload-free slot on EOF.
234 if (overflow) {
235 Must(sidNext < 0);
236 sidNext = dir->reserveSlotForWriting();
237 assert(sidNext >= 0);
238 writeToDisk();
239 Must(sidNext < 0); // short sidNext lifetime simplifies code logic
240 }
241 }
242
243}
244
247size_t
248Rock::IoState::writeToBuffer(char const *buf, size_t size)
249{
250 // do not buffer a cell header for nothing
251 if (!size)
252 return 0;
253
254 if (!theBuf.size) {
255 // eventually, writeToDisk() will fill this header space
256 theBuf.appended(sizeof(DbCellHeader));
257 }
258
259 size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
260 theBuf.append(buf, forCurrentSlot);
261 offset_ += forCurrentSlot; // so that Core thinks we wrote it
262 return forCurrentSlot;
263}
264
266void
268{
269 assert(theFile != nullptr);
270 assert(theBuf.size >= sizeof(DbCellHeader));
271
272 assert((sidFirst < 0) == (sidCurrent < 0));
273 if (sidFirst < 0) // this is the first disk write
274 sidCurrent = sidFirst = dir->reserveSlotForWriting();
275
276 // negative sidNext means this is the last write request for this entry
277 const bool lastWrite = sidNext < 0;
278 // here, eof means that we are writing the right-most entry slot
279 const bool eof = lastWrite &&
280 // either not updating or the updating reader has loaded everything
281 (touchingStoreEntry() || staleSplicingPointNext < 0);
282 debugs(79, 5, "sidCurrent=" << sidCurrent << " sidNext=" << sidNext << " eof=" << eof);
283
284 // TODO: if DiskIO module is mmap-based, we should be writing whole pages
285 // to avoid triggering read-page;new_head+old_tail;write-page overheads
286
287 assert(!eof || sidNext < 0); // no slots after eof
288
289 // finalize db cell header
290 DbCellHeader header;
291 memcpy(header.key, e->key, sizeof(header.key));
292 header.firstSlot = sidFirst;
293
294 const auto lastUpdatingWrite = lastWrite && !touchingStoreEntry();
295 assert(!lastUpdatingWrite || sidNext < 0);
296 header.nextSlot = lastUpdatingWrite ? staleSplicingPointNext : sidNext;
297
298 header.payloadSize = theBuf.size - sizeof(DbCellHeader);
299 header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
300 header.version = writeAnchor().basics.timestamp;
301
302 // copy finalized db cell header into buffer
303 memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
304
305 // and now allocate another buffer for the WriteRequest so that
306 // we can support concurrent WriteRequests (and to ease cleaning)
307 // TODO: should we limit the number of outstanding requests?
308 size_t wBufCap = 0;
309 void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
310 memcpy(wBuf, theBuf.mem, theBuf.size);
311
312 const uint64_t diskOffset = dir->diskOffset(sidCurrent);
313 debugs(79, 5, swap_filen << " at " << diskOffset << '+' <<
314 theBuf.size);
315 const auto id = ++requestsSent;
316 WriteRequest *const r = new WriteRequest(
317 ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
318 memFreeBufFunc(wBufCap)), this, id);
319 r->sidCurrent = sidCurrent;
320 r->sidPrevious = sidPrevious;
321 r->eof = lastWrite;
322
323 sidPrevious = sidCurrent;
324 sidCurrent = sidNext; // sidNext may be cleared/negative already
325 sidNext = -1;
326
327 theBuf.clear();
328
329 // theFile->write may call writeCompleted immediately
330 theFile->write(r);
331}
332
333bool
335{
336 Must(requestsSent); // paranoid: we sent some requests
337 Must(receivedId); // paranoid: the request was sent by some sio
338 Must(receivedId <= requestsSent); // paranoid: within our range
339 ++repliesReceived;
340 const auto expectedId = repliesReceived;
341 if (receivedId == expectedId)
342 return true;
343
344 debugs(79, 3, "no; expected reply #" << expectedId <<
345 ", but got #" << receivedId);
346 return false;
347}
348
349void
351{
352 if (sidCurrent >= 0) {
353 dir->noteFreeMapSlice(sidCurrent);
354 sidCurrent = -1;
355 }
356 if (sidNext >= 0) {
357 dir->noteFreeMapSlice(sidNext);
358 sidNext = -1;
359 }
360
361 // we incremented offset_ while accumulating data in write()
362 // we do not reset writeableAnchor_ here because we still keep the lock
363 if (touchingStoreEntry())
365 callBack(errFlag);
366}
367
368void
370{
371 debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
372 " leftovers: " << theBuf.size <<
373 " after " << requestsSent << '/' << repliesReceived <<
374 " callback: " << callback);
375
376 if (!theFile) {
377 debugs(79, 3, "I/O already canceled");
378 assert(!callback);
379 // We keep writeableAnchor_ after callBack() on I/O errors.
380 assert(!readableAnchor_);
381 return;
382 }
383
384 switch (how) {
385 case wroteAll:
386 assert(theBuf.size > 0); // we never flush last bytes on our own
387 try {
388 writeToDisk(); // flush last, yet unwritten slot to disk
389 return; // writeCompleted() will callBack()
390 }
391 catch (...) {
392 debugs(79, 2, "db flush error: " << CurrentException);
393 // TODO: Move finishedWriting() into SwapDir::writeError().
394 dir->writeError(*this);
395 finishedWriting(DISK_ERROR);
396 }
397 return;
398
399 case writerGone:
400 dir->writeError(*this); // abort a partially stored entry
401 finishedWriting(DISK_ERROR);
402 return;
403
404 case readerDone:
405 callBack(0);
406 return;
407 }
408}
409
// StoreIOStateCb: carries a StoreIOState::STIOCB callback, its callback data,
// an error flag, and the IoState pointer so that the callback can be invoked
// later through dial().
// NOTE(review): this listing omits the lines that open the class (410-412 in
// its own numbering), so the class header and any base class are not visible
// here; the 'override' specifiers below imply a polymorphic base -- confirm.
413{
414public:
// Stores the callback and takes a cbdata reference to its data.
415 StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
416 callback(nullptr),
417 callback_data(nullptr),
418 errflag(err),
419 sio(anSio) {
420
421 callback = cb;
422 callback_data = cbdataReference(data);
423 }
424
// NOTE(review): the copy constructor's signature line (425) is missing from
// this listing; the cross-reference index gives it as
// StoreIOStateCb(const StoreIOStateCb &cb). The copy takes its own cbdata
// reference to the same callback data.
426 callback(nullptr),
427 callback_data(nullptr),
428 errflag(cb.errflag),
429 sio(cb.sio) {
430
431 callback = cb.callback;
432 callback_data = cbdataReference(cb.callback_data);
433 }
434
// Drops this object's cbdata reference if dial() has not consumed it yet.
435 ~StoreIOStateCb() override {
436 cbdataReferenceDone(callback_data); // may be nil already
437 }
438
// Invokes the callback with the stored error flag and raw IoState pointer,
// but only if the callback data is still valid and a callback is set.
439 void dial(AsyncCall &) {
440 void *cbd;
441 if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
442 callback(cbd, errflag, sio.getRaw());
443 }
444
// True when the callback data is still valid and a callback is set.
445 bool canDial(AsyncCall &) const {
446 return cbdataReferenceValid(callback_data) && callback;
447 }
448
// Prints the callback data pointer and the error flag for debugging output.
449 void print(std::ostream &os) const override {
450 os << '(' << callback_data << ", err=" << errflag << ')';
451 }
452
453private:
// Assignment is declared but intentionally left undefined.
454 StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined
455
// NOTE(review): the data member declarations (456-459) are missing from this
// listing; the index lists callback, callback_data, and sio, and the code
// above also uses errflag (initialized from an int) -- confirm exact types.
460};
461
462void
464{
465 debugs(79,3, "errflag=" << errflag);
466 theFile = nullptr;
467
468 AsyncCall::Pointer call = asyncCall(79,3, "SomeIoStateCloseCb",
469 StoreIOStateCb(callback, callback_data, errflag, this));
470 ScheduleCallHere(call);
471
472 callback = nullptr;
473 cbdataReferenceDone(callback_data);
474}
475
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
int size
Definition: ModDevPoll.cc:75
std::ostream & CurrentException(std::ostream &os)
prints active (i.e., thrown but not yet handled) exception
#define Must(condition)
Definition: TextException.h:75
#define assert(EX)
Definition: assert.h:17
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:265
#define cbdataReferenceDone(var)
Definition: cbdata.h:352
#define cbdataReference(var)
Definition: cbdata.h:343
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:239
static void Broadcast(const StoreEntry &e, const bool includingThisWorker=false)
notify other workers about changes in entry state (e.g., new data)
char * buf
Definition: ReadRequest.h:24
uint64_t key[2]
StoreEntry key.
Definition: RockDbCell.h:41
uint64_t entrySize
total entry content size or zero if still unknown
Definition: RockDbCell.h:42
uint32_t version
detects conflicts among same-key entries
Definition: RockDbCell.h:44
uint32_t payloadSize
slot contents size, always positive
Definition: RockDbCell.h:43
sfileno firstSlot
slot ID of the first slot occupied by the entry
Definition: RockDbCell.h:45
sfileno nextSlot
slot ID of the next slot occupied by the entry
Definition: RockDbCell.h:46
bool write(char const *buf, size_t size, off_t offset, FREE *free_func) override
wraps tryWrite() to handle deep write failures centrally and safely
Definition: RockIoState.cc:185
~IoState() override
Definition: RockIoState.cc:52
const Ipc::StoreMapSlice & currentReadableSlice() const
convenience wrapper returning the map slot we are reading now
Definition: RockIoState.cc:92
void writeToDisk()
write what was buffered during write() calls
Definition: RockIoState.cc:267
const Ipc::StoreMapAnchor & readAnchor() const
Definition: RockIoState.cc:77
bool expectedReply(const IoXactionId receivedId)
Definition: RockIoState.cc:334
void callBack(int errflag)
Definition: RockIoState.cc:463
void read_(char *buf, size_t size, off_t offset, STRCB *callback, void *callback_data) override
Definition: RockIoState.cc:98
size_t writeToBuffer(char const *buf, size_t size)
Definition: RockIoState.cc:248
void file(const RefCount< DiskFile > &aFile)
Definition: RockIoState.cc:69
void callReaderBack(const char *buf, int rlen)
report (already sanitized/checked) I/O results to the read initiator
Definition: RockIoState.cc:168
void finishedWriting(const int errFlag)
called by SwapDir::writeCompleted() after the last write and on error
Definition: RockIoState.cc:350
Ipc::StoreMapAnchor & writeAnchor()
Definition: RockIoState.cc:84
void tryWrite(char const *buf, size_t size, off_t offset)
Definition: RockIoState.cc:213
IoState(Rock::SwapDir::Pointer &, StoreEntry *, StoreIOState::STIOCB *, void *cbData)
Definition: RockIoState.cc:25
void handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
forwards read data (or an error) to the reader that initiated this I/O
Definition: RockIoState.cc:151
void close(int how) override
finish or abort swapping per CloseHow
Definition: RockIoState.cc:369
IoXactionId id
identifies this read transaction for the requesting IoState
SlotId sidPrevious
slot that will point to sidCurrent in the cache_dir map
bool eof
whether this is the last request for the entry
SlotId sidCurrent
slot being written using this write request
void lock(const char *context)
Definition: store.cc:431
StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio)
Definition: RockIoState.cc:415
void * callback_data
Definition: RockIoState.cc:457
~StoreIOStateCb() override
Definition: RockIoState.cc:435
void dial(AsyncCall &)
Definition: RockIoState.cc:439
bool canDial(AsyncCall &) const
Definition: RockIoState.cc:445
void print(std::ostream &os) const override
Definition: RockIoState.cc:449
StoreIOStateCb(const StoreIOStateCb &cb)
Definition: RockIoState.cc:425
StoreIOState::STIOCB * callback
Definition: RockIoState.cc:456
Rock::IoState::Pointer sio
Definition: RockIoState.cc:459
void STRCB(void *their_data, const char *buf, ssize_t len, StoreIOState::Pointer self)
Definition: StoreIOState.h:29
StoreEntry * e
Definition: StoreIOState.h:73
void STIOCB(void *their_data, int errflag, StoreIOState::Pointer self)
Definition: StoreIOState.h:39
Definition: cbdata.cc:38
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
#define DISK_ERROR
Definition: defines.h:28
#define DISK_OK
Definition: defines.h:27
int store_open_disk_fd
int shutting_down
void FREE(void *)
Definition: forward.h:37
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: minimal.cc:46
FREE * memFreeBufFunc(size_t size)
Definition: minimal.cc:102
uint64_t IoXactionId
unique (within a given IoState object scope) I/O transaction identifier
Definition: forward.h:36

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors