RockHeaderUpdater.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9#include "squid.h"
10#include "base/AsyncJobCalls.h"
11#include "debug/Stream.h"
13#include "fs/rock/RockIoState.h"
14#include "mime_header.h"
15#include "Store.h"
16#include "store/SwapMetaIn.h"
17
19
21 AsyncJob("Rock::HeaderUpdater"),
22 store(aStore),
23 update(anUpdate),
24 reader(),
25 writer(),
26 bytesRead(0),
27 staleSwapHeaderSize(0),
28 staleSplicingPointNext(-1)
29{
30 // TODO: Consider limiting the number of concurrent store updates.
31}
32
33bool
35{
36 return !reader && !writer && AsyncJob::doneAll();
37}
38
39void
41{
42 if (update.stale || update.fresh)
43 store->map->abortUpdating(update);
44
45 if (reader) {
46 reader->close(StoreIOState::readerDone);
47 reader = nullptr;
48 }
49
50 if (writer) {
51 writer->close(StoreIOState::writerGone);
52 // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53 // Also required to avoid IoState destructor assertions.
54 // We can do this because we closed update earlier or aborted it above.
55 dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
56 writer = nullptr;
57 }
58
60}
61
62void
64{
65 Must(update.entry);
66 Must(update.stale);
67 Must(update.fresh);
68 startReading();
69}
70
71void
73{
74 reader = store->openStoreIO(
75 *update.entry,
76 &NoteDoneReading,
77 this);
78 readMore("need swap entry metadata");
79}
80
81void
83{
84 debugs(47, 7, why);
85
86 Must(reader);
87 const IoState &rockReader = dynamic_cast<IoState&>(*reader);
88 update.stale.splicingPoint = rockReader.splicingPoint;
89 staleSplicingPointNext = rockReader.staleSplicingPointNext;
90 debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
91 " body continues at " << staleSplicingPointNext);
92
93 reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
94 reader = nullptr; // so that swanSong() does not try to close again
95}
96
97void
98Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
99{
100 IoCbParams io(buf, result);
101 // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
102 CallJobHere1(47, 7,
103 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
105 noteRead,
106 io);
107}
108
109void
111{
112 debugs(47, 7, result.size);
113 if (!result.size) { // EOF
114 stopReading("eof");
115 } else {
116 Must(result.size > 0);
117 bytesRead += result.size;
118 readerBuffer.rawAppendFinish(result.buf, result.size);
119 exchangeBuffer.append(readerBuffer);
120 debugs(47, 7, "accumulated " << exchangeBuffer.length());
121 }
122
123 parseReadBytes();
124}
125
126void
128{
129 debugs(47, 7, "from " << bytesRead << " because " << why);
130 Must(reader);
131 readerBuffer.clear();
132 storeRead(reader,
133 readerBuffer.rawAppendStart(store->slotSize),
134 store->slotSize,
135 bytesRead,
136 &NoteRead,
137 this);
138}
139
140void
142{
143 // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
144 CallJobHere1(47, 7,
145 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
147 noteDoneReading,
148 errflag);
149}
150
151void
153{
154 debugs(47, 5, errflag << " writer=" << writer);
155 if (!reader) {
156 Must(!errflag); // we only initiate successful closures
157 Must(writer); // otherwise we would be done() and would not be called
158 } else {
159 reader = nullptr; // we are done reading
160 Must(errflag); // any external closures ought to be errors
161 mustStop("read error");
162 }
163}
164
165void
167{
168 writer = store->createUpdateIO(
169 update,
170 &NoteDoneWriting,
171 this);
172 Must(writer);
173
174 IoState &rockWriter = dynamic_cast<IoState&>(*writer);
175 rockWriter.staleSplicingPointNext = staleSplicingPointNext;
176
177 // here, prefix is swap header plus HTTP reply header (i.e., updated bytes)
178 uint64_t stalePrefixSz = 0;
179 uint64_t freshPrefixSz = 0;
180
181 off_t offset = 0; // current writing offset (for debugging)
182
183 const auto &mem = update.entry->mem();
184
185 {
186 debugs(20, 7, "fresh store meta for " << *update.entry);
187 size_t freshSwapHeaderSize = 0; // set by getSerialisedMetaData() below
188
189 // There is a circular dependency between the correct/fresh value of
190 // entry->swap_file_sz and freshSwapHeaderSize. We break that loop by
191 // serializing zero swap_file_sz, just like the regular first-time
192 // swapout code may do. De-serializing code will re-calculate it in
193 // storeRebuildParseEntry(). TODO: Stop serializing entry->swap_file_sz.
194 const auto savedEntrySwapFileSize = update.entry->swap_file_sz;
195 update.entry->swap_file_sz = 0;
196 const auto freshSwapHeader = update.entry->getSerialisedMetaData(freshSwapHeaderSize);
197 update.entry->swap_file_sz = savedEntrySwapFileSize;
198
199 Must(freshSwapHeader);
200 writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
201 stalePrefixSz += mem.swap_hdr_sz;
202 freshPrefixSz += freshSwapHeaderSize;
203 offset += freshSwapHeaderSize;
204 xfree(freshSwapHeader);
205 }
206
207 {
208 debugs(20, 7, "fresh HTTP header @ " << offset);
209 const auto httpHeader = mem.freshestReply().pack();
210 writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
211 const auto &staleReply = mem.baseReply();
212 Must(staleReply.hdr_sz >= 0); // for int-to-uint64_t conversion below
213 Must(staleReply.hdr_sz > 0); // already initialized
214 stalePrefixSz += staleReply.hdr_sz;
215 freshPrefixSz += httpHeader->contentSize();
216 offset += httpHeader->contentSize();
217 delete httpHeader;
218 }
219
220 {
221 debugs(20, 7, "moved HTTP body prefix @ " << offset);
222 writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
223 offset += exchangeBuffer.length();
224 exchangeBuffer.clear();
225 }
226
227 debugs(20, 7, "wrote " << offset <<
228 "; swap_file_sz delta: -" << stalePrefixSz << " +" << freshPrefixSz);
229
230 // Optimistic early update OK: Our write lock blocks access to swap_file_sz.
231 auto &swap_file_sz = update.fresh.anchor->basics.swap_file_sz;
232 Must(swap_file_sz >= stalePrefixSz);
233 swap_file_sz -= stalePrefixSz;
234 swap_file_sz += freshPrefixSz;
235
236 writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
237}
238
239void
241{
242 CallJobHere1(47, 7,
243 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
245 noteDoneWriting,
246 errflag);
247}
248
249void
251{
252 debugs(47, 5, errflag << " reader=" << reader);
253 Must(!errflag);
254 Must(!reader); // if we wrote everything, then we must have read everything
255
256 Must(writer);
257 IoState &rockWriter = dynamic_cast<IoState&>(*writer);
258 update.fresh.splicingPoint = rockWriter.splicingPoint;
259 debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
260 store->map->closeForUpdating(update);
261 rockWriter.writeableAnchor_ = nullptr;
262 writer = nullptr; // we are done writing
263
264 Must(doneAll());
265}
266
267void
269{
270 if (!staleSwapHeaderSize) {
271 staleSwapHeaderSize = Store::UnpackSwapMetaSize(exchangeBuffer);
272 // Squid assumes that metadata always fits into a single db slot
273 debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
274 Must(staleSwapHeaderSize > 0);
275 exchangeBuffer.consume(staleSwapHeaderSize);
276 }
277
278 const size_t staleHttpHeaderSize = headersEnd(
279 exchangeBuffer.rawContent(),
280 exchangeBuffer.length());
281 debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
282 if (!staleHttpHeaderSize) {
283 readMore("need more stale HTTP reply header data");
284 return;
285 }
286
287 exchangeBuffer.consume(staleHttpHeaderSize);
288 debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
289
290 stopReading("read the last HTTP header slot");
291 startWriting();
292}
293
#define CallJobHere1(debugSection, debugLevel, job, Class, method, arg1)
Definition: AsyncJobCalls.h:63
CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater)
#define Must(condition)
Definition: TextException.h:75
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:112
virtual void swanSong()
Definition: AsyncJob.h:61
Aggregates information required for updating entry metadata and headers.
Definition: StoreMap.h:182
HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &update)
void swanSong() override
void stopReading(const char *why)
static StoreIOState::STRCB NoteRead
void noteDoneReading(int errflag)
bool doneAll() const override
whether positive goal has been reached
void readMore(const char *why)
static StoreIOState::STIOCB NoteDoneReading
static StoreIOState::STIOCB NoteDoneWriting
void noteRead(const IoCbParams result)
void start() override
called by AsyncStart; do not call directly
void noteDoneWriting(int errflag)
SlotId splicingPoint
the last db slot successfully read or written
Definition: RockIoState.h:60
Ipc::StoreMapAnchor * writeableAnchor_
starting point for writing
Definition: RockIoState.h:57
SlotId staleSplicingPointNext
Definition: RockIoState.h:63
@ wroteAll
success: caller supplied all data it wanted to swap out
Definition: StoreIOState.h:58
@ readerDone
success or failure: either way, stop swapping in
Definition: StoreIOState.h:60
@ writerGone
failure: caller left before swapping out everything
Definition: StoreIOState.h:59
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
size_t headersEnd(const char *mime, size_t l, bool &containsObsFold)
Definition: mime_header.cc:17
Definition: forward.h:28
size_t UnpackSwapMetaSize(const SBuf &)
Definition: SwapMetaIn.cc:237
#define xfree
void storeRead(StoreIOState::Pointer sio, char *buf, size_t size, off_t offset, StoreIOState::STRCB *callback, void *callback_data)
Definition: store_io.cc:79

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors