Queue.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "Debug.h"
14 #include "globals.h"
15 #include "ipc/Queue.h"
16 
17 #include <limits>
18 
20 static String
22 {
23  id.append("__metadata");
24  return id;
25 }
26 
28 static String
30 {
31  id.append("__queues");
32  return id;
33 }
34 
36 static String
38 {
39  id.append("__readers");
40  return id;
41 }
42 
43 /* QueueReader */
44 
46 
47 Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
48  rateLimit(0), balance(0)
49 {
50  debugs(54, 7, HERE << "constructed " << id);
51 }
52 
53 /* QueueReaders */
54 
55 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
56  theReaders(theCapacity)
57 {
58  Must(theCapacity > 0);
59 }
60 
61 size_t
63 {
64  return SharedMemorySize(theCapacity);
65 }
66 
67 size_t
69 {
70  return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
71 }
72 
73 // OneToOneUniQueue
74 
75 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
76  theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77  theCapacity(aCapacity)
78 {
79  Must(theMaxItemSize > 0);
80  Must(theCapacity > 0);
81 }
82 
83 int
84 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
85 {
86  assert(maxItemSize > 0);
87  size -= sizeof(OneToOneUniQueue);
88  return size >= 0 ? size / maxItemSize : 0;
89 }
90 
91 int
92 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
93 {
94  assert(size >= 0);
95  return sizeof(OneToOneUniQueue) + maxItemSize * size;
96 }
97 
98 /* OneToOneUniQueues */
99 
100 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
101 {
102  Must(theCapacity > 0);
103  for (int i = 0; i < theCapacity; ++i)
104  new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
105 }
106 
107 size_t
109 {
110  return sizeof(*this) + theCapacity * front().sharedMemorySize();
111 }
112 
113 size_t
114 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
115 {
116  const int queueSize =
117  OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
118  return sizeof(OneToOneUniQueues) + queueSize * capacity;
119 }
120 
121 const Ipc::OneToOneUniQueue &
123 {
124  Must(0 <= index && index < theCapacity);
125  const size_t queueSize = index ? front().sharedMemorySize() : 0;
126  const char *const queue =
127  reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
128  return *reinterpret_cast<const OneToOneUniQueue *>(queue);
129 }
130 
131 // BaseMultiQueue
132 
133 Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
134  theLocalProcessId(aLocalProcessId),
135  theLastPopProcessId(std::numeric_limits<int>::max() - 1)
136 {
137 }
138 
139 void
140 Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
141 {
142  QueueReader &reader = localReader();
143  debugs(54, 7, "reader: " << reader.id);
144 
145  reader.clearSignal();
146 
147  // we got a hint; we could reposition iteration to try popping from the
148  // remoteProcessId queue first; but it does not seem to help much and might
149  // introduce some bias so we do not do that for now:
150  // theLastPopProcessId = remoteProcessId;
151 }
152 
154 Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
155 {
156  const QueueReader &r = remoteReader(remoteProcessId);
157  return r.balance;
158 }
159 
161 Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
162 {
163  const QueueReader &r = remoteReader(remoteProcessId);
164  return r.rateLimit;
165 }
166 
168 Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
169 {
170  const OneToOneUniQueue &queue =
171  const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
172  return const_cast<OneToOneUniQueue &>(queue);
173 }
174 
176 Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
177 {
178  const OneToOneUniQueue &queue =
179  const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
180  return const_cast<OneToOneUniQueue &>(queue);
181 }
182 
185 {
186  const QueueReader &reader =
187  const_cast<const BaseMultiQueue *>(this)->localReader();
188  return const_cast<QueueReader &>(reader);
189 }
190 
192 Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
193 {
194  const QueueReader &reader =
195  const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
196  return const_cast<QueueReader &>(reader);
197 }
198 
199 // FewToFewBiQueue
200 
202 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
203 {
204  return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
205 }
206 
207 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
208  BaseMultiQueue(aLocalProcessId),
209  metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
210  queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
211  readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
212  theLocalGroup(aLocalGroup)
213 {
214  Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
215  Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
216 
217  debugs(54, 7, "queue " << id << " reader: " << localReader().id);
218 }
219 
220 int
221 Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
222 {
223  return capacity * groupASize * groupBSize * 2;
224 }
225 
226 bool
227 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
228 {
229  switch (group) {
230  case groupA:
231  return metadata->theGroupAIdOffset <= processId &&
232  processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
233  case groupB:
234  return metadata->theGroupBIdOffset <= processId &&
235  processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
236  }
237  return false;
238 }
239 
240 int
241 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
242 {
243  Must(fromGroup != toGroup);
244  assert(validProcessId(fromGroup, fromProcessId));
245  assert(validProcessId(toGroup, toProcessId));
246  int index1;
247  int index2;
248  int offset;
249  if (fromGroup == groupA) {
250  index1 = fromProcessId - metadata->theGroupAIdOffset;
251  index2 = toProcessId - metadata->theGroupBIdOffset;
252  offset = 0;
253  } else {
254  index1 = toProcessId - metadata->theGroupAIdOffset;
255  index2 = fromProcessId - metadata->theGroupBIdOffset;
256  offset = metadata->theGroupASize * metadata->theGroupBSize;
257  }
258  const int index = offset + index1 * metadata->theGroupBSize + index2;
259  return index;
260 }
261 
262 const Ipc::OneToOneUniQueue &
263 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
264 {
265  return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
266 }
267 
268 const Ipc::OneToOneUniQueue &
269 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
270 {
271  return oneToOneQueue(remoteGroup(), remoteProcessId,
272  theLocalGroup, theLocalProcessId);
273 }
274 
275 const Ipc::OneToOneUniQueue &
276 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
277 {
278  return oneToOneQueue(theLocalGroup, theLocalProcessId,
279  remoteGroup(), remoteProcessId);
280 }
281 
282 int
283 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
284 {
285  Must(validProcessId(group, processId));
286  return group == groupA ?
287  processId - metadata->theGroupAIdOffset :
288  metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
289 }
290 
291 const Ipc::QueueReader &
293 {
294  return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
295 }
296 
297 const Ipc::QueueReader &
298 Ipc::FewToFewBiQueue::remoteReader(const int processId) const
299 {
300  return readers->theReaders[readerIndex(remoteGroup(), processId)];
301 }
302 
303 int
305 {
306  return theLocalGroup == groupA ? metadata->theGroupBSize :
307  metadata->theGroupASize;
308 }
309 
310 int
312 {
313  return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
314  metadata->theGroupAIdOffset;
315 }
316 
317 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
318  theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
319  theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
320 {
321  Must(theGroupASize > 0);
322  Must(theGroupBSize > 0);
323 }
324 
325 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
326  metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
327  queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
328  readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
329 {
330 }
331 
333 {
334  delete metadataOwner;
335  delete queuesOwner;
336  delete readersOwner;
337 }
338 
339 // MultiQueue
340 
342 Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
343 {
344  return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
345 }
346 
347 Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
348  BaseMultiQueue(localProcessId),
349  metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
350  queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
351  readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
352 {
353  Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
354  Must(readers->theCapacity == metadata->theProcessCount);
355 
356  debugs(54, 7, "queue " << id << " reader: " << localReader().id);
357 }
358 
359 bool
360 Ipc::MultiQueue::validProcessId(const int processId) const
361 {
362  return metadata->theProcessIdOffset <= processId &&
363  processId < metadata->theProcessIdOffset + metadata->theProcessCount;
364 }
365 
366 const Ipc::OneToOneUniQueue &
367 Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
368 {
369  assert(validProcessId(fromProcessId));
370  assert(validProcessId(toProcessId));
371  const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
372  const int toIndex = toProcessId - metadata->theProcessIdOffset;
373  const int index = fromIndex * metadata->theProcessCount + toIndex;
374  return (*queues)[index];
375 }
376 
377 const Ipc::QueueReader &
378 Ipc::MultiQueue::reader(const int processId) const
379 {
380  assert(validProcessId(processId));
381  const int index = processId - metadata->theProcessIdOffset;
382  return readers->theReaders[index];
383 }
384 
385 const Ipc::OneToOneUniQueue &
386 Ipc::MultiQueue::inQueue(const int remoteProcessId) const
387 {
388  return oneToOneQueue(remoteProcessId, theLocalProcessId);
389 }
390 
391 const Ipc::OneToOneUniQueue &
392 Ipc::MultiQueue::outQueue(const int remoteProcessId) const
393 {
394  return oneToOneQueue(theLocalProcessId, remoteProcessId);
395 }
396 
397 const Ipc::QueueReader &
399 {
400  return reader(theLocalProcessId);
401 }
402 
403 const Ipc::QueueReader &
404 Ipc::MultiQueue::remoteReader(const int processId) const
405 {
406  return reader(processId);
407 }
408 
409 int
411 {
412  return metadata->theProcessCount;
413 }
414 
415 int
417 {
418  return metadata->theProcessIdOffset;
419 }
420 
421 Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
422  theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
423 {
424  Must(theProcessCount > 0);
425 }
426 
427 Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
428  metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
429  queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
430  readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
431 {
432 }
433 
435 {
436  delete metadataOwner;
437  delete queuesOwner;
438  delete readersOwner;
439 }
440 
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process ...
Definition: Queue.cc:140
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const
incoming queue from a given remote process
Definition: Queue.cc:386
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:263
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:100
#define assert(EX)
Definition: assert.h:17
const unsigned int theMaxItemSize
maximum item size
Definition: Queue.h:123
std::atomic< int > Rate
pop()s per second
Definition: Queue.h:53
static size_t SharedMemorySize(const int capacity)
Definition: Queue.cc:68
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:241
virtual int remotesIdOffset() const
Definition: Queue.cc:416
#define shm_old(Class)
Definition: Pointer.h:180
QueueReaders(const int aCapacity)
Definition: Queue.cc:55
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
Metadata(const int aProcessCount, const int aProcessIdOffset)
Definition: Queue.cc:421
int i
Definition: membanger.c:49
virtual const QueueReader & remoteReader(const int processId) const
Definition: Queue.cc:298
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:427
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
Definition: Queue.cc:367
virtual const QueueReader & remoteReader(const int remoteProcessId) const
Definition: Queue.cc:404
static int Items2Bytes(const unsigned int maxItemSize, const int size)
Definition: Queue.cc:92
Shared metadata for MultiQueue.
Definition: Queue.h:303
bool validProcessId(const int processId) const
Definition: Queue.cc:360
size_t sharedMemorySize() const
Definition: Queue.cc:62
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
Definition: Queue.h:60
InstanceIdDefinitions(Ipc::QueueReader,"ipcQR")
A const & max(A const &lhs, A const &rhs)
void append(char const *buf, int len)
Definition: String.cc:161
static String ReadersId(String id)
constructs QueueReaders ID from parent queue ID
Definition: Queue.cc:37
Rate rateLimit
pop()s per second limit if positive
Definition: Queue.h:54
Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset)
Definition: Queue.cc:317
const int theCapacity
Definition: Queue.h:74
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const
incoming queue from a given remote process
Definition: Queue.cc:269
size_t sharedMemorySize() const
Definition: Queue.cc:108
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Definition: Queue.cc:207
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Definition: Queue.h:46
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const
outgoing queue to a given remote process
Definition: Queue.cc:392
#define true
Definition: GnuRegex.c:234
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
Definition: Queue.h:344
virtual const QueueReader & localReader() const
Definition: Queue.cc:292
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:202
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
Definition: Queue.cc:154
shared array of OneToOneUniQueues
Definition: Queue.h:130
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:284
shared array of QueueReaders
Definition: Queue.h:67
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
Definition: Queue.h:124
static String QueuesId(String id)
constructs one-to-one queues ID from parent queue ID
Definition: Queue.cc:29
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
Definition: Queue.cc:75
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const
outgoing queue to a given remote process
Definition: Queue.cc:276
virtual int remotesIdOffset() const
Definition: Queue.cc:311
int readerIndex(const Group group, const int processId) const
Definition: Queue.cc:283
std::ostream & HERE(std::ostream &s)
Definition: Debug.h:147
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:282
#define shm_new(Class)
Definition: Pointer.h:179
#define Must(cond)
Definition: TextException.h:89
Shared metadata for FewToFewBiQueue.
Definition: Queue.h:229
bool SIGHDLR int STUB void int
Definition: stub_tools.cc:68
static String MetadataId(String id)
constructs Metadata ID from parent queue ID
Definition: Queue.cc:21
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:345
static int Bytes2Items(const unsigned int maxItemSize, int size)
Definition: Queue.cc:84
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:342
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
Definition: Queue.cc:161
MultiQueue(const String &id, const int localProcessId)
Definition: Queue.cc:347
const int theProcessCount
Definition: Queue.h:308
const OneToOneUniQueue & operator[](const int index) const
Definition: Queue.cc:122
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:221
BaseMultiQueue(const int aLocalProcessId)
Definition: Queue.cc:133
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:114
virtual const QueueReader & localReader() const
Definition: Queue.cc:398
virtual int remotesCount() const
Definition: Queue.cc:410
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:343
const QueueReader & reader(const int processId) const
Definition: Queue.cc:378
bool validProcessId(const Group group, const int processId) const
Definition: Queue.cc:227
virtual int remotesCount() const
Definition: Queue.cc:304
const int theCapacity
Definition: Queue.h:145
AtomicSignedMsec Balance
Definition: Queue.h:58
virtual const QueueReader & localReader() const =0
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:325
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Definition: Queue.h:63
int size
Definition: ModDevPoll.cc:77
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
Definition: Queue.h:283
#define false
Definition: GnuRegex.c:233

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors