RockHeaderUpdater.cc
Go to the documentation of this file.
/*
 * Copyright (C) 1996-2019 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */
8 
#include "squid.h"
#include "base/AsyncJobCalls.h"
#include "Debug.h"
#include "fs/rock/RockHeaderUpdater.h"
#include "fs/rock/RockIoState.h"
#include "mime_header.h"
#include "Store.h"
#include "StoreMetaUnpacker.h"
17 
18 CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater);
19 
21  AsyncJob("Rock::HeaderUpdater"),
22  store(aStore),
23  update(anUpdate),
24  reader(),
25  writer(),
26  bytesRead(0),
27  staleSwapHeaderSize(0),
28  staleSplicingPointNext(-1)
29 {
30  // TODO: Consider limiting the number of concurrent store updates.
31 }
32 
33 bool
35 {
36  return !reader && !writer && AsyncJob::doneAll();
37 }
38 
39 void
41 {
42  if (update.stale || update.fresh)
43  store->map->abortUpdating(update);
44 
45  if (reader) {
47  reader = nullptr;
48  }
49 
50  if (writer) {
52  // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53  // Also required to avoid IoState destructor assertions.
54  // We can do this because we closed update earlier or aborted it above.
55  dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
56  writer = nullptr;
57  }
58 
60 }
61 
62 void
64 {
65  Must(update.entry);
66  Must(update.stale);
67  Must(update.fresh);
68  startReading();
69 }
70 
71 void
73 {
74  reader = store->openStoreIO(
75  *update.entry,
76  nullptr, // unused; see StoreIOState::file_callback
78  this);
79  readMore("need swap entry metadata");
80 }
81 
82 void
84 {
85  debugs(47, 7, why);
86 
87  Must(reader);
88  const IoState &rockReader = dynamic_cast<IoState&>(*reader);
91  debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
92  " body continues at " << staleSplicingPointNext);
93 
94  reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
95  reader = nullptr; // so that swanSong() does not try to close again
96 }
97 
98 void
99 Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
100 {
101  IoCbParams io(buf, result);
102  // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
103  CallJobHere1(47, 7,
104  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
106  noteRead,
107  io);
108 }
109 
110 void
112 {
113  debugs(47, 7, result.size);
114  if (!result.size) { // EOF
115  stopReading("eof");
116  } else {
117  Must(result.size > 0);
118  bytesRead += result.size;
119  readerBuffer.rawAppendFinish(result.buf, result.size);
121  debugs(47, 7, "accumulated " << exchangeBuffer.length());
122  }
123 
124  parseReadBytes();
125 }
126 
127 void
129 {
130  debugs(47, 7, "from " << bytesRead << " because " << why);
131  Must(reader);
134  readerBuffer.rawAppendStart(store->slotSize),
135  store->slotSize,
136  bytesRead,
137  &NoteRead,
138  this);
139 }
140 
141 void
143 {
144  // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
145  CallJobHere1(47, 7,
146  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
149  errflag);
150 }
151 
152 void
154 {
155  debugs(47, 5, errflag << " writer=" << writer);
156  if (!reader) {
157  Must(!errflag); // we only initiate successful closures
158  Must(writer); // otherwise we would be done() and would not be called
159  } else {
160  reader = nullptr; // we are done reading
161  Must(errflag); // any external closures ought to be errors
162  mustStop("read error");
163  }
164 }
165 
166 void
168 {
169  writer = store->createUpdateIO(
170  update,
171  nullptr, // unused; see StoreIOState::file_callback
173  this);
174  Must(writer);
175 
176  IoState &rockWriter = dynamic_cast<IoState&>(*writer);
178 
179  off_t offset = 0; // current writing offset (for debugging)
180 
181  {
182  debugs(20, 7, "fresh store meta for " << *update.entry);
183  const char *freshSwapHeader = update.entry->getSerialisedMetaData();
184  const auto freshSwapHeaderSize = update.entry->mem_obj->swap_hdr_sz;
185  Must(freshSwapHeader);
186  writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
187  offset += freshSwapHeaderSize;
188  xfree(freshSwapHeader);
189  }
190 
191  {
192  debugs(20, 7, "fresh HTTP header @ " << offset);
193  MemBuf *httpHeader = update.entry->mem_obj->getReply()->pack();
194  writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
195  offset += httpHeader->contentSize();
196  delete httpHeader;
197  }
198 
199  {
200  debugs(20, 7, "moved HTTP body prefix @ " << offset);
202  offset += exchangeBuffer.length();
204  }
205 
206  debugs(20, 7, "wrote " << offset);
207 
208  writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
209 }
210 
211 void
213 {
214  CallJobHere1(47, 7,
215  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
218  errflag);
219 }
220 
221 void
223 {
224  debugs(47, 5, errflag << " reader=" << reader);
225  Must(!errflag);
226  Must(!reader); // if we wrote everything, then we must have read everything
227 
228  Must(writer);
229  IoState &rockWriter = dynamic_cast<IoState&>(*writer);
231  debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
232  store->map->closeForUpdating(update);
233  rockWriter.writeableAnchor_ = nullptr;
234  writer = nullptr; // we are done writing
235 
236  Must(doneAll());
237 }
238 
239 void
241 {
242  if (!staleSwapHeaderSize) {
243  StoreMetaUnpacker aBuilder(
247  // Squid assumes that metadata always fits into a single db slot
248  aBuilder.checkBuffer(); // cannot update an entry with invalid metadata
249  debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
252  }
253 
254  const size_t staleHttpHeaderSize = headersEnd(
257  debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
258  if (!staleHttpHeaderSize) {
259  readMore("need more stale HTTP reply header data");
260  return;
261  }
262 
263  exchangeBuffer.consume(staleHttpHeaderSize);
264  debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
265 
266  stopReading("read the last HTTP header slot");
267  startWriting();
268 }
269 
size_type length() const
Returns the number of bytes stored in SBuf.
Definition: SBuf.h:404
MemBuf * pack() const
Definition: HttpReply.cc:111
Aggregates information required for updating entry metadata and headers.
Definition: StoreMap.h:162
size_t swap_hdr_sz
Definition: MemObject.h:169
SlotId staleSplicingPointNext
non-updatable old HTTP body suffix start
void mustStop(const char *aReason)
Definition: AsyncJob.cc:69
char const * getSerialisedMetaData()
Definition: store.cc:1831
SBuf & append(const SBuf &S)
Definition: SBuf.cc:195
Definition: forward.h:27
HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &update)
void clear()
Definition: SBuf.cc:178
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:69
SBuf readerBuffer
I/O buffer for a single read operation.
SlotId splicingPoint
the last db slot successfully read or written
Definition: RockIoState.h:55
void noteRead(const IoCbParams result)
static StoreIOState::STIOCB NoteDoneWriting
virtual void start() override
called by AsyncStart; do not call directly
virtual void swanSong() override
void rawAppendFinish(const char *start, size_type actualSize)
Definition: SBuf.cc:144
Rock::SwapDir::Pointer store
cache_dir where the entry is stored
void const char HLPCB void * data
Definition: stub_helper.cc:16
void stopReading(const char *why)
uint64_t bytesRead
total entry bytes read from Store so far
char * rawAppendStart(size_type anticipatedSize)
Definition: SBuf.cc:136
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:124
Ipc::StoreMapAnchor * writeableAnchor_
starting point for writing
Definition: RockIoState.h:52
success: caller supplied all data it wanted to swap out
Definition: StoreIOState.h:70
void noteDoneReading(int errflag)
virtual void swanSong()
Definition: AsyncJob.h:54
SBuf exchangeBuffer
bytes read but not yet discarded or written
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
success or failure: either way, stop swapping in
Definition: StoreIOState.h:72
virtual void close(int how)=0
finish or abort swapping per CloseHow
SlotId staleSplicingPointNext
Definition: RockIoState.h:58
#define CallJobHere1(debugSection, debugLevel, job, Class, method, arg1)
Definition: AsyncJobCalls.h:62
MemObject * mem_obj
Definition: Store.h:199
SBuf consume(size_type n=npos)
Definition: SBuf.cc:491
void noteDoneWriting(int errflag)
static StoreIOState::STRCB NoteRead
void checkBuffer()
validates buffer sanity and throws if validation fails
char * content()
start of the added data
Definition: MemBuf.h:41
void const char * buf
Definition: stub_helper.cc:16
int staleSwapHeaderSize
stored size of the stale entry metadata
Edition stale
old anchor and chain being updated
Definition: StoreMap.h:189
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:96
virtual bool write(char const *buf, size_t size, off_t offset, FREE *free_func)=0
Edition fresh
new anchor and updated chain prefix
Definition: StoreMap.h:190
Ipc::StoreMapUpdate update
Ipc::StoreMap update reservation.
Definition: MemBuf.h:23
virtual bool doneAll() const override
whether positive goal has been reached
CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater)
StoreMapSliceId splicingPoint
the last slice in the chain still containing metadata/headers
Definition: StoreMap.h:179
failure: caller left before swapping out everything
Definition: StoreIOState.h:71
size_t headersEnd(const char *mime, size_t l, bool &containsObsFold)
Definition: mime_header.cc:16
#define xfree
StoreIOState::Pointer reader
reads old headers and old data
const char * rawContent() const
Definition: SBuf.cc:519
static StoreIOState::STIOCB NoteDoneReading
StoreIOState::Pointer writer
writes new headers and old data
void storeRead(StoreIOState::Pointer sio, char *buf, size_t size, off_t offset, StoreIOState::STRCB *callback, void *callback_data)
Definition: store_io.cc:82
const HttpReplyPointer & getReply() const
Definition: MemObject.h:57
StoreEntry * entry
the store entry being updated
Definition: StoreMap.h:188
void readMore(const char *why)

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors