RockIoState.cc
/*
 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/TextException.h"
#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"
#include "Parsing.h"
#include "Transients.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data) :
    StoreIOState(cbFile, cbIo, data),
    readableAnchor_(nullptr),
    writeableAnchor_(nullptr),
    splicingPoint(-1),
    staleSplicingPointNext(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    sidFirst(-1),
    sidPrevious(-1),
    sidCurrent(-1),
    sidNext(-1),
    requestsSent(0),
    repliesReceived(0),
    theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    // theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(shutting_down || !writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = nullptr;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != nullptr);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

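/// Reads up to len bytes at coreOff: walks the entry's slot chain to find the
/// slice covering coreOff and schedules a single-slice disk read; the result is
/// delivered to the reader via handleReadCompletion() and callReaderBack().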
void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != nullptr);
    assert(coreOff >= 0);

    bool writerLeft = readAnchor().writerHalted; // before the sidCurrent change

    // if this is the first read or if the offset went backwards,
    // start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        // readers do not need sidFirst but set it for consistency/triage sake
        sidCurrent = sidFirst = readAnchor().start;
        objOffset = 0;
    }

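    // advance along the entry's slot chain until we reach the slice
    // that contains coreOff (or run out of written slices)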
    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        writerLeft = readAnchor().writerHalted; // before the sidCurrent change
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    assert(read.callback == nullptr);
    assert(read.callback_data == nullptr);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // quit if we cannot read what they want, and the writer cannot add more
    if (sidCurrent < 0 && writerLeft) {
        debugs(79, 5, "quitting at " << coreOff << " in " << *e);
        callReaderBack(buf, -1);
        return;
    }

    // punt if the read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    const auto start = diskOffset + sizeof(DbCellHeader) + coreOff - objOffset;
    const auto id = ++requestsSent;
    const auto request = new ReadRequest(::ReadRequest(buf, start, len), this, id);
    theFile->read(request);
}

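/// forwards read data (or an error) to the reader that initiated this I/O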
void
Rock::IoState::handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
{
    if (errFlag != DISK_OK || rlen < 0) {
        debugs(79, 3, errFlag << " failure for " << *e);
        return callReaderBack(request.buf, -1);
    }

    if (!expectedReply(request.id))
        return callReaderBack(request.buf, -1);

    debugs(79, 5, '#' << request.id << " read " << rlen << " bytes at " << offset_ << " for " << *e);
    offset_ += rlen;
    callReaderBack(request.buf, rlen);
}

/// report (already sanitized/checked) I/O results to the read initiator
void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
    splicingPoint = rlen >= 0 ? sidCurrent : -1;
    if (splicingPoint < 0)
        staleSplicingPointNext = -1;
    else
        staleSplicingPointNext = currentReadableSlice().next;
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = nullptr;
    void *cbdata;
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*this);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}

/**
 * Possibly send data to be written to disk:
 * We only write data when a full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing the OS to deal with small write requests.
 */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or an append;
    // we do not support write gaps or rewrites
    assert(!coreOff || coreOff == -1);

    // throw if an accepted unknown-size entry grew too big or max-size changed
    Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));

    // buffer incoming data in the slot buffer and write overflowing or final slots
    // quit when no data is left or we stopped writing on a reentrant error
    while (size > 0 && theFile != nullptr) {
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we do not want to risk writing a payload-free slot on EOF.
        if (overflow) {
            Must(sidNext < 0);
            sidNext = dir->reserveSlotForWriting();
            assert(sidNext >= 0);
            writeToDisk();
            Must(sidNext < 0); // short sidNext lifetime simplifies code logic
        }
    }
}

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // eventually, writeToDisk() will fill this header space
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
void
Rock::IoState::writeToDisk()
{
    assert(theFile != nullptr);
    assert(theBuf.size >= sizeof(DbCellHeader));

    assert((sidFirst < 0) == (sidCurrent < 0));
    if (sidFirst < 0) // this is the first disk write
        sidCurrent = sidFirst = dir->reserveSlotForWriting();

    // negative sidNext means this is the last write request for this entry
    const bool lastWrite = sidNext < 0;
    // here, eof means that we are writing the right-most entry slot
    const bool eof = lastWrite &&
                     // either not updating or the updating reader has loaded everything
                     (touchingStoreEntry() || staleSplicingPointNext < 0);
    debugs(79, 5, "sidCurrent=" << sidCurrent << " sidNext=" << sidNext << " eof=" << eof);

    // TODO: if the DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    assert(!eof || sidNext < 0); // no slots after eof

    // finalize the db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = sidFirst;

    const auto lastUpdatingWrite = lastWrite && !touchingStoreEntry();
    assert(!lastUpdatingWrite || sidNext < 0);
    header.nextSlot = lastUpdatingWrite ? staleSplicingPointNext : sidNext;

    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;

    // copy the finalized db cell header into the buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, swap_filen << " at " << diskOffset << '+' << theBuf.size);
    const auto id = ++requestsSent;
    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this, id);
    r->sidCurrent = sidCurrent;
    r->sidPrevious = sidPrevious;
    r->eof = lastWrite;

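    // advance our write position along the entry's slot chain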
    sidPrevious = sidCurrent;
    sidCurrent = sidNext; // sidNext may be cleared/negative already
    sidNext = -1;

    theBuf.clear();

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}

bool
Rock::IoState::expectedReply(const IoXactionId receivedId)
{
    Must(requestsSent); // paranoid: we sent some requests
    Must(receivedId); // paranoid: the request was sent by some sio
    Must(receivedId <= requestsSent); // paranoid: within our range
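    // this code assumes replies arrive in the order their requests were sent,
    // so a mismatched ID identifies a stale reply to an abandoned request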
    ++repliesReceived;
    const auto expectedId = repliesReceived;
    if (receivedId == expectedId)
        return true;

    debugs(79, 3, "no; expected reply #" << expectedId <<
           ", but got #" << receivedId);
    return false;
}

void
Rock::IoState::finishedWriting(const int errFlag)
{
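    // release any slots that were reserved for writing but never written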
    if (sidCurrent >= 0) {
        dir->noteFreeMapSlice(sidCurrent);
        sidCurrent = -1;
    }
    if (sidNext >= 0) {
        dir->noteFreeMapSlice(sidNext);
        sidNext = -1;
    }

    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
    if (touchingStoreEntry())
        CollapsedForwarding::Broadcast(*e);

    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " leftovers: " << theBuf.size <<
           " after " << requestsSent << '/' << repliesReceived <<
           " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        try {
            writeToDisk(); // flush the last, not-yet-written slot to disk
            return; // writeCompleted() will callBack()
        }
        catch (...) {
            debugs(79, 2, "db flush error: " << CurrentException);
            // TODO: Move finishedWriting() into SwapDir::writeError().
            dir->writeError(*this);
            finishedWriting(DISK_ERROR);
        }
        return;

    case writerGone:
        dir->writeError(*this); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts failures beyond error_count
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(nullptr),
        callback_data(nullptr),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(nullptr),
        callback_data(nullptr),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    ~StoreIOStateCb() override {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    void print(std::ostream &os) const override {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};

void
Rock::IoState::callBack(int errflag)
{
    debugs(79, 3, "errflag=" << errflag);
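    // drop theFile so that a later close() treats this I/O as already ended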
    theFile = nullptr;

    AsyncCall::Pointer call = asyncCall(79, 3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = nullptr;
    cbdataReferenceDone(callback_data);
}