RockHeaderUpdater.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "base/AsyncJobCalls.h"
11 #include "Debug.h"
13 #include "fs/rock/RockIoState.h"
14 #include "mime_header.h"
15 #include "Store.h"
16 #include "StoreMetaUnpacker.h"
17 
18 CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater);
19 
// Member-initializer list and (empty) body of the HeaderUpdater constructor.
// NOTE(review): the signature line (listing line 20) is missing from this
// extract; the member index at the end of the page lists it as
// HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &update).
21  AsyncJob("Rock::HeaderUpdater"),
22  store(aStore),
23  update(anUpdate),
// reader and writer start out unset; they are assigned later when the
// store I/O objects are opened
24  reader(),
25  writer(),
// bytesRead doubles as the offset passed to storeRead() for the next read
26  bytesRead(0),
// zero means "stale swap header not parsed yet" (checked when parsing read bytes)
27  staleSwapHeaderSize(0),
// NOTE(review): -1 presumably means "no slot known yet" -- confirm against SlotId
28  staleSplicingPointNext(-1)
29 {
30  // TODO: Consider limiting the number of concurrent store updates.
31 }
32 
// Reports completion: true only when neither a reader nor a writer is set
// and the base class AsyncJob::doneAll() also returns true.
// NOTE(review): the signature line (listing line 34) is missing from this
// extract; the member index lists "virtual bool doneAll() const override".
33 bool
35 {
36  return !reader && !writer && AsyncJob::doneAll();
37 }
38 
// Job teardown: aborts an unfinished map update and closes whatever reader
// or writer is still set.
// NOTE(review): the signature line (listing line 40) is missing from this
// extract; the member index lists "virtual void swanSong() override".
39 void
41 {
// either half of the update still being set means the update was not
// closed; ask the store map to abort it
42  if (update.stale || update.fresh)
43  store->map->abortUpdating(update);
44 
// close a still-open reader and forget it
45  if (reader) {
46  reader->close(StoreIOState::readerDone);
47  reader = nullptr;
48  }
49 
// close a still-open writer, flagging that the writer went away
50  if (writer) {
51  writer->close(StoreIOState::writerGone);
52  // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53  // Also required to avoid IoState destructor assertions.
54  // We can do this because we closed update earlier or aborted it above.
55  dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
56  writer = nullptr;
57  }
58 
// NOTE(review): listing line 59 is missing from this extract; the member
// index references AsyncJob::swanSong(), so presumably the base-class
// swanSong() was called here -- confirm against the real file.
60 }
61 
// Job entry point: verifies that the update carries an entry plus both its
// stale and fresh parts, then begins reading.
// NOTE(review): the signature line (listing line 63) is missing from this
// extract; presumably this is the "virtual void start() override" listed in
// the member index.
62 void
64 {
// all three parts of the update are required before any I/O starts
65  Must(update.entry);
66  Must(update.stale);
67  Must(update.fresh);
68  startReading();
69 }
70 
// Opens a store reader for the entry being updated and requests the first
// chunk of data.
// NOTE(review): the signature line (listing line 72) is missing from this
// extract; presumably this is the startReading() called by the block above.
71 void
73 {
// NoteDoneReading is registered as the completion callback, with this job
// as its callback data
74  reader = store->openStoreIO(
75  *update.entry,
76  nullptr, // unused; see StoreIOState::file_callback
77  &NoteDoneReading,
78  this);
79  readMore("need swap entry metadata");
80 }
81 
// Finishes reading: records where the stale slot chain ends and where the
// body continues, then closes and forgets the reader.
// Requires a reader to be set (enforced with Must).
// NOTE(review): the signature line (listing line 83) is missing from this
// extract; the member index lists "void stopReading(const char *why)".
82 void
84 {
85  debugs(47, 7, why);
86 
87  Must(reader);
// copy the reader's splicing information before the reader is dropped
88  const IoState &rockReader = dynamic_cast<IoState&>(*reader);
89  update.stale.splicingPoint = rockReader.splicingPoint;
90  staleSplicingPointNext = rockReader.staleSplicingPointNext;
91  debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
92  " body continues at " << staleSplicingPointNext);
93 
94  reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
95  reader = nullptr; // so that swanSong() does not try to close again
96 }
97 
// Static read callback (listed in the member index as a StoreIOState::STRCB):
// packages the read buffer and result into IoCbParams and asks CallJobHere1
// to invoke noteRead(io) on the job that `data` points to.
//   data   -- callback data; cast back to the HeaderUpdater that issued the read
//   buf    -- buffer that received the data
//   result -- read result; the noteRead() side treats 0 as EOF
// The trailing StoreIOState::Pointer parameter is unnamed and unused.
98 void
99 Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
100 {
101  IoCbParams io(buf, result);
102  // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
103  CallJobHere1(47, 7,
104  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
// CallJobHere1(debugSection, debugLevel, job, Class, method, arg1) takes six
// arguments; the Class argument (listing line 105) had been dropped
105  Rock::HeaderUpdater,
106  noteRead,
107  io);
108 }
109 
// Handles one completed read: on EOF stops reading, otherwise accounts for
// the new bytes and appends them to exchangeBuffer; in both cases the
// accumulated bytes are then parsed.
// NOTE(review): the signature line (listing line 111) is missing from this
// extract; the member index lists "void noteRead(const IoCbParams result)".
110 void
112 {
113  debugs(47, 7, result.size);
114  if (!result.size) { // EOF
115  stopReading("eof");
116  } else {
// a negative size is treated as a failure
117  Must(result.size > 0);
118  bytesRead += result.size;
// commit the bytes the read placed into readerBuffer, then move them into
// the accumulation buffer
119  readerBuffer.rawAppendFinish(result.buf, result.size);
120  exchangeBuffer.append(readerBuffer);
121  debugs(47, 7, "accumulated " << exchangeBuffer.length());
122  }
123 
// NOTE(review): on the EOF path the reader has already been cleared by
// stopReading(), yet parsing still runs and both of its continuations
// (reading more / stopping reading) begin with Must(reader). Presumably the
// resulting Must failure is the intended way to end the job when the entry
// is truncated -- confirm.
124  parseReadBytes();
125 }
126 
// Issues the next store read of up to store->slotSize bytes, starting at
// offset bytesRead, into a freshly cleared readerBuffer. NoteRead is the
// completion callback, with this job as its callback data.
// Requires a reader to be set (enforced with Must).
// NOTE(review): the signature line (listing line 128) is missing from this
// extract; the member index lists "void readMore(const char *why)".
127 void
129 {
130  debugs(47, 7, "from " << bytesRead << " because " << why);
131  Must(reader);
// start from an empty buffer; the raw-append started here is finished in
// the read handler once the data has arrived
132  readerBuffer.clear();
133  storeRead(reader,
134  readerBuffer.rawAppendStart(store->slotSize),
135  store->slotSize,
136  bytesRead,
137  &NoteRead,
138  this);
139 }
140 
// Static reader-completion callback: asks CallJobHere1 to invoke
// noteDoneReading(errflag) on the job that `data` points to.
// NOTE(review): the signature line (listing line 142) is missing from this
// extract; the member index lists "static StoreIOState::STIOCB NoteDoneReading".
141 void
143 {
144  // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
145  CallJobHere1(47, 7,
146  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
// NOTE(review): listing line 147 is missing from this extract; CallJobHere1
// takes a Class argument between the job and the method, so presumably the
// class name was here -- confirm against the real file.
148  noteDoneReading,
149  errflag);
150 }
151 
// Reacts to the reader having been closed. If this job already dropped its
// reader, the closure was self-initiated and must be error-free; otherwise
// the closure came from elsewhere, must carry an error, and stops the job.
// NOTE(review): the signature line (listing line 153) is missing from this
// extract; the member index lists "void noteDoneReading(int errflag)".
152 void
154 {
155  debugs(47, 5, errflag << " writer=" << writer);
156  if (!reader) {
157  Must(!errflag); // we only initiate successful closures
158  Must(writer); // otherwise we would be done() and would not be called
159  } else {
160  reader = nullptr; // we are done reading
161  Must(errflag); // any external closures ought to be errors
162  mustStop("read error");
163  }
164 }
165 
// Creates the update writer and writes, in order: the fresh swap metadata,
// the fresh packed HTTP reply header, and the already-read body prefix left
// in exchangeBuffer; then closes the writer as fully written.
// NOTE(review): the signature line (listing line 167) is missing from this
// extract; presumably this is the startWriting() called once the stale
// headers have been parsed.
166 void
168 {
// NoteDoneWriting is registered as the completion callback, with this job
// as its callback data
169  writer = store->createUpdateIO(
170  update,
171  nullptr, // unused; see StoreIOState::file_callback
172  &NoteDoneWriting,
173  this);
174  Must(writer);
175 
// hand the writer the slot recorded when reading stopped (where the stale
// body continues)
176  IoState &rockWriter = dynamic_cast<IoState&>(*writer);
177  rockWriter.staleSplicingPointNext = staleSplicingPointNext;
178 
179  off_t offset = 0; // current writing offset (for debugging)
180 
// part 1: fresh swap metadata, written at offset 0
181  {
182  debugs(20, 7, "fresh store meta for " << *update.entry);
183  const char *freshSwapHeader = update.entry->getSerialisedMetaData();
184  const auto freshSwapHeaderSize = update.entry->mem_obj->swap_hdr_sz;
185  Must(freshSwapHeader);
186  writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
187  offset += freshSwapHeaderSize;
// the serialised metadata buffer is released here once it has been written
188  xfree(freshSwapHeader);
189  }
190 
// part 2: fresh HTTP reply header
// NOTE(review): the -1 offset presumably means "continue after the previous
// write" -- confirm against the write() contract
191  {
192  debugs(20, 7, "fresh HTTP header @ " << offset);
193  MemBuf *httpHeader = update.entry->mem_obj->getReply()->pack();
194  writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
195  offset += httpHeader->contentSize();
// the packed header buffer is deleted here after use
196  delete httpHeader;
197  }
198 
// part 3: whatever remained in exchangeBuffer after the stale headers were
// consumed (the beginning of the HTTP body)
199  {
200  debugs(20, 7, "moved HTTP body prefix @ " << offset);
201  writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
202  offset += exchangeBuffer.length();
203  exchangeBuffer.clear();
204  }
205 
206  debugs(20, 7, "wrote " << offset);
207 
208  writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
209 }
210 
// Static writer-completion callback: asks CallJobHere1 to invoke
// noteDoneWriting(errflag) on the job that `data` points to.
// NOTE(review): the signature line (listing line 212) is missing from this
// extract; the member index lists "static StoreIOState::STIOCB NoteDoneWriting".
211 void
213 {
214  CallJobHere1(47, 7,
215  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
// NOTE(review): listing line 216 is missing from this extract; CallJobHere1
// takes a Class argument between the job and the method, so presumably the
// class name was here -- confirm against the real file.
217  noteDoneWriting,
218  errflag);
219 }
220 
// Completes the update after the writer closed: requires an error-free
// closure with reading already finished, records where the fresh slot chain
// ends, closes the map update, and drops the writer.
// NOTE(review): the signature line (listing line 222) is missing from this
// extract; the member index lists "void noteDoneWriting(int errflag)".
221 void
223 {
224  debugs(47, 5, errflag << " reader=" << reader);
225  Must(!errflag);
226  Must(!reader); // if we wrote everything, then we must have read everything
227 
228  Must(writer);
229  IoState &rockWriter = dynamic_cast<IoState&>(*writer);
// the writer's last slot becomes the end of the fresh chain
230  update.fresh.splicingPoint = rockWriter.splicingPoint;
231  debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
232  store->map->closeForUpdating(update);
// detach the anchor from the writer before dropping it (the teardown code
// does the same to avoid IoState destructor assertions)
233  rockWriter.writeableAnchor_ = nullptr;
234  writer = nullptr; // we are done writing
235 
// with both reader and writer gone, the job is expected to be complete
236  Must(doneAll());
237 }
238 
// Parses the bytes accumulated in exchangeBuffer: first strips the stale
// swap metadata (once), then looks for the end of the stale HTTP reply
// header. Requests more data if the header end is not found yet; otherwise
// strips the header, stops reading, and starts writing.
// NOTE(review): the signature line (listing line 240) is missing from this
// extract; presumably this is the parseReadBytes() called by the read handler.
239 void
241 {
// a zero staleSwapHeaderSize means the swap metadata has not been parsed yet
242  if (!staleSwapHeaderSize) {
// the unpacker is handed &staleSwapHeaderSize, which the debugs/Must below
// expect to hold the metadata size afterwards
243  StoreMetaUnpacker aBuilder(
244  exchangeBuffer.rawContent(),
245  exchangeBuffer.length(),
246  &staleSwapHeaderSize);
247  // Squid assumes that metadata always fits into a single db slot
248  aBuilder.checkBuffer(); // cannot update an entry with invalid metadata
249  debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
250  Must(staleSwapHeaderSize > 0);
251  exchangeBuffer.consume(staleSwapHeaderSize);
252  }
253 
// a zero result is treated as "end of HTTP header not found yet"
254  const size_t staleHttpHeaderSize = headersEnd(
255  exchangeBuffer.rawContent(),
256  exchangeBuffer.length());
257  debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
258  if (!staleHttpHeaderSize) {
259  readMore("need more stale HTTP reply header data");
260  return;
261  }
262 
// drop the stale header; what remains in exchangeBuffer is the beginning of
// the body, which the writing step copies over
263  exchangeBuffer.consume(staleHttpHeaderSize);
264  debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
265 
266  stopReading("read the last HTTP header slot");
267  startWriting();
268 }
269 
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:96
Aggregates information required for updating entry metadata and headers.
Definition: StoreMap.h:146
HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &update)
SlotId splicingPoint
the last db slot successfully read or written
Definition: RockIoState.h:55
void noteRead(const IoCbParams result)
static StoreIOState::STIOCB NoteDoneWriting
virtual void start() override
called by AsyncStart; do not call directly
virtual void swanSong() override
void stopReading(const char *why)
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
Ipc::StoreMapAnchor * writeableAnchor_
starting point for writing
Definition: RockIoState.h:52
success: caller supplied all data it wanted to swap out
Definition: StoreIOState.h:70
void noteDoneReading(int errflag)
virtual void swanSong()
Definition: AsyncJob.h:55
success or failure: either way, stop swapping in
Definition: StoreIOState.h:72
SlotId staleSplicingPointNext
Definition: RockIoState.h:58
#define CallJobHere1(debugSection, debugLevel, job, Class, method, arg1)
Definition: AsyncJobCalls.h:62
void noteDoneWriting(int errflag)
static StoreIOState::STRCB NoteRead
void checkBuffer()
validates buffer sanity and throws if validation fails
char * content()
start of the added data
Definition: MemBuf.h:41
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
#define Must(cond)
Definition: TextException.h:89
Definition: MemBuf.h:23
virtual bool doneAll() const override
whether positive goal has been reached
CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater)
failure: caller left before swapping out everything
Definition: StoreIOState.h:71
size_t headersEnd(const char *mime, size_t l, bool &containsObsFold)
Definition: mime_header.cc:16
#define xfree
static StoreIOState::STIOCB NoteDoneReading
void storeRead(StoreIOState::Pointer sio, char *buf, size_t size, off_t offset, StoreIOState::STRCB *callback, void *callback_data)
Definition: store_io.cc:82
void readMore(const char *why)

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors