27 staleSwapHeaderSize(0),
28 staleSplicingPointNext(-1)
42 if (update.stale || update.fresh)
43 store->map->abortUpdating(update);
74 reader = store->openStoreIO(
79 readMore(
"need swap entry metadata");
91 debugs(47, 5,
"stale chain ends at " << update.stale.splicingPoint <<
92 " body continues at " << staleSplicingPointNext);
101 IoCbParams io(buf, result);
118 bytesRead += result.
size;
119 readerBuffer.rawAppendFinish(result.
buf, result.
size);
120 exchangeBuffer.append(readerBuffer);
121 debugs(47, 7,
"accumulated " << exchangeBuffer.length());
130 debugs(47, 7,
"from " << bytesRead <<
" because " << why);
132 readerBuffer.clear();
134 readerBuffer.rawAppendStart(store->slotSize),
155 debugs(47, 5, errflag <<
" writer=" << writer);
162 mustStop(
"read error");
169 writer = store->createUpdateIO(
180 uint64_t stalePrefixSz = 0;
181 uint64_t freshPrefixSz = 0;
185 const auto &mem = update.entry->mem();
188 debugs(20, 7,
"fresh store meta for " << *update.entry);
189 size_t freshSwapHeaderSize = 0;
196 const auto savedEntrySwapFileSize = update.entry->swap_file_sz;
197 update.entry->swap_file_sz = 0;
198 const auto freshSwapHeader = update.entry->getSerialisedMetaData(freshSwapHeaderSize);
199 update.entry->swap_file_sz = savedEntrySwapFileSize;
201 Must(freshSwapHeader);
202 writer->write(freshSwapHeader, freshSwapHeaderSize, 0,
nullptr);
203 stalePrefixSz += mem.swap_hdr_sz;
204 freshPrefixSz += freshSwapHeaderSize;
205 offset += freshSwapHeaderSize;
206 xfree(freshSwapHeader);
210 debugs(20, 7,
"fresh HTTP header @ " << offset);
211 const auto httpHeader = mem.freshestReply().pack();
212 writer->write(httpHeader->content(), httpHeader->contentSize(), -1,
nullptr);
213 const auto &staleReply = mem.baseReply();
214 Must(staleReply.hdr_sz >= 0);
215 Must(staleReply.hdr_sz > 0);
216 stalePrefixSz += staleReply.hdr_sz;
217 freshPrefixSz += httpHeader->contentSize();
218 offset += httpHeader->contentSize();
223 debugs(20, 7,
"moved HTTP body prefix @ " << offset);
224 writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1,
nullptr);
225 offset += exchangeBuffer.length();
226 exchangeBuffer.clear();
229 debugs(20, 7,
"wrote " << offset <<
230 "; swap_file_sz delta: -" << stalePrefixSz <<
" +" << freshPrefixSz);
233 auto &swap_file_sz = update.fresh.anchor->basics.swap_file_sz;
234 Must(swap_file_sz >= stalePrefixSz);
235 swap_file_sz -= stalePrefixSz;
236 swap_file_sz += freshPrefixSz;
254 debugs(47, 5, errflag <<
" reader=" << reader);
261 debugs(47, 5,
"fresh chain ends at " << update.fresh.splicingPoint);
262 store->map->closeForUpdating(update);
272 if (!staleSwapHeaderSize) {
275 debugs(47, 7,
"staleSwapHeaderSize=" << staleSwapHeaderSize);
276 Must(staleSwapHeaderSize > 0);
277 exchangeBuffer.consume(staleSwapHeaderSize);
280 const size_t staleHttpHeaderSize =
headersEnd(
281 exchangeBuffer.rawContent(),
282 exchangeBuffer.length());
283 debugs(47, 7,
"staleHttpHeaderSize=" << staleHttpHeaderSize);
284 if (!staleHttpHeaderSize) {
285 readMore(
"need more stale HTTP reply header data");
289 exchangeBuffer.consume(staleHttpHeaderSize);
290 debugs(47, 7,
"httpBodySizePrefix=" << exchangeBuffer.length());
292 stopReading(
"read the last HTTP header slot");
#define CallJobHere1(debugSection, debugLevel, job, Class, method, arg1)
virtual bool doneAll() const
whether positive goal has been reached
Aggregates information required for updating entry metadata and headers.
SlotId splicingPoint
the last db slot successfully read or written
Ipc::StoreMapAnchor * writeableAnchor_
starting point for writing
SlotId staleSplicingPointNext
@ wroteAll
success: caller supplied all data it wanted to swap out
@ readerDone
success or failure: either way, stop swapping in
@ writerGone
failure: caller left before swapping out everything
#define debugs(SECTION, LEVEL, CONTENT)
size_t UnpackSwapMetaSize(const SBuf &)
void storeRead(StoreIOState::Pointer sio, char *buf, size_t size, off_t offset, StoreIOState::STRCB *callback, void *callback_data)