RockHeaderUpdater.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "base/AsyncJobCalls.h"
11 #include "Debug.h"
13 #include "fs/rock/RockIoState.h"
14 #include "mime_header.h"
15 #include "Store.h"
16 #include "StoreMetaUnpacker.h"
17 
19 
21  AsyncJob("Rock::HeaderUpdater"),
22  store(aStore),
23  update(anUpdate),
24  reader(),
25  writer(),
26  bytesRead(0),
27  staleSwapHeaderSize(0),
28  staleSplicingPointNext(-1)
29 {
30  // TODO: Consider limiting the number of concurrent store updates.
31 }
32 
33 bool
35 {
36  return !reader && !writer && AsyncJob::doneAll();
37 }
38 
39 void
41 {
42  if (update.stale || update.fresh)
43  store->map->abortUpdating(update);
44 
45  if (reader) {
46  reader->close(StoreIOState::readerDone);
47  reader = nullptr;
48  }
49 
50  if (writer) {
51  writer->close(StoreIOState::writerGone);
52  // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53  // Also required to avoid IoState destructor assertions.
54  // We can do this because we closed update earlier or aborted it above.
55  dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
56  writer = nullptr;
57  }
58 
60 }
61 
62 void
64 {
65  Must(update.entry);
66  Must(update.stale);
67  Must(update.fresh);
68  startReading();
69 }
70 
71 void
73 {
74  reader = store->openStoreIO(
75  *update.entry,
76  nullptr, // unused; see StoreIOState::file_callback
77  &NoteDoneReading,
78  this);
79  readMore("need swap entry metadata");
80 }
81 
82 void
84 {
85  debugs(47, 7, why);
86 
87  Must(reader);
88  const IoState &rockReader = dynamic_cast<IoState&>(*reader);
89  update.stale.splicingPoint = rockReader.splicingPoint;
90  staleSplicingPointNext = rockReader.staleSplicingPointNext;
91  debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
92  " body continues at " << staleSplicingPointNext);
93 
94  reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
95  reader = nullptr; // so that swanSong() does not try to close again
96 }
97 
98 void
99 Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
100 {
101  IoCbParams io(buf, result);
102  // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
103  CallJobHere1(47, 7,
104  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
106  noteRead,
107  io);
108 }
109 
110 void
112 {
113  debugs(47, 7, result.size);
114  if (!result.size) { // EOF
115  stopReading("eof");
116  } else {
117  Must(result.size > 0);
118  bytesRead += result.size;
119  readerBuffer.rawAppendFinish(result.buf, result.size);
120  exchangeBuffer.append(readerBuffer);
121  debugs(47, 7, "accumulated " << exchangeBuffer.length());
122  }
123 
124  parseReadBytes();
125 }
126 
127 void
129 {
130  debugs(47, 7, "from " << bytesRead << " because " << why);
131  Must(reader);
132  readerBuffer.clear();
133  storeRead(reader,
134  readerBuffer.rawAppendStart(store->slotSize),
135  store->slotSize,
136  bytesRead,
137  &NoteRead,
138  this);
139 }
140 
141 void
143 {
144  // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
145  CallJobHere1(47, 7,
146  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
148  noteDoneReading,
149  errflag);
150 }
151 
152 void
154 {
155  debugs(47, 5, errflag << " writer=" << writer);
156  if (!reader) {
157  Must(!errflag); // we only initiate successful closures
158  Must(writer); // otherwise we would be done() and would not be called
159  } else {
160  reader = nullptr; // we are done reading
161  Must(errflag); // any external closures ought to be errors
162  mustStop("read error");
163  }
164 }
165 
166 void
168 {
169  writer = store->createUpdateIO(
170  update,
171  nullptr, // unused; see StoreIOState::file_callback
172  &NoteDoneWriting,
173  this);
174  Must(writer);
175 
176  IoState &rockWriter = dynamic_cast<IoState&>(*writer);
177  rockWriter.staleSplicingPointNext = staleSplicingPointNext;
178 
179  // here, prefix is swap header plus HTTP reply header (i.e., updated bytes)
180  uint64_t stalePrefixSz = 0;
181  uint64_t freshPrefixSz = 0;
182 
183  off_t offset = 0; // current writing offset (for debugging)
184 
185  const auto &mem = update.entry->mem();
186 
187  {
188  debugs(20, 7, "fresh store meta for " << *update.entry);
189  size_t freshSwapHeaderSize = 0; // set by getSerialisedMetaData() below
190 
191  // There is a circular dependency between the correct/fresh value of
192  // entry->swap_file_sz and freshSwapHeaderSize. We break that loop by
193  // serializing zero swap_file_sz, just like the regular first-time
194  // swapout code may do. De-serializing code will re-calculate it in
195  // storeRebuildParseEntry(). TODO: Stop serializing entry->swap_file_sz.
196  const auto savedEntrySwapFileSize = update.entry->swap_file_sz;
197  update.entry->swap_file_sz = 0;
198  const auto freshSwapHeader = update.entry->getSerialisedMetaData(freshSwapHeaderSize);
199  update.entry->swap_file_sz = savedEntrySwapFileSize;
200 
201  Must(freshSwapHeader);
202  writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
203  stalePrefixSz += mem.swap_hdr_sz;
204  freshPrefixSz += freshSwapHeaderSize;
205  offset += freshSwapHeaderSize;
206  xfree(freshSwapHeader);
207  }
208 
209  {
210  debugs(20, 7, "fresh HTTP header @ " << offset);
211  const auto httpHeader = mem.freshestReply().pack();
212  writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
213  const auto &staleReply = mem.baseReply();
214  Must(staleReply.hdr_sz >= 0); // for int-to-uint64_t conversion below
215  Must(staleReply.hdr_sz > 0); // already initialized
216  stalePrefixSz += staleReply.hdr_sz;
217  freshPrefixSz += httpHeader->contentSize();
218  offset += httpHeader->contentSize();
219  delete httpHeader;
220  }
221 
222  {
223  debugs(20, 7, "moved HTTP body prefix @ " << offset);
224  writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
225  offset += exchangeBuffer.length();
226  exchangeBuffer.clear();
227  }
228 
229  debugs(20, 7, "wrote " << offset <<
230  "; swap_file_sz delta: -" << stalePrefixSz << " +" << freshPrefixSz);
231 
232  // Optimistic early update OK: Our write lock blocks access to swap_file_sz.
233  auto &swap_file_sz = update.fresh.anchor->basics.swap_file_sz;
234  Must(swap_file_sz >= stalePrefixSz);
235  swap_file_sz -= stalePrefixSz;
236  swap_file_sz += freshPrefixSz;
237 
238  writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
239 }
240 
241 void
243 {
244  CallJobHere1(47, 7,
245  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
247  noteDoneWriting,
248  errflag);
249 }
250 
251 void
253 {
254  debugs(47, 5, errflag << " reader=" << reader);
255  Must(!errflag);
256  Must(!reader); // if we wrote everything, then we must have read everything
257 
258  Must(writer);
259  IoState &rockWriter = dynamic_cast<IoState&>(*writer);
260  update.fresh.splicingPoint = rockWriter.splicingPoint;
261  debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
262  store->map->closeForUpdating(update);
263  rockWriter.writeableAnchor_ = nullptr;
264  writer = nullptr; // we are done writing
265 
266  Must(doneAll());
267 }
268 
269 void
271 {
272  if (!staleSwapHeaderSize) {
273  StoreMetaUnpacker aBuilder(
274  exchangeBuffer.rawContent(),
275  exchangeBuffer.length(),
276  &staleSwapHeaderSize);
277  // Squid assumes that metadata always fits into a single db slot
278  aBuilder.checkBuffer(); // cannot update an entry with invalid metadata
279  debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
280  Must(staleSwapHeaderSize > 0);
281  exchangeBuffer.consume(staleSwapHeaderSize);
282  }
283 
284  const size_t staleHttpHeaderSize = headersEnd(
285  exchangeBuffer.rawContent(),
286  exchangeBuffer.length());
287  debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
288  if (!staleHttpHeaderSize) {
289  readMore("need more stale HTTP reply header data");
290  return;
291  }
292 
293  exchangeBuffer.consume(staleHttpHeaderSize);
294  debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
295 
296  stopReading("read the last HTTP header slot");
297  startWriting();
298 }
299 
SlotId splicingPoint
the last db slot successfully read or written
Definition: RockIoState.h:60
SlotId staleSplicingPointNext
Definition: RockIoState.h:63
void checkBuffer()
validates buffer sanity and throws if validation fails
Definition: forward.h:28
virtual void swanSong()
Definition: AsyncJob.h:59
void noteDoneWriting(int errflag)
virtual bool doneAll() const override
whether positive goal has been reached
@ wroteAll
success: caller supplied all data it wanted to swap out
Definition: StoreIOState.h:70
@ readerDone
success or failure: either way, stop swapping in
Definition: StoreIOState.h:72
static StoreIOState::STIOCB NoteDoneReading
virtual void swanSong() override
void storeRead(StoreIOState::Pointer sio, char *buf, size_t size, off_t offset, StoreIOState::STRCB *callback, void *callback_data)
Definition: store_io.cc:79
void noteRead(const IoCbParams result)
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:97
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
void noteDoneReading(int errflag)
void stopReading(const char *why)
void readMore(const char *why)
Aggregates information required for updating entry metadata and headers.
Definition: StoreMap.h:182
#define xfree
static StoreIOState::STRCB NoteRead
Ipc::StoreMapAnchor * writeableAnchor_
starting point for writing
Definition: RockIoState.h:57
HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &update)
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:73
virtual void start() override
called by AsyncStart; do not call directly
@ writerGone
failure: caller left before swapping out everything
Definition: StoreIOState.h:71
CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater)
size_t headersEnd(const char *mime, size_t l, bool &containsObsFold)
Definition: mime_header.cc:17
static StoreIOState::STIOCB NoteDoneWriting
#define CallJobHere1(debugSection, debugLevel, job, Class, method, arg1)
Definition: AsyncJobCalls.h:63

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors