Queue.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9#ifndef SQUID_IPC_QUEUE_H
10#define SQUID_IPC_QUEUE_H
11
12#include "base/InstanceId.h"
13#include "debug/Stream.h"
15#include "ipc/mem/Pointer.h"
16#include "util.h"
17
18#include <algorithm>
19#include <atomic>
20
21class String;
22
23namespace Ipc
24{
25
29{
30public:
31 QueueReader(); // the initial state is "blocked without a signal"
32
34 bool blocked() const { return popBlocked.load(); }
35
37 bool signaled() const { return popSignal.load(); }
38
40 void block() { popBlocked.store(true); }
41
43 void unblock() { popBlocked.store(false); }
44
47 bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
48
50 void clearSignal() { unblock(); popSignal.store(false); }
51
52private:
53 std::atomic<bool> popBlocked;
54 std::atomic<bool> popSignal;
55
56public:
57 typedef std::atomic<int> Rate;
59
60 // we need a signed atomic type because balance may get negative
61 typedef std::atomic<int> AtomicSignedMsec;
65
68};
69
72{
73public:
74 QueueReaders(const int aCapacity);
75 size_t sharedMemorySize() const;
76 static size_t SharedMemorySize(const int capacity);
77
78 const int theCapacity;
80};
81
93{
94public:
95 // pop() and push() exceptions; TODO: use TextException instead
96 class Full {};
97 class ItemTooLarge {};
98
99 OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
100
101 unsigned int maxItemSize() const { return theMaxItemSize; }
102 int size() const { return theSize; }
103 int capacity() const { return theCapacity; }
105
106 bool empty() const { return !theSize; }
107 bool full() const { return theSize == theCapacity; }
108
109 static int Bytes2Items(const unsigned int maxItemSize, int size);
110 static int Items2Bytes(const unsigned int maxItemSize, const int size);
111
113 template<class Value> bool pop(Value &value, QueueReader *const reader = nullptr);
114
116 template<class Value> bool push(const Value &value, QueueReader *const reader = nullptr);
117
119 template<class Value> bool peek(Value &value) const;
120
122 template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
124 template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
125
126private:
127 void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const;
128 void statClose(std::ostream &) const;
129 template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
130 template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const;
131
132 // optimization: these non-std::atomic data members are in shared memory,
133 // but each is used only by one process (aside from obscured reporting)
134 unsigned int theIn;
135 unsigned int theOut;
136
137 std::atomic<uint32_t> theSize;
138 const unsigned int theMaxItemSize;
139 const uint32_t theCapacity;
140
141 char theBuffer[];
142};
143
146{
147public:
148 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
149
150 size_t sharedMemorySize() const;
151 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
152
153 const OneToOneUniQueue &operator [](const int index) const;
154 inline OneToOneUniQueue &operator [](const int index);
155
156private:
157 inline const OneToOneUniQueue &front() const;
158
159public:
160 const int theCapacity;
161};
162
168{
169public:
170 BaseMultiQueue(const int aLocalProcessId);
171 virtual ~BaseMultiQueue() {}
172
174 void clearReaderSignal(const int remoteProcessId);
175
178
180 template <class Value> bool pop(int &remoteProcessId, Value &value);
181
183 template <class Value> bool push(const int remoteProcessId, const Value &value);
184
186 template<class Value> bool peek(int &remoteProcessId, Value &value) const;
187
189 template<class Value> void stat(std::ostream &) const;
190
193
195 const QueueReader::Balance &balance(const int remoteProcessId) const;
196
199
201 const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
202
204 int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
205
207 int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
208
209protected:
211 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
212 OneToOneUniQueue &inQueue(const int remoteProcessId);
213
215 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
216 OneToOneUniQueue &outQueue(const int remoteProcessId);
217
218 virtual const QueueReader &localReader() const = 0;
220
221 virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
222 QueueReader &remoteReader(const int remoteProcessId);
223
224 virtual int remotesCount() const = 0;
225 virtual int remotesIdOffset() const = 0;
226
227protected:
229
230private:
232};
233
243{
244public:
247
248private:
250 struct Metadata {
251 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
252 size_t sharedMemorySize() const { return sizeof(*this); }
253 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
254
255 const int theGroupASize;
257 const int theGroupBSize;
259 };
260
261public:
262 class Owner
263 {
264 public:
265 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
266 ~Owner();
267
268 private:
272 };
273
274 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
275
276 enum Group { groupA = 0, groupB = 1 };
277 FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
278
280 static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
281
284 template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
285
286protected:
287 const OneToOneUniQueue &inQueue(const int remoteProcessId) const override;
288 const OneToOneUniQueue &outQueue(const int remoteProcessId) const override;
289 const QueueReader &localReader() const override;
290 const QueueReader &remoteReader(const int processId) const override;
291 int remotesCount() const override;
292 int remotesIdOffset() const override;
293
294private:
295 bool validProcessId(const Group group, const int processId) const;
296 int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
297 const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
298 int readerIndex(const Group group, const int processId) const;
299 Group localGroup() const { return theLocalGroup; }
301
302private:
306
308};
309
317{
318public:
321
322private:
324 struct Metadata {
325 Metadata(const int aProcessCount, const int aProcessIdOffset);
326 size_t sharedMemorySize() const { return sizeof(*this); }
327 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
328
331 };
332
333public:
334 class Owner
335 {
336 public:
337 Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
338 ~Owner();
339
340 private:
344 };
345
346 static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
347
348 MultiQueue(const String &id, const int localProcessId);
349
350protected:
351 const OneToOneUniQueue &inQueue(const int remoteProcessId) const override;
352 const OneToOneUniQueue &outQueue(const int remoteProcessId) const override;
353 const QueueReader &localReader() const override;
354 const QueueReader &remoteReader(const int remoteProcessId) const override;
355 int remotesCount() const override;
356 int remotesIdOffset() const override;
357
358private:
359 bool validProcessId(const int processId) const;
360 const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
361 const QueueReader &reader(const int processId) const;
362
363private:
367};
368
369// OneToOneUniQueue
370
371template <class Value>
372bool
373OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
374{
375 if (sizeof(value) > theMaxItemSize)
376 throw ItemTooLarge();
377
378 // A writer might push between the empty test and block() below, so we do
379 // not return false right after calling block(), but test again.
380 if (empty()) {
381 if (!reader)
382 return false;
383
384 reader->block();
385 // A writer might push between the empty test and block() below,
386 // so we must test again as such a writer will not signal us.
387 if (empty())
388 return false;
389 }
390
391 if (reader)
392 reader->unblock();
393
394 const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
395 memcpy(&value, theBuffer + pos, sizeof(value));
396 --theSize;
397
398 return true;
399}
400
401template <class Value>
402bool
403OneToOneUniQueue::peek(Value &value) const
404{
405 if (sizeof(value) > theMaxItemSize)
406 throw ItemTooLarge();
407
408 if (empty())
409 return false;
410
411 // the reader may pop() before we copy; making this method imprecise
412 const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
413 memcpy(&value, theBuffer + pos, sizeof(value));
414 return true;
415}
416
417template <class Value>
418bool
419OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
420{
421 if (sizeof(value) > theMaxItemSize)
422 throw ItemTooLarge();
423
424 if (full())
425 throw Full();
426
427 const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
428 memcpy(theBuffer + pos, &value, sizeof(value));
429 const bool wasEmpty = !theSize++;
430
431 return wasEmpty && (!reader || reader->raiseSignal());
432}
433
434template <class Value>
435void
436OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
437{
438 os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": ";
439 // Nobody can modify our theOut so, after capturing some valid theSize value
440 // in count, we can reliably report all [theOut, theOut+count) items that
441 // were queued at theSize capturing time. We will miss new items push()ed by
442 // the other side, but it is OK -- we report state at the capturing time.
443 const auto count = theSize.load();
444 statOpen(os, "other", "popIndex", count);
445 statSamples<Value>(os, theOut, count);
446 statClose(os);
447}
448
449template <class Value>
450void
451OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
452{
453 os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": ";
454 // Nobody can modify our theIn so, after capturing some valid theSize value
455 // in count, we can reliably report all [theIn-count, theIn) items that were
456 // queued at theSize capturing time. We may report items already pop()ed by
457 // the other side, but that is OK because pop() does not modify items -- it
458 // only increments theOut.
459 const auto count = theSize.load();
460 statOpen(os, "pushIndex", "other", count);
461 statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK
462 statClose(os);
463}
464
466template <class Value>
467void
468OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
469{
470 if (!count) {
471 os << " ";
472 return;
473 }
474
475 os << ", items: [\n";
476 // report a few leading and trailing items, without repetitions
477 const auto sampleSize = std::min(3U, count); // leading (and max) sample
478 statRange<Value>(os, start, sampleSize);
479 if (sampleSize < count) { // the first sample did not show some items
480 // The `start` offset aside, the first sample reported all items
481 // below the sampleSize offset. The second sample needs to report
482 // the last sampleSize items (i.e. starting at count-sampleSize
483 // offset) except those already reported by the first sample.
484 const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
485 const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
486
487 // but first we print a sample separator, unless there are no items
488 // between the samples or the separator hides the only unsampled item
489 const auto bothSamples = sampleSize + secondSampleSize;
490 if (bothSamples + 1U == count)
491 statRange<Value>(os, start + sampleSize, 1);
492 else if (count > bothSamples)
493 os << " # ... " << (count - bothSamples) << " items not shown ...\n";
494
495 statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
496 }
497 os << " ]";
498}
499
501template <class Value>
502void
503OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const
504{
505 assert(sizeof(Value) <= theMaxItemSize);
506 auto offset = start;
507 for (uint32_t i = 0; i < n; ++i) {
508 // XXX: Throughout this C++ header, these overflow wrapping tricks work
509 // only because theCapacity currently happens to be a power of 2 (e.g.,
510 // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
511 const auto pos = (offset++ % theCapacity) * theMaxItemSize;
512 Value value;
513 memcpy(&value, theBuffer + pos, sizeof(value));
514 os << " { ";
515 value.stat(os);
516 os << " },\n";
517 }
518}
519
520// OneToOneUniQueues
521
522inline OneToOneUniQueue &
524{
525 return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
526}
527
528inline const OneToOneUniQueue &
530{
531 const char *const queue =
532 reinterpret_cast<const char *>(this) + sizeof(*this);
533 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
534}
535
536// BaseMultiQueue
537
538template <class Value>
539bool
540BaseMultiQueue::pop(int &remoteProcessId, Value &value)
541{
542 // iterate all remote processes, starting after the one we visited last
543 for (int i = 0; i < remotesCount(); ++i) {
547 if (queue.pop(value, &localReader())) {
548 remoteProcessId = theLastPopProcessId;
549 debugs(54, 7, "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
550 return true;
551 }
552 }
553 return false; // no process had anything to pop
554}
555
556template <class Value>
557bool
558BaseMultiQueue::push(const int remoteProcessId, const Value &value)
559{
560 OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
561 QueueReader &reader = remoteReader(remoteProcessId);
562 debugs(54, 7, "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
563 return remoteQueue.push(value, &reader);
564}
565
566template <class Value>
567bool
568BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
569{
570 // mimic FewToFewBiQueue::pop() but quit just before popping
571 int popProcessId = theLastPopProcessId; // preserve for future pop()
572 for (int i = 0; i < remotesCount(); ++i) {
573 if (++popProcessId >= remotesIdOffset() + remotesCount())
574 popProcessId = remotesIdOffset();
575 const OneToOneUniQueue &queue = inQueue(popProcessId);
576 if (queue.peek(value)) {
577 remoteProcessId = popProcessId;
578 return true;
579 }
580 }
581 return false; // most likely, no process had anything to pop
582}
583
584template <class Value>
585void
586BaseMultiQueue::stat(std::ostream &os) const
587{
588 for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
589 const auto &queue = inQueue(processId);
590 queue.statIn<Value>(os, theLocalProcessId, processId);
591 }
592
593 os << "\n";
594
595 for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
596 const auto &queue = outQueue(processId);
597 queue.statOut<Value>(os, theLocalProcessId, processId);
598 }
599
600 os << "\n";
601
602 const auto &reader = localReader();
603 os << " kid" << theLocalProcessId << " reader flags: " <<
604 "{ blocked: " << reader.blocked() << ", signaled: " << reader.signaled() << " }\n";
605}
606
607// FewToFewBiQueue
608
609template <class Value>
610bool
611FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
612{
613 // we may be called before remote process configured its queue end
614 if (!validProcessId(remoteGroup(), remoteProcessId))
615 return false;
616
617 // we need the oldest value, so start with the incoming, them-to-us queue:
618 const OneToOneUniQueue &in = inQueue(remoteProcessId);
619 debugs(54, 2, "peeking from " << remoteProcessId << " to " <<
620 theLocalProcessId << " at " << in.size());
621 if (in.peek(value))
622 return true;
623
624 // if the incoming queue is empty, check the outgoing, us-to-them queue:
625 const OneToOneUniQueue &out = outQueue(remoteProcessId);
626 debugs(54, 2, "peeking from " << theLocalProcessId << " to " <<
627 remoteProcessId << " at " << out.size());
628 return out.peek(value);
629}
630
631} // namespace Ipc
632
633#endif // SQUID_IPC_QUEUE_H
634
#define assert(EX)
Definition: assert.h:17
virtual int remotesCount() const =0
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
Definition: Queue.cc:187
void clearAllReaderSignals()
clears all reader notifications received by the local process
Definition: Queue.cc:172
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
QueueReader::Balance & localBalance()
returns local reader's balance
Definition: Queue.h:192
virtual int remotesIdOffset() const =0
bool peek(int &remoteProcessId, Value &value) const
peeks at the item likely to be pop()ed next
Definition: Queue.h:568
virtual ~BaseMultiQueue()
Definition: Queue.h:171
virtual const QueueReader & localReader() const =0
int inSize(const int remoteProcessId) const
number of items in incoming queue from a given remote process
Definition: Queue.h:204
void stat(std::ostream &) const
prints current state; suitable for cache manager reports
Definition: Queue.h:586
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
Definition: Queue.cc:159
bool push(const int remoteProcessId, const Value &value)
calls OneToOneUniQueue::push() using the given process queue
Definition: Queue.h:558
BaseMultiQueue(const int aLocalProcessId)
Definition: Queue.cc:152
bool pop(int &remoteProcessId, Value &value)
picks a process and calls OneToOneUniQueue::pop() using its queue
Definition: Queue.h:540
int theLastPopProcessId
the ID of the last process we tried to pop() from
Definition: Queue.h:231
int outSize(const int remoteProcessId) const
number of items in outgoing queue to a given remote process
Definition: Queue.h:207
QueueReader::Rate & localRateLimit()
returns local reader's rate limit
Definition: Queue.h:198
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
Definition: Queue.cc:180
const int theLocalProcessId
process ID of this queue
Definition: Queue.h:228
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:351
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Definition: Queue.h:270
Mem::Owner< QueueReaders > *const readersOwner
Definition: Queue.h:271
Mem::Owner< Metadata > *const metadataOwner
Definition: Queue.h:269
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:303
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:267
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
Definition: Queue.h:304
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:302
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:305
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:289
OneToOneUniQueue::ItemTooLarge ItemTooLarge
Definition: Queue.h:246
const Group theLocalGroup
group of this queue
Definition: Queue.h:307
OneToOneUniQueue::Full Full
Definition: Queue.h:245
const QueueReader & remoteReader(const int processId) const override
Definition: Queue.cc:324
const QueueReader & localReader() const override
Definition: Queue.cc:318
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:247
int readerIndex(const Group group, const int processId) const
Definition: Queue.cc:309
bool validProcessId(const Group group, const int processId) const
Definition: Queue.cc:253
bool findOldest(const int remoteProcessId, Value &value) const
Definition: Queue.h:611
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:228
int remotesCount() const override
Definition: Queue.cc:330
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:295
int remotesIdOffset() const override
Definition: Queue.cc:337
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Definition: Queue.cc:233
Group remoteGroup() const
Definition: Queue.h:300
Group localGroup() const
Definition: Queue.h:299
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Definition: Queue.h:342
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:453
Mem::Owner< QueueReaders > *const readersOwner
Definition: Queue.h:343
Mem::Owner< Metadata > *const metadataOwner
Definition: Queue.h:341
int remotesCount() const override
Definition: Queue.cc:436
const QueueReader & reader(const int processId) const
Definition: Queue.cc:404
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:368
OneToOneUniQueue::ItemTooLarge ItemTooLarge
Definition: Queue.h:320
bool validProcessId(const int processId) const
Definition: Queue.cc:386
const QueueReader & localReader() const override
Definition: Queue.cc:424
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:364
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:418
MultiQueue(const String &id, const int localProcessId)
Definition: Queue.cc:373
const QueueReader & remoteReader(const int remoteProcessId) const override
Definition: Queue.cc:430
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
Definition: Queue.cc:393
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:412
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:366
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
Definition: Queue.h:365
int remotesIdOffset() const override
Definition: Queue.cc:442
OneToOneUniQueue::Full Full
Definition: Queue.h:319
bool full() const
Definition: Queue.h:107
unsigned int theIn
current push() position; reporting aside, used only in push()
Definition: Queue.h:134
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
Definition: Queue.h:139
bool empty() const
Definition: Queue.h:106
std::atomic< uint32_t > theSize
number of items in the queue
Definition: Queue.h:137
bool peek(Value &value) const
returns true iff the value was set; the value may be stale!
Definition: Queue.h:403
static int Items2Bytes(const unsigned int maxItemSize, const int size)
Definition: Queue.cc:92
const unsigned int theMaxItemSize
maximum item size
Definition: Queue.h:138
bool push(const Value &value, QueueReader *const reader=nullptr)
returns true iff the caller must notify the reader of the pushed item
Definition: Queue.h:419
void statClose(std::ostream &) const
end state reporting started by statOpen()
Definition: Queue.cc:112
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
Definition: Queue.cc:75
static int Bytes2Items(const unsigned int maxItemSize, int size)
Definition: Queue.cc:84
unsigned int maxItemSize() const
Definition: Queue.h:101
int sharedMemorySize() const
Definition: Queue.h:104
int capacity() const
Definition: Queue.h:103
void statIn(std::ostream &, int localProcessId, int remoteProcessId) const
prints incoming queue state; suitable for cache manager reports
Definition: Queue.h:436
bool pop(Value &value, QueueReader *const reader=nullptr)
returns true iff the value was set; [un]blocks the reader as needed
Definition: Queue.h:373
void statOut(std::ostream &, int localProcessId, int remoteProcessId) const
prints outgoing queue state; suitable for cache manager reports
Definition: Queue.h:451
void statSamples(std::ostream &, unsigned int start, uint32_t size) const
report a sample of [start, start + size) items
Definition: Queue.h:468
void statRange(std::ostream &, unsigned int start, uint32_t n) const
statSamples() helper that reports n items from start
Definition: Queue.h:503
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
Definition: Queue.cc:102
unsigned int theOut
current pop() position; reporting aside, used only in pop()/peek()
Definition: Queue.h:135
int size() const
Definition: Queue.h:102
shared array of OneToOneUniQueues
Definition: Queue.h:146
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:119
const OneToOneUniQueue & front() const
Definition: Queue.h:529
const OneToOneUniQueue & operator[](const int index) const
Definition: Queue.cc:141
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:133
const int theCapacity
Definition: Queue.h:160
size_t sharedMemorySize() const
Definition: Queue.cc:127
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Definition: Queue.h:67
void block()
marks the reader as blocked, waiting for a notification signal
Definition: Queue.h:40
std::atomic< bool > popSignal
whether writer has sent and reader has not received notification
Definition: Queue.h:54
std::atomic< bool > popBlocked
whether the reader is blocked on pop()
Definition: Queue.h:53
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
Definition: Queue.h:64
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Definition: Queue.h:50
Rate rateLimit
pop()s per second limit if positive
Definition: Queue.h:58
bool blocked() const
whether the reader is waiting for a notification signal
Definition: Queue.h:34
bool raiseSignal()
Definition: Queue.h:47
std::atomic< int > Rate
pop()s per second
Definition: Queue.h:57
bool signaled() const
whether writer has sent and reader has not received notification
Definition: Queue.h:37
AtomicSignedMsec Balance
Definition: Queue.h:62
std::atomic< int > AtomicSignedMsec
Definition: Queue.h:61
void unblock()
removes the block() effects
Definition: Queue.h:43
shared array of QueueReaders
Definition: Queue.h:72
const int theCapacity
Definition: Queue.h:78
QueueReaders(const int aCapacity)
Definition: Queue.cc:55
static size_t SharedMemorySize(const int capacity)
Definition: Queue.cc:68
Ipc::Mem::FlexibleArray< QueueReader > theReaders
number of readers
Definition: Queue.h:79
size_t sharedMemorySize() const
Definition: Queue.cc:62
A const & max(A const &lhs, A const &rhs)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
Definition: IpcIoFile.h:24
Shared metadata for FewToFewBiQueue.
Definition: Queue.h:250
size_t sharedMemorySize() const
Definition: Queue.h:252
Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset)
Definition: Queue.cc:343
static size_t SharedMemorySize(const int, const int, const int, const int)
Definition: Queue.h:253
Shared metadata for MultiQueue.
Definition: Queue.h:324
Metadata(const int aProcessCount, const int aProcessIdOffset)
Definition: Queue.cc:447
static size_t SharedMemorySize(const int, const int)
Definition: Queue.h:327
const int theProcessIdOffset
Definition: Queue.h:330
const int theProcessCount
Definition: Queue.h:329
size_t sharedMemorySize() const
Definition: Queue.h:326

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors