BodyPipe.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9#include "squid.h"
10#include "base/AsyncJobCalls.h"
11#include "base/TextException.h"
12#include "BodyPipe.h"
13
14// BodySink is a BodyConsumer class which just consumes and drops
15// data from a BodyPipe
17{
19
20public:
21 BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
22 ~BodySink() override { assert(!body_pipe); }
23
25 size_t contentSize = bp->buf().contentSize();
26 bp->consume(contentSize);
27 }
30 }
33 }
34 bool doneAll() const override {return !body_pipe && AsyncJob::doneAll();}
35
36private:
38};
39
41
42// The BodyProducerDialer is an AsyncCall class which is used to schedule BodyProducer calls.
43// In addition to the normal AsyncCall checks, it verifies that the BodyProducer is still the producer of
44// the BodyPipe passed as argument
45class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
46{
47public:
49
52 Parent(aProducer, aHandler, bp) {}
53
54 bool canDial(AsyncCall &call) override;
55};
56
57// The BodyConsumerDialer is an AsyncCall class which is used to schedule BodyConsumer calls.
58// In addition to the normal AsyncCall checks, it verifies that the BodyConsumer is still the recipient
59// of the BodyPipe passed as argument
60class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
61{
62public:
64
67 Parent(aConsumer, aHandler, bp) {}
68
69 bool canDial(AsyncCall &call) override;
70};
71
72bool
74{
75 if (!Parent::canDial(call))
76 return false;
77
78 const BodyProducer::Pointer &producer = job;
79 BodyPipe::Pointer aPipe = arg1;
80 if (!aPipe->stillProducing(producer)) {
81 debugs(call.debugSection, call.debugLevel, producer << " no longer producing for " << aPipe->status());
82 return call.cancel("no longer producing");
83 }
84
85 return true;
86}
87
88bool
90{
91 if (!Parent::canDial(call))
92 return false;
93
94 const BodyConsumer::Pointer &consumer = job;
95 BodyPipe::Pointer aPipe = arg1;
96 if (!aPipe->stillConsuming(consumer)) {
97 debugs(call.debugSection, call.debugLevel, consumer << " no longer consuming from " << aPipe->status());
98 return call.cancel("no longer consuming");
99 }
100
101 return true;
102}
103
104/* BodyProducer */
105
106// inform the pipe that we are done and clear the Pointer
108{
109 debugs(91,7, this << " will not produce for " << p << "; atEof: " << atEof);
110 assert(p != nullptr); // be strict: the caller state may depend on this
111 p->clearProducer(atEof);
112 p = nullptr;
113}
114
115/* BodyConsumer */
116
117// inform the pipe that we are done and clear the Pointer
119{
120 debugs(91,7, this << " will not consume from " << p);
121 assert(p != nullptr); // be strict: the caller state may depend on this
122 p->clearConsumer();
123 p = nullptr;
124}
125
126/* BodyPipe */
127
// Constructs a pipe fed by aProducer.
// Initial state: theBodySize is -1 (status() prints a negative size as "?"),
// no consumer is attached, the put/get byte counters are zero, and the
// mustAutoConsume, abortedConsumption, and isCheckedOut flags are all false.
128BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
129 theProducer(aProducer), theConsumer(nullptr),
130 thePutSize(0), theGetSize(0),
131 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
132{
133 // TODO: teach MemBuf to start with zero minSize
134 // TODO: limit maxSize by theBodySize, when known?
 // MemBuf::init(szInit, szMax): initial size is 2*1024, maximum is MaxCapacity
135 theBuf.init(2*1024, MaxCapacity);
 // logged at debug section 91, level 7, together with the freshly built status() string
136 debugs(91,7, "created BodyPipe" << status());
137}
138
140{
141 debugs(91,7, "destroying BodyPipe" << status());
144 theBuf.clean();
145}
146
147void BodyPipe::setBodySize(uint64_t aBodySize)
148{
150 assert(thePutSize <= aBodySize);
151
152 // If this assert fails, we need to add code to check for eof and inform
153 // the consumer about the eof condition via scheduleBodyEndNotification,
154 // because just setting a body size limit may trigger the eof condition.
156
157 theBodySize = aBodySize;
158 debugs(91,7, "set body size" << status());
159}
160
161uint64_t BodyPipe::bodySize() const
162{
164 return static_cast<uint64_t>(theBodySize);
165}
166
// Reports whether body bytes beyond the given offset are available or expected.
// Returns true when offset is below thePutSize, or when production has not
// ended and mayNeedMoreData() is true; returns false otherwise.
// The offset must not be smaller than theGetSize (asserted).
167bool BodyPipe::expectMoreAfter(uint64_t offset) const
168{
 // callers may only ask about positions at or after what was already taken out
169 assert(theGetSize <= offset);
170 return offset < thePutSize || // buffer has more now or
171 (!productionEnded() && mayNeedMoreData()); // buffer will have more
172}
173
175{
177}
178
180{
181 return bodySize() - thePutSize; // bodySize() asserts that size is known
182}
183
185{
186 const uint64_t expectedSize = thePutSize + size;
187 if (bodySizeKnown())
188 Must(bodySize() == expectedSize);
189 else
190 theBodySize = expectedSize;
191}
192
193void
195{
196 if (theProducer.set()) {
197 debugs(91,7, "clearing BodyPipe producer" << status());
199 if (atEof) {
200 if (!bodySizeKnown())
202 else if (bodySize() != thePutSize)
203 debugs(91,3, "aborting on premature eof" << status());
204 } else {
205 // assert that we can detect the abort if the consumer joins later
207 }
209 }
210}
211
212size_t
213BodyPipe::putMoreData(const char *aBuffer, size_t size)
214{
215 if (bodySizeKnown())
216 size = min((uint64_t)size, unproducedSize());
217
218 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
219 if ((size = min(size, spaceSize))) {
220 theBuf.append(aBuffer, size);
222 return size;
223 }
224 return 0;
225}
226
227bool
229{
231 assert(aConsumer.set()); // but might be invalid
232
233 // TODO: convert this into an exception and remove IfNotLate suffix
234 // If there is something consumed already, we are in an auto-consuming mode
235 // and it is too late to attach a real consumer to the pipe.
236 if (theGetSize > 0) {
238 return false;
239 }
240
241 Must(!abortedConsumption); // did not promise to never consume
242
243 theConsumer = aConsumer;
244 debugs(91,7, "set consumer" << status());
245 if (theBuf.hasContent())
247 if (!theProducer)
249
250 return true;
251}
252
253void
255{
256 if (theConsumer.set()) {
257 debugs(91,7, "clearing consumer" << status());
259 // do not abort if we have not consumed so that HTTP or ICAP can retry
260 // benign xaction failures due to persistent connection race conditions
261 if (consumedSize())
263 }
264}
265
266void
268{
269 // We may be called multiple times because multiple jobs on the consumption
270 // chain may realize that there will be no more setConsumer() calls (e.g.,
271 // consuming code and retrying code). It is both difficult and not really
272 // necessary for them to coordinate their expectNoConsumption() calls.
273
274 // As a consequence, we may be called when we are auto-consuming already.
275
276 if (!abortedConsumption && !exhausted()) {
277 // Before we abort, any regular consumption should be over and auto
278 // consumption must not be started.
280
281 AsyncCall::Pointer call= asyncCall(91, 7,
282 "BodyProducer::noteBodyConsumerAborted",
285 ScheduleCallHere(call);
286 abortedConsumption = true;
287
288 // in case somebody enabled auto-consumption before regular one aborted
290 }
291}
292
293size_t
295{
296 if (!theBuf.hasContent())
297 return 0; // did not touch the possibly uninitialized buf
298
299 if (aMemBuffer.isNull())
300 aMemBuffer.init();
301 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
302 aMemBuffer.append(theBuf.content(), size);
305 return size; // cannot be zero if we called buf.init above
306}
307
308void
310{
313}
314
315void
317{
318 mustAutoConsume = true;
319 debugs(91,5, "enabled auto consumption" << status());
321}
322
326void
328{
329 const auto startNow =
330 mustAutoConsume && // was enabled
331 !theConsumer && // has not started yet
332 theProducer.valid() && // still useful (and will eventually stop)
333 theBuf.hasContent(); // has something to consume right now
334 if (!startNow)
335 return;
336
337 theConsumer = new BodySink(this);
339 debugs(91,7, "starting auto consumption" << status());
341}
342
343MemBuf &
345{
347 isCheckedOut = true;
348 return theBuf;
349}
350
351void
353{
355 isCheckedOut = false;
356 const size_t currentSize = theBuf.contentSize();
357 if (checkout.checkedOutSize > currentSize)
358 postConsume(checkout.checkedOutSize - currentSize);
359 else if (checkout.checkedOutSize < currentSize)
360 postAppend(currentSize - checkout.checkedOutSize);
361}
362
363void
365{
367 const size_t currentSize = theBuf.contentSize();
368 // We can only undo if size did not change, and even that carries
369 // some risk. If this becomes a problem, the code checking out
370 // raw buffers should always check them in (possibly unchanged)
371 // instead of relying on the automated undo mechanism of Checkout.
372 // The code can always use a temporary buffer to accomplish that.
373 Must(checkout.checkedOutSize == currentSize);
374}
375
376// TODO: Optimize: inform consumer/producer about more data/space only if
377// they used the data/space since we notified them last time.
378
379void
381{
383 theGetSize += size;
384 debugs(91,7, "consumed " << size << " bytes" << status());
385 if (mayNeedMoreData()) {
386 AsyncCall::Pointer call= asyncCall(91, 7,
387 "BodyProducer::noteMoreBodySpaceAvailable",
390 ScheduleCallHere(call);
391 }
392}
393
394void
396{
398 thePutSize += size;
399 debugs(91,7, "added " << size << " bytes" << status());
400
401 // We should not consume here even if mustAutoConsume because the
402 // caller may not be ready for the data to be consumed during this call.
404
405 // Do this check after scheduleBodyDataNotification() to ensure the
406 // natural order of "more body data" and "production ended" events.
407 if (!mayNeedMoreData())
408 clearProducer(true); // reached end-of-body
409
411}
412
413void
415{
416 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
417 AsyncCall::Pointer call = asyncCall(91, 7,
418 "BodyConsumer::noteMoreBodyDataAvailable",
421 ScheduleCallHere(call);
422 }
423}
424
425void
427{
428 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
429 if (bodySizeKnown() && bodySize() == thePutSize) {
430 AsyncCall::Pointer call = asyncCall(91, 7,
431 "BodyConsumer::noteBodyProductionEnded",
434 ScheduleCallHere(call);
435 } else {
436 AsyncCall::Pointer call = asyncCall(91, 7,
437 "BodyConsumer::noteBodyProducerAborted",
440 ScheduleCallHere(call);
441 }
442 }
443}
444
445// a short temporary string describing buffer status for debugging
446const char *BodyPipe::status() const
447{
448 static MemBuf outputBuffer;
449 outputBuffer.reset();
450
451 outputBuffer.append(" [", 2);
452
453 outputBuffer.appendf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
454 if (theBodySize >= 0)
455 outputBuffer.appendf("<=%" PRId64, theBodySize);
456 else
457 outputBuffer.append("<=?", 3);
458
459 outputBuffer.appendf(" %" PRId64 "+%" PRId64, static_cast<int64_t>(theBuf.contentSize()), static_cast<int64_t>(theBuf.spaceSize()));
460
461 outputBuffer.appendf(" pipe%p", this);
462 if (theProducer.set())
463 outputBuffer.appendf(" prod%p", theProducer.get());
464 if (theConsumer.set())
465 outputBuffer.appendf(" cons%p", theConsumer.get());
466
467 if (mustAutoConsume)
468 outputBuffer.append(" A", 2);
470 outputBuffer.append(" !C", 3);
471 if (isCheckedOut)
472 outputBuffer.append(" L", 2); // Locked
473
474 outputBuffer.append("]", 1);
475
476 outputBuffer.terminate();
477
478 return outputBuffer.content();
479}
480
481/* BodyPipeCheckout */
482
484 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
485 checkedOutSize(buf.contentSize()), checkedIn(false)
486{
487}
488
490{
491 if (!checkedIn) {
492 // Do not pipe.undoCheckOut(*this) because it asserts or throws
493 // TODO: consider implementing the long-term solution discussed at
494 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
495 debugs(91,2, "Warning: cannot undo BodyPipeCheckout");
496 thePipe.checkIn(*this);
497 }
498}
499
500void
502{
504 thePipe.checkIn(*this);
505 checkedIn = true;
506}
507
509 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
510 checkedIn(c.checkedIn)
511{
512 assert(false); // prevent copying
513}
514
517{
518 assert(false); // prevent assignment
519 return *this;
520}
521
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
int size
Definition: ModDevPoll.cc:75
#define Must(condition)
Definition: TextException.h:75
#define assert(EX)
Definition: assert.h:17
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:320
const int debugLevel
Definition: AsyncCall.h:77
bool cancel(const char *reason)
Definition: AsyncCall.cc:56
const int debugSection
Definition: AsyncCall.h:76
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:112
UnaryMemFunT< BodyConsumer, BodyPipe::Pointer > Parent
Definition: BodyPipe.cc:63
bool canDial(AsyncCall &call) override
Definition: BodyPipe.cc:89
BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer, Parent::Method aHandler, BodyPipe::Pointer bp)
Definition: BodyPipe.cc:65
virtual void noteBodyProductionEnded(RefCount< BodyPipe > bp)=0
void stopConsumingFrom(RefCount< BodyPipe > &)
Definition: BodyPipe.cc:118
virtual void noteMoreBodyDataAvailable(RefCount< BodyPipe > bp)=0
virtual void noteBodyProducerAborted(RefCount< BodyPipe > bp)=0
const size_t checkedOutSize
Definition: BodyPipe.h:78
BodyPipe & thePipe
Definition: BodyPipe.h:73
BodyPipeCheckout & operator=(const BodyPipeCheckout &)
Definition: BodyPipe.cc:516
BodyPipeCheckout(BodyPipe &)
Definition: BodyPipe.cc:483
bool stillConsuming(const Consumer::Pointer &consumer) const
Definition: BodyPipe.h:132
void scheduleBodyEndNotification()
Definition: BodyPipe.cc:426
void checkIn(Checkout &checkout)
Definition: BodyPipe.cc:352
void scheduleBodyDataNotification()
Definition: BodyPipe.cc:414
~BodyPipe() override
Definition: BodyPipe.cc:139
void expectNoConsumption()
there will be no more setConsumer() calls
Definition: BodyPipe.cc:267
size_t putMoreData(const char *buf, size_t size)
Definition: BodyPipe.cc:213
size_t getMoreData(MemBuf &buf)
Definition: BodyPipe.cc:294
const MemBuf & buf() const
Definition: BodyPipe.h:137
uint64_t thePutSize
Definition: BodyPipe.h:161
bool isCheckedOut
Definition: BodyPipe.h:168
void clearConsumer()
Definition: BodyPipe.cc:254
void clearProducer(bool atEof)
Definition: BodyPipe.cc:194
MemBuf & checkOut()
Definition: BodyPipe.cc:344
void postConsume(size_t size)
Definition: BodyPipe.cc:380
bool exhausted() const
Definition: BodyPipe.cc:174
bool bodySizeKnown() const
Definition: BodyPipe.h:109
bool mustAutoConsume
keep theBuf empty when producing without consumer
Definition: BodyPipe.h:166
bool abortedConsumption
called BodyProducer::noteBodyConsumerAborted
Definition: BodyPipe.h:167
uint64_t unproducedSize() const
Definition: BodyPipe.cc:179
void setBodySize(uint64_t aSize)
Definition: BodyPipe.cc:147
void consume(size_t size)
Definition: BodyPipe.cc:309
uint64_t consumedSize() const
Definition: BodyPipe.h:111
Producer::Pointer theProducer
Definition: BodyPipe.h:158
bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
Definition: BodyPipe.cc:228
uint64_t theGetSize
Definition: BodyPipe.h:162
void expectProductionEndAfter(uint64_t extraSize)
sets or checks body size
Definition: BodyPipe.cc:184
const char * status() const
Definition: BodyPipe.cc:446
void postAppend(size_t size)
Definition: BodyPipe.cc:395
bool stillProducing(const Producer::Pointer &producer) const
Definition: BodyPipe.h:121
void startAutoConsumptionIfNeeded()
Definition: BodyPipe.cc:327
MemBuf theBuf
Definition: BodyPipe.h:164
bool productionEnded() const
Definition: BodyPipe.h:113
bool mayNeedMoreData() const
Definition: BodyPipe.h:118
uint64_t bodySize() const
Definition: BodyPipe.cc:161
void enableAutoConsumption()
start or continue consuming when producing without consumer
Definition: BodyPipe.cc:316
@ MaxCapacity
Definition: BodyPipe.h:100
BodyPipe(Producer *aProducer)
Definition: BodyPipe.cc:128
int64_t theBodySize
Definition: BodyPipe.h:157
bool expectMoreAfter(uint64_t offset) const
Definition: BodyPipe.cc:167
void undoCheckOut(Checkout &checkout)
Definition: BodyPipe.cc:364
Consumer::Pointer theConsumer
Definition: BodyPipe.h:159
UnaryMemFunT< BodyProducer, BodyPipe::Pointer > Parent
Definition: BodyPipe.cc:48
bool canDial(AsyncCall &call) override
Definition: BodyPipe.cc:73
BodyProducerDialer(const BodyProducer::Pointer &aProducer, Parent::Method aHandler, BodyPipe::Pointer bp)
Definition: BodyPipe.cc:50
virtual void noteMoreBodySpaceAvailable(RefCount< BodyPipe > bp)=0
virtual void noteBodyConsumerAborted(RefCount< BodyPipe > bp)=0
void stopProducingFor(RefCount< BodyPipe > &, bool atEof)
Definition: BodyPipe.cc:107
CBDATA_CHILD(BodySink)
void noteBodyProductionEnded(BodyPipe::Pointer) override
Definition: BodyPipe.cc:28
~BodySink() override
Definition: BodyPipe.cc:22
bool doneAll() const override
whether positive goal has been reached
Definition: BodyPipe.cc:34
void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) override
Definition: BodyPipe.cc:24
BodySink(const BodyPipe::Pointer &bp)
Definition: BodyPipe.cc:21
void noteBodyProducerAborted(BodyPipe::Pointer) override
Definition: BodyPipe.cc:31
BodyPipe::Pointer body_pipe
the pipe we are consuming from
Definition: BodyPipe.cc:37
Cbc * valid() const
was set and is valid
Definition: CbcPointer.h:41
void clear()
make pointer not set; does not invalidate cbdata
Definition: CbcPointer.h:144
Cbc * get() const
a temporary valid raw Cbc pointer or NULL
Definition: CbcPointer.h:159
bool set() const
was set but may be invalid
Definition: CbcPointer.h:40
virtual bool canDial(AsyncCall &call)
Definition: MemBuf.h:24
mb_size_t spaceSize() const
Definition: MemBuf.cc:155
void clean()
Definition: MemBuf.cc:110
void append(const char *c, int sz) override
Definition: MemBuf.cc:209
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:93
char * content()
start of the added data
Definition: MemBuf.h:41
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
mb_size_t potentialSpaceSize() const
Definition: MemBuf.cc:161
void consume(mb_size_t sz)
removes sz bytes and "packs" by moving content left
Definition: MemBuf.cc:168
int isNull() const
Definition: MemBuf.cc:145
void reset()
Definition: MemBuf.cc:129
bool hasContent() const
Definition: MemBuf.h:54
void terminate()
Definition: MemBuf.cc:241
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
void(Job::* Method)(BodyPipe::Pointer)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
#define PRIu64
Definition: types.h:114
#define PRId64
Definition: types.h:104

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors