BodyPipe.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "base/AsyncJobCalls.h"
11 #include "base/TextException.h"
12 #include "BodyPipe.h"
13 
14 // BodySink is a BodyConsumer class which just consumes and drops
15 // data from a BodyPipe
16 class BodySink: public BodyConsumer
17 {
19 
20 public:
21  BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
22  virtual ~BodySink() { assert(!body_pipe); }
23 
25  size_t contentSize = bp->buf().contentSize();
26  bp->consume(contentSize);
27  }
30  }
33  }
34  bool doneAll() const {return !body_pipe && AsyncJob::doneAll();}
35 
36 private:
38 };
39 
41 
42 // The BodyProducerDialer is an AsyncCall dialer class which is used to schedule BodyProducer calls.
43 // In addition to the normal AsyncCall checks, it verifies that the BodyProducer is still the producer of
44 // the BodyPipe passed as an argument.
45 class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
46 {
47 public:
49 
51  Parent::Method aHandler, BodyPipe::Pointer bp):
52  Parent(aProducer, aHandler, bp) {}
53 
54  virtual bool canDial(AsyncCall &call);
55 };
56 
57 // The BodyConsumerDialer is an AsyncCall dialer class which is used to schedule BodyConsumer calls.
58 // In addition to the normal AsyncCall checks, it verifies that the BodyConsumer is still the recipient
59 // of the BodyPipe passed as an argument.
60 class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
61 {
62 public:
64 
66  Parent::Method aHandler, BodyPipe::Pointer bp):
67  Parent(aConsumer, aHandler, bp) {}
68 
69  virtual bool canDial(AsyncCall &call);
70 };
71 
72 bool
74 {
75  if (!Parent::canDial(call))
76  return false;
77 
78  const BodyProducer::Pointer &producer = job;
79  BodyPipe::Pointer aPipe = arg1;
80  if (!aPipe->stillProducing(producer)) {
81  debugs(call.debugSection, call.debugLevel, producer << " no longer producing for " << aPipe->status());
82  return call.cancel("no longer producing");
83  }
84 
85  return true;
86 }
87 
88 bool
90 {
91  if (!Parent::canDial(call))
92  return false;
93 
94  const BodyConsumer::Pointer &consumer = job;
95  BodyPipe::Pointer aPipe = arg1;
96  if (!aPipe->stillConsuming(consumer)) {
97  debugs(call.debugSection, call.debugLevel, consumer << " no longer consuming from " << aPipe->status());
98  return call.cancel("no longer consuming");
99  }
100 
101  return true;
102 }
103 
104 /* BodyProducer */
105 
106 // inform the pipe that we are done and clear the Pointer
108 {
109  debugs(91,7, this << " will not produce for " << p << "; atEof: " << atEof);
110  assert(p != NULL); // be strict: the caller state may depend on this
111  p->clearProducer(atEof);
112  p = NULL;
113 }
114 
115 /* BodyConsumer */
116 
117 // inform the pipe that we are done and clear the Pointer
119 {
120  debugs(91,7, this << " will not consume from " << p);
121  assert(p != NULL); // be strict: the caller state may depend on this
122  p->clearConsumer();
123  p = NULL;
124 }
125 
126 /* BodyPipe */
127 
128 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
129  theProducer(aProducer), theConsumer(0),
130  thePutSize(0), theGetSize(0),
131  mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
132 {
133  // TODO: teach MemBuf to start with zero minSize
134  // TODO: limit maxSize by theBodySize, when known?
135  theBuf.init(2*1024, MaxCapacity);
136  debugs(91,7, HERE << "created BodyPipe" << status());
137 }
138 
140 {
141  debugs(91,7, HERE << "destroying BodyPipe" << status());
144  theBuf.clean();
145 }
146 
147 void BodyPipe::setBodySize(uint64_t aBodySize)
148 {
149  assert(!bodySizeKnown());
150  assert(thePutSize <= aBodySize);
151 
152  // If this assert fails, we need to add code to check for eof and inform
153  // the consumer about the eof condition via scheduleBodyEndNotification,
154  // because just setting a body size limit may trigger the eof condition.
156 
157  theBodySize = aBodySize;
158  debugs(91,7, HERE << "set body size" << status());
159 }
160 
161 uint64_t BodyPipe::bodySize() const
162 {
164  return static_cast<uint64_t>(theBodySize);
165 }
166 
167 bool BodyPipe::expectMoreAfter(uint64_t offset) const
168 {
169  assert(theGetSize <= offset);
170  return offset < thePutSize || // buffer has more now or
171  (!productionEnded() && mayNeedMoreData()); // buffer will have more
172 }
173 
175 {
176  return !expectMoreAfter(theGetSize);
177 }
178 
179 uint64_t BodyPipe::unproducedSize() const
180 {
181  return bodySize() - thePutSize; // bodySize() asserts that size is known
182 }
183 
185 {
186  const uint64_t expectedSize = thePutSize + size;
187  if (bodySizeKnown())
188  Must(bodySize() == expectedSize);
189  else
190  theBodySize = expectedSize;
191 }
192 
193 void
195 {
196  if (theProducer.set()) {
197  debugs(91,7, HERE << "clearing BodyPipe producer" << status());
198  theProducer.clear();
199  if (atEof) {
200  if (!bodySizeKnown())
202  else if (bodySize() != thePutSize)
203  debugs(91,3, HERE << "aborting on premature eof" << status());
204  } else {
205  // assert that we can detect the abort if the consumer joins later
207  }
209  }
210 }
211 
212 size_t
213 BodyPipe::putMoreData(const char *aBuffer, size_t size)
214 {
215  if (bodySizeKnown())
216  size = min((uint64_t)size, unproducedSize());
217 
218  const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
219  if ((size = min(size, spaceSize))) {
220  theBuf.append(aBuffer, size);
221  postAppend(size);
222  return size;
223  }
224  return 0;
225 }
226 
227 bool
229 {
231  assert(aConsumer.set()); // but might be invalid
232 
233  // TODO: convert this into an exception and remove IfNotLate suffix
234  // If there is something consumed already, we are in an auto-consuming mode
235  // and it is too late to attach a real consumer to the pipe.
236  if (theGetSize > 0) {
238  return false;
239  }
240 
241  Must(!abortedConsumption); // did not promise to never consume
242 
243  theConsumer = aConsumer;
244  debugs(91,7, HERE << "set consumer" << status());
245  if (theBuf.hasContent())
247  if (!theProducer)
249 
250  return true;
251 }
252 
253 void
255 {
256  if (theConsumer.set()) {
257  debugs(91,7, HERE << "clearing consumer" << status());
258  theConsumer.clear();
259  // do not abort if we have not consumed so that HTTP or ICAP can retry
260  // benign xaction failures due to persistent connection race conditions
261  if (consumedSize())
263  }
264 }
265 
266 void
268 {
269  // We may be called multiple times because multiple jobs on the consumption
270  // chain may realize that there will be no more setConsumer() calls (e.g.,
271  // consuming code and retrying code). It is both difficult and not really
272  // necessary for them to coordinate their expectNoConsumption() calls.
273 
274  // As a consequence, we may be called when we are auto-consuming already.
275 
276  if (!abortedConsumption && !exhausted()) {
277  // Before we abort, any regular consumption should be over and auto
278  // consumption must not be started.
279  Must(!theConsumer);
280 
281  AsyncCall::Pointer call= asyncCall(91, 7,
282  "BodyProducer::noteBodyConsumerAborted",
285  ScheduleCallHere(call);
286  abortedConsumption = true;
287 
288  // in case somebody enabled auto-consumption before regular one aborted
290  }
291 }
292 
293 size_t
295 {
296  if (!theBuf.hasContent())
297  return 0; // did not touch the possibly uninitialized buf
298 
299  if (aMemBuffer.isNull())
300  aMemBuffer.init();
301  const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
302  aMemBuffer.append(theBuf.content(), size);
303  theBuf.consume(size);
304  postConsume(size);
305  return size; // cannot be zero if we called buf.init above
306 }
307 
308 void
310 {
311  theBuf.consume(size);
312  postConsume(size);
313 }
314 
315 void
317 {
318  mustAutoConsume = true;
319  debugs(91,5, HERE << "enabled auto consumption" << status());
321 }
322 
326 void
328 {
329  const auto startNow =
330  mustAutoConsume && // was enabled
331  !theConsumer && // has not started yet
332  theProducer.valid() && // still useful (and will eventually stop)
333  theBuf.hasContent(); // has something to consume right now
334  if (!startNow)
335  return;
336 
337  theConsumer = new BodySink(this);
338  debugs(91,7, HERE << "starting auto consumption" << status());
340 }
341 
342 MemBuf &
344 {
346  isCheckedOut = true;
347  return theBuf;
348 }
349 
350 void
352 {
354  isCheckedOut = false;
355  const size_t currentSize = theBuf.contentSize();
356  if (checkout.checkedOutSize > currentSize)
357  postConsume(checkout.checkedOutSize - currentSize);
358  else if (checkout.checkedOutSize < currentSize)
359  postAppend(currentSize - checkout.checkedOutSize);
360 }
361 
362 void
364 {
366  const size_t currentSize = theBuf.contentSize();
367  // We can only undo if size did not change, and even that carries
368  // some risk. If this becomes a problem, the code checking out
369  // raw buffers should always check them in (possibly unchanged)
370  // instead of relying on the automated undo mechanism of Checkout.
371  // The code can always use a temporary buffer to accomplish that.
372  Must(checkout.checkedOutSize == currentSize);
373 }
374 
375 // TODO: Optimize: inform consumer/producer about more data/space only if
376 // they used the data/space since we notified them last time.
377 
378 void
380 {
382  theGetSize += size;
383  debugs(91,7, HERE << "consumed " << size << " bytes" << status());
384  if (mayNeedMoreData()) {
385  AsyncCall::Pointer call= asyncCall(91, 7,
386  "BodyProducer::noteMoreBodySpaceAvailable",
389  ScheduleCallHere(call);
390  }
391 }
392 
393 void
395 {
397  thePutSize += size;
398  debugs(91,7, HERE << "added " << size << " bytes" << status());
399 
400  if (!mayNeedMoreData())
401  clearProducer(true); // reached end-of-body
402 
403  // We should not consume here even if mustAutoConsume because the
404  // caller may not be ready for the data to be consumed during this call.
406 
408 }
409 
410 void
412 {
413  if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
414  AsyncCall::Pointer call = asyncCall(91, 7,
415  "BodyConsumer::noteMoreBodyDataAvailable",
418  ScheduleCallHere(call);
419  }
420 }
421 
422 void
424 {
425  if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
426  if (bodySizeKnown() && bodySize() == thePutSize) {
427  AsyncCall::Pointer call = asyncCall(91, 7,
428  "BodyConsumer::noteBodyProductionEnded",
431  ScheduleCallHere(call);
432  } else {
433  AsyncCall::Pointer call = asyncCall(91, 7,
434  "BodyConsumer::noteBodyProducerAborted",
437  ScheduleCallHere(call);
438  }
439  }
440 }
441 
442 // a short temporary string describing buffer status for debugging
443 const char *BodyPipe::status() const
444 {
445  static MemBuf outputBuffer;
446  outputBuffer.reset();
447 
448  outputBuffer.append(" [", 2);
449 
450  outputBuffer.appendf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
451  if (theBodySize >= 0)
452  outputBuffer.appendf("<=%" PRId64, theBodySize);
453  else
454  outputBuffer.append("<=?", 3);
455 
456  outputBuffer.appendf(" %" PRId64 "+%" PRId64, static_cast<int64_t>(theBuf.contentSize()), static_cast<int64_t>(theBuf.spaceSize()));
457 
458  outputBuffer.appendf(" pipe%p", this);
459  if (theProducer.set())
460  outputBuffer.appendf(" prod%p", theProducer.get());
461  if (theConsumer.set())
462  outputBuffer.appendf(" cons%p", theConsumer.get());
463 
464  if (mustAutoConsume)
465  outputBuffer.append(" A", 2);
466  if (abortedConsumption)
467  outputBuffer.append(" !C", 3);
468  if (isCheckedOut)
469  outputBuffer.append(" L", 2); // Locked
470 
471  outputBuffer.append("]", 1);
472 
473  outputBuffer.terminate();
474 
475  return outputBuffer.content();
476 }
477 
478 /* BodyPipeCheckout */
479 
481  buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
482  checkedOutSize(buf.contentSize()), checkedIn(false)
483 {
484 }
485 
487 {
488  if (!checkedIn) {
489  // Do not pipe.undoCheckOut(*this) because it asserts or throws
490  // TODO: consider implementing the long-term solution discussed at
491  // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
492  debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");
493  thePipe.checkIn(*this);
494  }
495 }
496 
497 void
499 {
500  assert(!checkedIn);
501  thePipe.checkIn(*this);
502  checkedIn = true;
503 }
504 
508 {
509  assert(false); // prevent copying
510 }
511 
514 {
515  assert(false); // prevent assignment
516  return *this;
517 }
518 
const char * status() const
Definition: BodyPipe.cc:443
const MemBuf & buf() const
Definition: BodyPipe.h:137
~BodyPipe()
Definition: BodyPipe.cc:139
bool abortedConsumption
called BodyProducer::noteBodyConsumerAborted
Definition: BodyPipe.h:167
bool stillConsuming(const Consumer::Pointer &consumer) const
Definition: BodyPipe.h:132
uint64_t thePutSize
Definition: BodyPipe.h:161
void expectProductionEndAfter(uint64_t extraSize)
sets or checks body size
Definition: BodyPipe.cc:184
void postConsume(size_t size)
Definition: BodyPipe.cc:379
#define assert(EX)
Definition: assert.h:17
BodyPipe & thePipe
Definition: BodyPipe.h:73
void enableAutoConsumption()
start or continue consuming when producing without consumer
Definition: BodyPipe.cc:316
virtual void append(const char *c, int sz)
Definition: MemBuf.cc:216
const int debugSection
Definition: AsyncCall.h:77
virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
Definition: BodyPipe.cc:24
void scheduleBodyDataNotification()
Definition: BodyPipe.cc:411
void consume(mb_size_t sz)
removes sz bytes and "packs" by moving content left
Definition: MemBuf.cc:171
const int debugLevel
Definition: AsyncCall.h:78
#define PRId64
Definition: types.h:110
void undoCheckOut(Checkout &checkout)
Definition: BodyPipe.cc:363
virtual void noteBodyProductionEnded(BodyPipe::Pointer)
Definition: BodyPipe.cc:28
virtual ~BodySink()
Definition: BodyPipe.cc:22
uint64_t theGetSize
Definition: BodyPipe.h:162
bool bodySizeKnown() const
Definition: BodyPipe.h:109
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:69
bool cancel(const char *reason)
Definition: AsyncCall.cc:52
bool expectMoreAfter(uint64_t offset) const
Definition: BodyPipe.cc:167
char * p
Definition: membanger.c:43
void setBodySize(uint64_t aSize)
Definition: BodyPipe.cc:147
void expectNoConsumption()
there will be no more setConsumer() calls
Definition: BodyPipe.cc:267
uint64_t unproducedSize() const
Definition: BodyPipe.cc:179
uint64_t consumedSize() const
Definition: BodyPipe.h:111
bool mustAutoConsume
keep theBuf empty when producing without consumer
Definition: BodyPipe.h:166
MemBuf & checkOut()
Definition: BodyPipe.cc:343
size_t putMoreData(const char *buf, size_t size)
Definition: BodyPipe.cc:213
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:96
virtual void noteMoreBodySpaceAvailable(RefCount< BodyPipe > bp)=0
void checkIn(Checkout &checkout)
Definition: BodyPipe.cc:351
void scheduleBodyEndNotification()
Definition: BodyPipe.cc:423
mb_size_t potentialSpaceSize() const
Definition: MemBuf.cc:164
const size_t checkedOutSize
Definition: BodyPipe.h:78
BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer, Parent::Method aHandler, BodyPipe::Pointer bp)
Definition: BodyPipe.cc:65
void clearProducer(bool atEof)
Definition: BodyPipe.cc:194
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:124
UnaryMemFunT< BodyProducer, BodyPipe::Pointer > Parent
Definition: BodyPipe.cc:48
bool isCheckedOut
Definition: BodyPipe.h:168
Cbc * valid() const
was set and is valid
Definition: CbcPointer.h:41
BodyProducerDialer(const BodyProducer::Pointer &aProducer, Parent::Method aHandler, BodyPipe::Pointer bp)
Definition: BodyPipe.cc:50
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
void reset()
Definition: MemBuf.cc:132
void startAutoConsumptionIfNeeded()
Definition: BodyPipe.cc:327
BodyPipe::Pointer body_pipe
the pipe we are consuming from
Definition: BodyPipe.cc:37
#define CBDATA_CLASS(type)
Definition: cbdata.h:302
void clear()
make pointer not set; does not invalidate cbdata
Definition: CbcPointer.h:144
virtual bool canDial(AsyncCall &call)
Definition: BodyPipe.cc:73
bool doneAll() const
whether positive goal has been reached
Definition: BodyPipe.cc:34
Producer::Pointer theProducer
Definition: BodyPipe.h:158
bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
Definition: BodyPipe.cc:228
virtual void noteBodyProducerAborted(BodyPipe::Pointer)
Definition: BodyPipe.cc:31
bool stillProducing(const Producer::Pointer &producer) const
Definition: BodyPipe.h:121
int isNull() const
Definition: MemBuf.cc:148
char * content()
start of the added data
Definition: MemBuf.h:41
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:153
size_t getMoreData(MemBuf &buf)
Definition: BodyPipe.cc:294
void clean()
Definition: MemBuf.cc:113
#define PRIu64
Definition: types.h:120
virtual void noteBodyProducerAborted(RefCount< BodyPipe > bp)=0
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
mb_size_t spaceSize() const
Definition: MemBuf.cc:158
Cbc * get() const
a temporary valid raw Cbc pointer or NULL
Definition: CbcPointer.h:162
void stopProducingFor(RefCount< BodyPipe > &, bool atEof)
Definition: BodyPipe.cc:107
const uint64_t offset
Definition: BodyPipe.h:75
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:96
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:318
MemBuf theBuf
Definition: BodyPipe.h:164
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
Definition: MemBuf.h:23
void stopConsumingFrom(RefCount< BodyPipe > &)
Definition: BodyPipe.cc:118
int64_t theBodySize
Definition: BodyPipe.h:157
virtual void noteBodyProductionEnded(RefCount< BodyPipe > bp)=0
virtual bool canDial(AsyncCall &call)
Definition: BodyPipe.cc:89
bool mayNeedMoreData() const
Definition: BodyPipe.h:118
virtual void noteBodyConsumerAborted(RefCount< BodyPipe > bp)=0
uint64_t bodySize() const
Definition: BodyPipe.cc:161
void consume(size_t size)
Definition: BodyPipe.cc:309
UnaryMemFunT< BodyConsumer, BodyPipe::Pointer > Parent
Definition: BodyPipe.cc:63
BodyPipeCheckout & operator=(const BodyPipeCheckout &)
Definition: BodyPipe.cc:513
bool exhausted() const
Definition: BodyPipe.cc:174
bool set() const
was set but may be invalid
Definition: CbcPointer.h:40
virtual void noteMoreBodyDataAvailable(RefCount< BodyPipe > bp)=0
void postAppend(size_t size)
Definition: BodyPipe.cc:394
bool hasContent() const
Definition: MemBuf.h:54
void clearConsumer()
Definition: BodyPipe.cc:254
#define NULL
Definition: types.h:166
int size
Definition: ModDevPoll.cc:77
A const & min(A const &lhs, A const &rhs)
BodyPipe(Producer *aProducer)
Definition: BodyPipe.cc:128
#define false
Definition: GnuRegex.c:233
bool productionEnded() const
Definition: BodyPipe.h:113
BodySink(const BodyPipe::Pointer &bp)
Definition: BodyPipe.cc:21
MemBuf & buf
Definition: BodyPipe.h:74
BodyPipeCheckout(BodyPipe &)
Definition: BodyPipe.cc:480
Consumer::Pointer theConsumer
Definition: BodyPipe.h:159
void terminate()
Definition: MemBuf.cc:250

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors