Queue.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "Debug.h"
14 #include "globals.h"
15 #include "ipc/Queue.h"
16 
17 #include <limits>
18 
20 static String
22 {
23  id.append("__metadata");
24  return id;
25 }
26 
28 static String
30 {
31  id.append("__queues");
32  return id;
33 }
34 
36 static String
38 {
39  id.append("__readers");
40  return id;
41 }
42 
43 /* QueueReader */
44 
46 
47 Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
48  rateLimit(0), balance(0)
49 {
50  debugs(54, 7, HERE << "constructed " << id);
51 }
52 
53 /* QueueReaders */
54 
55 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
56  theReaders(theCapacity)
57 {
58  Must(theCapacity > 0);
59 }
60 
61 size_t
63 {
64  return SharedMemorySize(theCapacity);
65 }
66 
67 size_t
69 {
70  return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
71 }
72 
73 // OneToOneUniQueue
74 
75 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
76  theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77  theCapacity(aCapacity)
78 {
79  Must(theMaxItemSize > 0);
80  Must(theCapacity > 0);
81 }
82 
83 int
84 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
85 {
86  assert(maxItemSize > 0);
87  size -= sizeof(OneToOneUniQueue);
88  return size >= 0 ? size / maxItemSize : 0;
89 }
90 
91 int
92 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
93 {
94  assert(size >= 0);
95  return sizeof(OneToOneUniQueue) + maxItemSize * size;
96 }
97 
101 void
102 Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
103 {
104  os << "{ size: " << count <<
105  ", capacity: " << theCapacity <<
106  ", " << inLabel << ": " << theIn <<
107  ", " << outLabel << ": " << theOut;
108 }
109 
111 void
112 Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
113 {
114  os << "}\n";
115 }
116 
117 /* OneToOneUniQueues */
118 
119 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
120 {
121  Must(theCapacity > 0);
122  for (int i = 0; i < theCapacity; ++i)
123  new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
124 }
125 
126 size_t
128 {
129  return sizeof(*this) + theCapacity * front().sharedMemorySize();
130 }
131 
132 size_t
133 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
134 {
135  const int queueSize =
136  OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
137  return sizeof(OneToOneUniQueues) + queueSize * capacity;
138 }
139 
140 const Ipc::OneToOneUniQueue &
142 {
143  Must(0 <= index && index < theCapacity);
144  const size_t queueSize = index ? front().sharedMemorySize() : 0;
145  const char *const queue =
146  reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
147  return *reinterpret_cast<const OneToOneUniQueue *>(queue);
148 }
149 
150 // BaseMultiQueue
151 
152 Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
153  theLocalProcessId(aLocalProcessId),
154  theLastPopProcessId(std::numeric_limits<int>::max() - 1)
155 {
156 }
157 
158 void
159 Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
160 {
161  QueueReader &reader = localReader();
162  debugs(54, 7, "reader: " << reader.id);
163 
164  reader.clearSignal();
165 
166  // we got a hint; we could reposition iteration to try popping from the
167  // remoteProcessId queue first; but it does not seem to help much and might
168  // introduce some bias so we do not do that for now:
169  // theLastPopProcessId = remoteProcessId;
170 }
171 
173 Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
174 {
175  const QueueReader &r = remoteReader(remoteProcessId);
176  return r.balance;
177 }
178 
180 Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
181 {
182  const QueueReader &r = remoteReader(remoteProcessId);
183  return r.rateLimit;
184 }
185 
187 Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
188 {
189  const OneToOneUniQueue &queue =
190  const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
191  return const_cast<OneToOneUniQueue &>(queue);
192 }
193 
195 Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
196 {
197  const OneToOneUniQueue &queue =
198  const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
199  return const_cast<OneToOneUniQueue &>(queue);
200 }
201 
204 {
205  const QueueReader &reader =
206  const_cast<const BaseMultiQueue *>(this)->localReader();
207  return const_cast<QueueReader &>(reader);
208 }
209 
211 Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
212 {
213  const QueueReader &reader =
214  const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
215  return const_cast<QueueReader &>(reader);
216 }
217 
218 // FewToFewBiQueue
219 
221 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
222 {
223  return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
224 }
225 
226 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
227  BaseMultiQueue(aLocalProcessId),
228  metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
229  queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
230  readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
231  theLocalGroup(aLocalGroup)
232 {
233  Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
234  Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
235 
236  debugs(54, 7, "queue " << id << " reader: " << localReader().id);
237 }
238 
239 int
240 Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
241 {
242  return capacity * groupASize * groupBSize * 2;
243 }
244 
245 bool
246 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
247 {
248  switch (group) {
249  case groupA:
250  return metadata->theGroupAIdOffset <= processId &&
251  processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
252  case groupB:
253  return metadata->theGroupBIdOffset <= processId &&
254  processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
255  }
256  return false;
257 }
258 
259 int
260 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
261 {
262  Must(fromGroup != toGroup);
263  assert(validProcessId(fromGroup, fromProcessId));
264  assert(validProcessId(toGroup, toProcessId));
265  int index1;
266  int index2;
267  int offset;
268  if (fromGroup == groupA) {
269  index1 = fromProcessId - metadata->theGroupAIdOffset;
270  index2 = toProcessId - metadata->theGroupBIdOffset;
271  offset = 0;
272  } else {
273  index1 = toProcessId - metadata->theGroupAIdOffset;
274  index2 = fromProcessId - metadata->theGroupBIdOffset;
275  offset = metadata->theGroupASize * metadata->theGroupBSize;
276  }
277  const int index = offset + index1 * metadata->theGroupBSize + index2;
278  return index;
279 }
280 
281 const Ipc::OneToOneUniQueue &
282 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
283 {
284  return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
285 }
286 
287 const Ipc::OneToOneUniQueue &
288 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
289 {
290  return oneToOneQueue(remoteGroup(), remoteProcessId,
291  theLocalGroup, theLocalProcessId);
292 }
293 
294 const Ipc::OneToOneUniQueue &
295 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
296 {
297  return oneToOneQueue(theLocalGroup, theLocalProcessId,
298  remoteGroup(), remoteProcessId);
299 }
300 
301 int
302 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
303 {
304  Must(validProcessId(group, processId));
305  return group == groupA ?
306  processId - metadata->theGroupAIdOffset :
307  metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
308 }
309 
310 const Ipc::QueueReader &
312 {
313  return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
314 }
315 
316 const Ipc::QueueReader &
317 Ipc::FewToFewBiQueue::remoteReader(const int processId) const
318 {
319  return readers->theReaders[readerIndex(remoteGroup(), processId)];
320 }
321 
322 int
324 {
325  return theLocalGroup == groupA ? metadata->theGroupBSize :
326  metadata->theGroupASize;
327 }
328 
329 int
331 {
332  return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
333  metadata->theGroupAIdOffset;
334 }
335 
336 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
337  theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
338  theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
339 {
340  Must(theGroupASize > 0);
341  Must(theGroupBSize > 0);
342 }
343 
344 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
345  metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
346  queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
347  readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
348 {
349 }
350 
352 {
353  delete metadataOwner;
354  delete queuesOwner;
355  delete readersOwner;
356 }
357 
358 // MultiQueue
359 
361 Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
362 {
363  return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
364 }
365 
366 Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
367  BaseMultiQueue(localProcessId),
368  metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
369  queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
370  readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
371 {
372  Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
373  Must(readers->theCapacity == metadata->theProcessCount);
374 
375  debugs(54, 7, "queue " << id << " reader: " << localReader().id);
376 }
377 
378 bool
379 Ipc::MultiQueue::validProcessId(const int processId) const
380 {
381  return metadata->theProcessIdOffset <= processId &&
382  processId < metadata->theProcessIdOffset + metadata->theProcessCount;
383 }
384 
385 const Ipc::OneToOneUniQueue &
386 Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
387 {
388  assert(validProcessId(fromProcessId));
389  assert(validProcessId(toProcessId));
390  const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
391  const int toIndex = toProcessId - metadata->theProcessIdOffset;
392  const int index = fromIndex * metadata->theProcessCount + toIndex;
393  return (*queues)[index];
394 }
395 
396 const Ipc::QueueReader &
397 Ipc::MultiQueue::reader(const int processId) const
398 {
399  assert(validProcessId(processId));
400  const int index = processId - metadata->theProcessIdOffset;
401  return readers->theReaders[index];
402 }
403 
404 const Ipc::OneToOneUniQueue &
405 Ipc::MultiQueue::inQueue(const int remoteProcessId) const
406 {
407  return oneToOneQueue(remoteProcessId, theLocalProcessId);
408 }
409 
410 const Ipc::OneToOneUniQueue &
411 Ipc::MultiQueue::outQueue(const int remoteProcessId) const
412 {
413  return oneToOneQueue(theLocalProcessId, remoteProcessId);
414 }
415 
416 const Ipc::QueueReader &
418 {
419  return reader(theLocalProcessId);
420 }
421 
422 const Ipc::QueueReader &
423 Ipc::MultiQueue::remoteReader(const int processId) const
424 {
425  return reader(processId);
426 }
427 
428 int
430 {
431  return metadata->theProcessCount;
432 }
433 
434 int
436 {
437  return metadata->theProcessIdOffset;
438 }
439 
440 Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
441  theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
442 {
443  Must(theProcessCount > 0);
444 }
445 
446 Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
447  metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
448  queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
449  readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
450 {
451 }
452 
454 {
455  delete metadataOwner;
456  delete queuesOwner;
457  delete readersOwner;
458 }
459 
bool validProcessId(const int processId) const
Definition: Queue.cc:379
virtual int remotesCount() const
Definition: Queue.cc:429
virtual int remotesIdOffset() const
Definition: Queue.cc:435
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Definition: Queue.h:64
virtual int remotesIdOffset() const
Definition: Queue.cc:330
virtual const QueueReader & localReader() const =0
Definition: Queue.cc:203
static String ReadersId(String id)
constructs QueueReaders ID from parent queue ID
Definition: Queue.cc:37
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const
outgoing queue to a given remote process
Definition: Queue.cc:411
Rate rateLimit
pop()s per second limit if positive
Definition: Queue.h:55
InstanceIdDefinitions(Ipc::QueueReader, "ipcQR")
int const char int
Definition: stub_libmem.cc:75
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Definition: Queue.cc:226
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
Definition: Queue.h:136
Metadata(const int aProcessCount, const int aProcessIdOffset)
Definition: Queue.cc:440
const A & max(A const &lhs, A const &rhs)
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
Definition: Queue.h:61
const QueueReader & reader(const int processId) const
Definition: Queue.cc:397
#define shm_new(Class)
Definition: Pointer.h:200
std::atomic< int > Rate
pop()s per second
Definition: Queue.h:54
const Mem::Pointer< OneToOneUniQueues > queues
unidirection one-to-one queues
Definition: Queue.h:359
static int Bytes2Items(const unsigned int maxItemSize, int size)
Definition: Queue.cc:84
virtual const QueueReader & remoteReader(const int remoteProcessId) const
Definition: Queue.cc:423
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
Definition: Queue.cc:75
size_t sharedMemorySize() const
Definition: Queue.cc:127
shared array of QueueReaders
Definition: Queue.h:68
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:344
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:297
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:358
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const
incoming queue from a given remote process
Definition: Queue.cc:288
Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset)
Definition: Queue.cc:336
int size
Definition: ModDevPoll.cc:77
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:361
virtual const QueueReader & localReader() const
Definition: Queue.cc:311
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:119
virtual const QueueReader & remoteReader(const int processId) const
Definition: Queue.cc:317
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:128
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:133
void append(char const *buf, int len)
Definition: String.cc:161
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Definition: Queue.h:47
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
const int theCapacity
Definition: Queue.h:75
#define true
Definition: GnuRegex.c:234
static int Items2Bytes(const unsigned int maxItemSize, const int size)
Definition: Queue.cc:92
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:157
BaseMultiQueue(const int aLocalProcessId)
Definition: Queue.cc:152
#define shm_old(Class)
Definition: Pointer.h:201
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:240
#define assert(EX)
Definition: assert.h:19
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:299
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
Definition: Queue.cc:386
static String MetadataId(String id)
constructs Metadata ID from parent queue ID
Definition: Queue.cc:21
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
Definition: Queue.cc:102
static String QueuesId(String id)
constructs one-to-one queues ID from parent queue ID
Definition: Queue.cc:29
const int theCapacity
Definition: Queue.h:157
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
Definition: Queue.cc:180
Shared metadata for MultiQueue.
Definition: Queue.h:318
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const
incoming queue from a given remote process
Definition: Queue.cc:405
const Mem::Pointer< OneToOneUniQueues > queues
unidirection one-to-one queues
Definition: Queue.h:298
const int theProcessCount
Definition: Queue.h:323
void statClose(std::ostream &) const
end state reporting started by statOpen()
Definition: Queue.cc:112
QueueReaders(const int aCapacity)
Definition: Queue.cc:55
AtomicSignedMsec Balance
Definition: Queue.h:59
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:282
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const
outgoing queue to a given remote process
Definition: Queue.cc:295
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:221
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
Definition: Queue.cc:173
Shared metadata for FewToFewBiQueue.
Definition: Queue.h:244
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:69
const OneToOneUniQueue & operator[](const int index) const
Definition: Queue.cc:141
const unsigned int theMaxItemSize
maximum item size
Definition: Queue.h:135
size_t sharedMemorySize() const
Definition: Queue.cc:62
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
Definition: Queue.cc:159
bool validProcessId(const Group group, const int processId) const
Definition: Queue.cc:246
MultiQueue(const String &id, const int localProcessId)
Definition: Queue.cc:366
int readerIndex(const Group group, const int processId) const
Definition: Queue.cc:302
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:260
virtual const QueueReader & localReader() const
Definition: Queue.cc:417
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:446
#define false
Definition: GnuRegex.c:233
virtual int remotesCount() const
Definition: Queue.cc:323
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:360
shared array of OneToOneUniQueues
Definition: Queue.h:142
static size_t SharedMemorySize(const int capacity)
Definition: Queue.cc:68

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors