Queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #ifndef SQUID_IPC_QUEUE_H
10 #define SQUID_IPC_QUEUE_H
11 
12 #include "base/InstanceId.h"
13 #include "Debug.h"
14 #include "ipc/mem/FlexibleArray.h"
15 #include "ipc/mem/Pointer.h"
16 #include "util.h"
17 
18 #include <algorithm>
19 #include <atomic>
20 
21 class String;
22 
23 namespace Ipc
24 {
25 
29 {
30 public:
31  QueueReader(); // the initial state is "blocked without a signal"
32 
34  bool blocked() const { return popBlocked.load(); }
35 
37  void block() { popBlocked.store(true); }
38 
40  void unblock() { popBlocked.store(false); }
41 
44  bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
45 
47  void clearSignal() { unblock(); popSignal.store(false); }
48 
49 private:
50  std::atomic<bool> popBlocked;
51  std::atomic<bool> popSignal;
52 
53 public:
54  typedef std::atomic<int> Rate;
56 
57  // we need a signed atomic type because balance may get negative
58  typedef std::atomic<int> AtomicSignedMsec;
62 
65 };
66 
69 {
70 public:
71  QueueReaders(const int aCapacity);
72  size_t sharedMemorySize() const;
73  static size_t SharedMemorySize(const int capacity);
74 
75  const int theCapacity;
77 };
78 
90 {
91 public:
92  // pop() and push() exceptions; TODO: use TextException instead
93  class Full {};
94  class ItemTooLarge {};
95 
96  OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
97 
98  unsigned int maxItemSize() const { return theMaxItemSize; }
99  int size() const { return theSize; }
100  int capacity() const { return theCapacity; }
102 
103  bool empty() const { return !theSize; }
104  bool full() const { return theSize == theCapacity; }
105 
106  static int Bytes2Items(const unsigned int maxItemSize, int size);
107  static int Items2Bytes(const unsigned int maxItemSize, const int size);
108 
110  template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
111 
113  template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
114 
116  template<class Value> bool peek(Value &value) const;
117 
119  template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
121  template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
122 
123 private:
124  void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const;
125  void statClose(std::ostream &) const;
126  template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
127  template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const;
128 
129  // optimization: these non-std::atomic data members are in shared memory,
130  // but each is used only by one process (aside from obscured reporting)
131  unsigned int theIn;
132  unsigned int theOut;
133 
134  std::atomic<uint32_t> theSize;
135  const unsigned int theMaxItemSize;
136  const uint32_t theCapacity;
137 
138  char theBuffer[];
139 };
140 
143 {
144 public:
145  OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
146 
147  size_t sharedMemorySize() const;
148  static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
149 
150  const OneToOneUniQueue &operator [](const int index) const;
151  inline OneToOneUniQueue &operator [](const int index);
152 
153 private:
154  inline const OneToOneUniQueue &front() const;
155 
156 public:
157  const int theCapacity;
158 };
159 
165 {
166 public:
167  BaseMultiQueue(const int aLocalProcessId);
168  virtual ~BaseMultiQueue() {}
169 
171  void clearReaderSignal(const int remoteProcessId);
172 
174  template <class Value> bool pop(int &remoteProcessId, Value &value);
175 
177  template <class Value> bool push(const int remoteProcessId, const Value &value);
178 
180  template<class Value> bool peek(int &remoteProcessId, Value &value) const;
181 
183  template<class Value> void stat(std::ostream &) const;
184 
187 
189  const QueueReader::Balance &balance(const int remoteProcessId) const;
190 
193 
195  const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
196 
198  int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
199 
201  int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
202 
203 protected:
205  virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
206  OneToOneUniQueue &inQueue(const int remoteProcessId);
207 
209  virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
210  OneToOneUniQueue &outQueue(const int remoteProcessId);
211 
212  virtual const QueueReader &localReader() const = 0;
214 
215  virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
216  QueueReader &remoteReader(const int remoteProcessId);
217 
218  virtual int remotesCount() const = 0;
219  virtual int remotesIdOffset() const = 0;
220 
221 protected:
222  const int theLocalProcessId;
223 
224 private:
226 };
227 
237 {
238 public:
241 
242 private:
244  struct Metadata {
245  Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
246  size_t sharedMemorySize() const { return sizeof(*this); }
247  static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
248 
249  const int theGroupASize;
250  const int theGroupAIdOffset;
251  const int theGroupBSize;
252  const int theGroupBIdOffset;
253  };
254 
255 public:
256  class Owner
257  {
258  public:
259  Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
260  ~Owner();
261 
262  private:
266  };
267 
268  static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
269 
270  enum Group { groupA = 0, groupB = 1 };
271  FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
272 
274  static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
275 
278  template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
279 
280 protected:
281  virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
282  virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
283  virtual const QueueReader &localReader() const;
284  virtual const QueueReader &remoteReader(const int processId) const;
285  virtual int remotesCount() const;
286  virtual int remotesIdOffset() const;
287 
288 private:
289  bool validProcessId(const Group group, const int processId) const;
290  int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
291  const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
292  int readerIndex(const Group group, const int processId) const;
293  Group localGroup() const { return theLocalGroup; }
294  Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
295 
296 private:
300 
302 };
303 
311 {
312 public:
315 
316 private:
318  struct Metadata {
319  Metadata(const int aProcessCount, const int aProcessIdOffset);
320  size_t sharedMemorySize() const { return sizeof(*this); }
321  static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
322 
323  const int theProcessCount;
325  };
326 
327 public:
328  class Owner
329  {
330  public:
331  Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
332  ~Owner();
333 
334  private:
338  };
339 
340  static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
341 
342  MultiQueue(const String &id, const int localProcessId);
343 
344 protected:
345  virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
346  virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
347  virtual const QueueReader &localReader() const;
348  virtual const QueueReader &remoteReader(const int remoteProcessId) const;
349  virtual int remotesCount() const;
350  virtual int remotesIdOffset() const;
351 
352 private:
353  bool validProcessId(const int processId) const;
354  const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
355  const QueueReader &reader(const int processId) const;
356 
357 private:
361 };
362 
363 // OneToOneUniQueue
364 
365 template <class Value>
366 bool
367 OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
368 {
369  if (sizeof(value) > theMaxItemSize)
370  throw ItemTooLarge();
371 
372  // A writer might push between the empty test and block() below, so we do
373  // not return false right after calling block(), but test again.
374  if (empty()) {
375  if (!reader)
376  return false;
377 
378  reader->block();
379  // A writer might push between the empty test and block() below,
380  // so we must test again as such a writer will not signal us.
381  if (empty())
382  return false;
383  }
384 
385  if (reader)
386  reader->unblock();
387 
388  const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
389  memcpy(&value, theBuffer + pos, sizeof(value));
390  --theSize;
391 
392  return true;
393 }
394 
395 template <class Value>
396 bool
397 OneToOneUniQueue::peek(Value &value) const
398 {
399  if (sizeof(value) > theMaxItemSize)
400  throw ItemTooLarge();
401 
402  if (empty())
403  return false;
404 
405  // the reader may pop() before we copy; making this method imprecise
406  const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
407  memcpy(&value, theBuffer + pos, sizeof(value));
408  return true;
409 }
410 
411 template <class Value>
412 bool
413 OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
414 {
415  if (sizeof(value) > theMaxItemSize)
416  throw ItemTooLarge();
417 
418  if (full())
419  throw Full();
420 
421  const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
422  memcpy(theBuffer + pos, &value, sizeof(value));
423  const bool wasEmpty = !theSize++;
424 
425  return wasEmpty && (!reader || reader->raiseSignal());
426 }
427 
428 template <class Value>
429 void
430 OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
431 {
432  os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": ";
433  // Nobody can modify our theOut so, after capturing some valid theSize value
434  // in count, we can reliably report all [theOut, theOut+count) items that
435  // were queued at theSize capturing time. We will miss new items push()ed by
436  // the other side, but it is OK -- we report state at the capturing time.
437  const auto count = theSize.load();
438  statOpen(os, "other", "popIndex", count);
439  statSamples<Value>(os, theOut, count);
440  statClose(os);
441 }
442 
443 template <class Value>
444 void
445 OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
446 {
447  os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": ";
448  // Nobody can modify our theIn so, after capturing some valid theSize value
449  // in count, we can reliably report all [theIn-count, theIn) items that were
450  // queued at theSize capturing time. We may report items already pop()ed by
451  // the other side, but that is OK because pop() does not modify items -- it
452  // only increments theOut.
453  const auto count = theSize.load();
454  statOpen(os, "pushIndex", "other", count);
455  statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK
456  statClose(os);
457 }
458 
460 template <class Value>
461 void
462 OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
463 {
464  if (!count) {
465  os << " ";
466  return;
467  }
468 
469  os << ", items: [\n";
470  // report a few leading and trailing items, without repetitions
471  const auto sampleSize = std::min(3U, count); // leading (and max) sample
472  statRange<Value>(os, start, sampleSize);
473  if (sampleSize < count) { // the first sample did not show some items
474  // The `start` offset aside, the first sample reported all items
475  // below the sampleSize offset. The second sample needs to report
476  // the last sampleSize items (i.e. starting at count-sampleSize
477  // offset) except those already reported by the first sample.
478  const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
479  const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
480 
481  // but first we print a sample separator, unless there are no items
482  // between the samples or the separator hides the only unsampled item
483  const auto bothSamples = sampleSize + secondSampleSize;
484  if (bothSamples + 1U == count)
485  statRange<Value>(os, start + sampleSize, 1);
486  else if (count > bothSamples)
487  os << " # ... " << (count - bothSamples) << " items not shown ...\n";
488 
489  statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
490  }
491  os << " ]";
492 }
493 
495 template <class Value>
496 void
497 OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const
498 {
499  assert(sizeof(Value) <= theMaxItemSize);
500  auto offset = start;
501  for (uint32_t i = 0; i < n; ++i) {
502  // XXX: Throughout this C++ header, these overflow wrapping tricks work
503  // only because theCapacity currently happens to be a power of 2 (e.g.,
504  // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
505  const auto pos = (offset++ % theCapacity) * theMaxItemSize;
506  Value value;
507  memcpy(&value, theBuffer + pos, sizeof(value));
508  os << " { ";
509  value.stat(os);
510  os << " },\n";
511  }
512 }
513 
514 // OneToOneUniQueues
515 
516 inline OneToOneUniQueue &
518 {
519  return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
520 }
521 
522 inline const OneToOneUniQueue &
524 {
525  const char *const queue =
526  reinterpret_cast<const char *>(this) + sizeof(*this);
527  return *reinterpret_cast<const OneToOneUniQueue *>(queue);
528 }
529 
530 // BaseMultiQueue
531 
532 template <class Value>
533 bool
534 BaseMultiQueue::pop(int &remoteProcessId, Value &value)
535 {
536  // iterate all remote processes, starting after the one we visited last
537  for (int i = 0; i < remotesCount(); ++i) {
541  if (queue.pop(value, &localReader())) {
542  remoteProcessId = theLastPopProcessId;
543  debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
544  return true;
545  }
546  }
547  return false; // no process had anything to pop
548 }
549 
550 template <class Value>
551 bool
552 BaseMultiQueue::push(const int remoteProcessId, const Value &value)
553 {
554  OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
555  QueueReader &reader = remoteReader(remoteProcessId);
556  debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
557  return remoteQueue.push(value, &reader);
558 }
559 
560 template <class Value>
561 bool
562 BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
563 {
564  // mimic FewToFewBiQueue::pop() but quit just before popping
565  int popProcessId = theLastPopProcessId; // preserve for future pop()
566  for (int i = 0; i < remotesCount(); ++i) {
567  if (++popProcessId >= remotesIdOffset() + remotesCount())
568  popProcessId = remotesIdOffset();
569  const OneToOneUniQueue &queue = inQueue(popProcessId);
570  if (queue.peek(value)) {
571  remoteProcessId = popProcessId;
572  return true;
573  }
574  }
575  return false; // most likely, no process had anything to pop
576 }
577 
578 template <class Value>
579 void
580 BaseMultiQueue::stat(std::ostream &os) const
581 {
582  for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
583  const auto &queue = inQueue(processId);
584  queue.statIn<Value>(os, theLocalProcessId, processId);
585  }
586 
587  os << "\n";
588 
589  for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
590  const auto &queue = outQueue(processId);
591  queue.statOut<Value>(os, theLocalProcessId, processId);
592  }
593 }
594 
595 // FewToFewBiQueue
596 
597 template <class Value>
598 bool
599 FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
600 {
601  // we may be called before remote process configured its queue end
602  if (!validProcessId(remoteGroup(), remoteProcessId))
603  return false;
604 
605  // we need the oldest value, so start with the incoming, them-to-us queue:
606  const OneToOneUniQueue &in = inQueue(remoteProcessId);
607  debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
608  theLocalProcessId << " at " << in.size());
609  if (in.peek(value))
610  return true;
611 
612  // if the incoming queue is empty, check the outgoing, us-to-them queue:
613  const OneToOneUniQueue &out = outQueue(remoteProcessId);
614  debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
615  remoteProcessId << " at " << out.size());
616  return out.peek(value);
617 }
618 
619 } // namespace Ipc
620 
621 #endif // SQUID_IPC_QUEUE_H
622 
bool validProcessId(const int processId) const
Definition: Queue.cc:379
virtual int remotesCount() const
Definition: Queue.cc:429
int sharedMemorySize() const
Definition: Queue.h:101
virtual int remotesIdOffset() const
Definition: Queue.cc:435
virtual int remotesIdOffset() const =0
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Definition: Queue.h:64
virtual int remotesIdOffset() const
Definition: Queue.cc:330
std::atomic< uint32_t > theSize
number of items in the queue
Definition: Queue.h:134
Group remoteGroup() const
Definition: Queue.h:294
std::atomic< int > AtomicSignedMsec
Definition: Queue.h:58
virtual const QueueReader & localReader() const =0
Definition: Queue.cc:203
const OneToOneUniQueue & front() const
Definition: Queue.h:523
const int theProcessIdOffset
Definition: Queue.h:324
void stat(std::ostream &) const
prints current state; suitable for cache manager reports
Definition: Queue.h:580
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const
outgoing queue to a given remote process
Definition: Queue.cc:411
virtual ~BaseMultiQueue()
Definition: Queue.h:168
Rate rateLimit
pop()s per second limit if positive
Definition: Queue.h:55
std::atomic< bool > popBlocked
whether the reader is blocked on pop()
Definition: Queue.h:50
bool pop(Value &value, QueueReader *const reader=NULL)
returns true iff the value was set; [un]blocks the reader as needed
Definition: Queue.h:367
QueueReader::Balance & localBalance()
returns local reader's balance
Definition: Queue.h:186
Mem::Owner< QueueReaders > *const readersOwner
Definition: Queue.h:337
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Definition: Queue.cc:226
void statSamples(std::ostream &, unsigned int start, uint32_t size) const
report a sample of [start, start + size) items
Definition: Queue.h:462
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Definition: Queue.h:336
bool push(const int remoteProcessId, const Value &value)
calls OneToOneUniQueue::push() using the given process queue
Definition: Queue.h:552
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
Definition: Queue.h:136
Metadata(const int aProcessCount, const int aProcessIdOffset)
Definition: Queue.cc:440
bool raiseSignal()
Definition: Queue.h:44
const A & max(A const &lhs, A const &rhs)
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
Definition: Queue.h:61
const QueueReader & reader(const int processId) const
Definition: Queue.cc:397
int theLastPopProcessId
the ID of the last process we tried to pop() from
Definition: Queue.h:225
size_t sharedMemorySize() const
Definition: Queue.h:246
std::atomic< int > Rate
pop()s per second
Definition: Queue.h:54
QueueReader::Rate & localRateLimit()
returns local reader's rate limit
Definition: Queue.h:192
OneToOneUniQueue::Full Full
Definition: Queue.h:239
OneToOneUniQueue::ItemTooLarge ItemTooLarge
Definition: Queue.h:314
const Mem::Pointer< OneToOneUniQueues > queues
unidirection one-to-one queues
Definition: Queue.h:359
static int Bytes2Items(const unsigned int maxItemSize, int size)
Definition: Queue.cc:84
virtual const QueueReader & remoteReader(const int remoteProcessId) const
Definition: Queue.cc:423
OneToOneUniQueue::Full Full
Definition: Queue.h:313
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
Definition: Queue.cc:75
size_t sharedMemorySize() const
Definition: Queue.cc:127
shared array of QueueReaders
Definition: Queue.h:68
static size_t SharedMemorySize(const int, const int, const int, const int)
Definition: Queue.h:247
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:344
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:297
bool peek(Value &value) const
returns true iff the value was set; the value may be stale!
Definition: Queue.h:397
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:358
int capacity() const
Definition: Queue.h:100
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const
incoming queue from a given remote process
Definition: Queue.cc:288
void statOut(std::ostream &, int localProcessId, int remoteProcessId) const
prints outgoing queue state; suitable for cache manager reports
Definition: Queue.h:445
void unblock()
removes the block() effects
Definition: Queue.h:40
Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset)
Definition: Queue.cc:336
size_t sharedMemorySize() const
Definition: Queue.h:320
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:361
virtual const QueueReader & localReader() const
Definition: Queue.cc:311
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:119
#define NULL
Definition: types.h:166
int inSize(const int remoteProcessId) const
number of items in incoming queue from a given remote process
Definition: Queue.h:198
virtual const QueueReader & remoteReader(const int processId) const
Definition: Queue.cc:317
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:128
bool findOldest(const int remoteProcessId, Value &value) const
Definition: Queue.h:599
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:133
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Definition: Queue.h:47
Mem::Owner< Metadata > *const metadataOwner
Definition: Queue.h:263
virtual int remotesCount() const =0
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
const int theCapacity
Definition: Queue.h:75
bool pop(int &remoteProcessId, Value &value)
picks a process and calls OneToOneUniQueue::pop() using its queue
Definition: Queue.h:534
static int Items2Bytes(const unsigned int maxItemSize, const int size)
Definition: Queue.cc:92
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:157
BaseMultiQueue(const int aLocalProcessId)
Definition: Queue.cc:152
static size_t SharedMemorySize(const int, const int)
Definition: Queue.h:321
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:240
Mem::Owner< QueueReaders > *const readersOwner
Definition: Queue.h:265
Ipc::Mem::FlexibleArray< QueueReader > theReaders
number of readers
Definition: Queue.h:76
#define assert(EX)
Definition: assert.h:19
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:299
void block()
marks the reader as blocked, waiting for a notification signal
Definition: Queue.h:37
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
Definition: Queue.cc:386
bool empty() const
Definition: Queue.h:103
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
Definition: Queue.cc:102
bool peek(int &remoteProcessId, Value &value) const
peeks at the item likely to be pop()ed next
Definition: Queue.h:562
const int theCapacity
Definition: Queue.h:157
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
Definition: Queue.cc:180
Shared metadata for MultiQueue.
Definition: Queue.h:318
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const
incoming queue from a given remote process
Definition: Queue.cc:405
const Mem::Pointer< OneToOneUniQueues > queues
unidirection one-to-one queues
Definition: Queue.h:298
void statIn(std::ostream &, int localProcessId, int remoteProcessId) const
prints incoming queue state; suitable for cache manager reports
Definition: Queue.h:430
std::atomic< bool > popSignal
whether writer has sent and reader has not received notification
Definition: Queue.h:51
const int theProcessCount
Definition: Queue.h:323
void statClose(std::ostream &) const
end state reporting started by statOpen()
Definition: Queue.cc:112
QueueReaders(const int aCapacity)
Definition: Queue.cc:55
AtomicSignedMsec Balance
Definition: Queue.h:59
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:282
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const
outgoing queue to a given remote process
Definition: Queue.cc:295
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:221
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
Definition: Queue.cc:173
Shared metadata for FewToFewBiQueue.
Definition: Queue.h:244
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Definition: Queue.h:264
const OneToOneUniQueue & operator[](const int index) const
Definition: Queue.cc:141
const unsigned int theMaxItemSize
maximum item size
Definition: Queue.h:135
size_t sharedMemorySize() const
Definition: Queue.cc:62
unsigned int theIn
current push() position; reporting aside, used only in push()
Definition: Queue.h:131
OneToOneUniQueue::ItemTooLarge ItemTooLarge
Definition: Queue.h:240
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
Definition: Queue.cc:159
bool validProcessId(const Group group, const int processId) const
Definition: Queue.cc:246
MultiQueue(const String &id, const int localProcessId)
Definition: Queue.cc:366
int readerIndex(const Group group, const int processId) const
Definition: Queue.cc:302
int outSize(const int remoteProcessId) const
number of items in outgoing queue to a given remote process
Definition: Queue.h:201
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:260
const int theLocalProcessId
process ID of this queue
Definition: Queue.h:222
int size() const
Definition: Queue.h:99
virtual const QueueReader & localReader() const
Definition: Queue.cc:417
bool blocked() const
whether the reader is waiting for a notification signal
Definition: Queue.h:34
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:446
const Group theLocalGroup
group of this queue
Definition: Queue.h:301
unsigned int theOut
current pop() position; reporting aside, used only in pop()/peek()
Definition: Queue.h:132
virtual int remotesCount() const
Definition: Queue.cc:323
const A & min(A const &lhs, A const &rhs)
void statRange(std::ostream &, unsigned int start, uint32_t n) const
statSamples() helper that reports n items from start
Definition: Queue.h:497
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:360
shared array of OneToOneUniQueues
Definition: Queue.h:142
unsigned int maxItemSize() const
Definition: Queue.h:98
static size_t SharedMemorySize(const int capacity)
Definition: Queue.cc:68
bool push(const Value &value, QueueReader *const reader=NULL)
returns true iff the caller must notify the reader of the pushed item
Definition: Queue.h:413
Mem::Owner< Metadata > *const metadataOwner
Definition: Queue.h:335
Definition: IpcIoFile.h:23
bool full() const
Definition: Queue.h:104
Group localGroup() const
Definition: Queue.h:293

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors