CollapsedForwarding.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 17 Request Forwarding */
10 
11 #include "squid.h"
12 #include "CollapsedForwarding.h"
13 #include "globals.h"
14 #include "ipc/mem/Segment.h"
15 #include "ipc/Messages.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemObject.h"
19 #include "SquidConfig.h"
20 #include "Store.h"
21 #include "store_key_md5.h"
22 #include "tools.h"
23 
// Shared memory segment path to use for the CollapsedForwarding queue.
25 static const char *const ShmLabel = "cf";
// Capacity of a single worker-to-worker queue.
27 // TODO: make configurable or compute from squid.conf settings if possible
28 static const int QueueCapacity = 1024;
29 
// Definition of the static IPC queue member. As a default-constructed
// unique_ptr it holds no queue until reset() is called on it.
30 std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
31 
// NOTE(review): the line that opens this type is not visible in this listing.
// The later sizeof(CollapsedForwardingMsg) suggests this is the
// CollapsedForwardingMsg item stored in the IPC queue -- confirm against the
// real source before relying on it.
34 {
35 public:
37 
38 public:
// kid ID of sending process
39  int sender;
40 
// NOTE(review): other code in this file reads and writes msg.xitIndex, but no
// such member is visible here; its declaration was presumably lost from this
// listing -- confirm.
43 };
44 
45 // CollapsedForwarding
46 
// Creates this process's handle to the shared IPC queue.
// NOTE(review): the line naming this function is not visible in this listing
// (presumably CollapsedForwarding::Init() -- confirm).
47 void
49 {
// The queue must not exist yet. Must() throws an exception on failure instead
// of aborting the process.
50  Must(!queue.get());
// Only worker processes in an SMP setup get a queue; in every other case
// `queue` stays null.
51  if (UsingSmp() && IamWorkerProcess())
52  queue.reset(new Queue(ShmLabel, KidIdentifier));
53 }
54 
// Notifies other workers about changes in the state of entry `e` (e.g., new
// data).
// e: the store entry whose state changed.
// includingThisWorker: forwarded unchanged to the index-based Broadcast()
// overload.
55 void
56 CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
57 {
// Nothing to announce when the entry has no transients table entry.
// NOTE(review): the second operand of this condition is not visible in this
// listing; the debug message below suggests it checks that the entry has no
// readers -- confirm.
58  if (!e.hasTransients() ||
60  debugs(17, 7, "nobody reads " << e);
61  return;
62  }
63 
64  debugs(17, 5, e);
// Delegate using the entry's position inside the in-transit table.
65  Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
66 }
67 
// Queues a message about transients entry `index` for each configured worker
// and notifies a worker whenever the push asks for it.
// index: the value stored in the message's xitIndex field.
// includingThisWorker: when false, the worker whose ID equals KidIdentifier
// is skipped.
68 void
69 CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
70 {
// No queue means there is nobody to tell (the queue is only allocated for SMP
// worker processes earlier in this file).
71  if (!queue.get())
72  return;
73 
// NOTE(review): the declaration of `msg` is not visible in this listing.
75  msg.sender = KidIdentifier;
76  msg.xitIndex = index;
77 
78  debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
79 
80  // TODO: send only to workers who are waiting for data
// The loop starts at worker ID 1 and runs through Config.workers inclusive.
81  for (int workerId = 1; workerId <= Config.workers; ++workerId) {
82  try {
// Notify() runs only when push() returns true.
// NOTE(review): presumably true means the reader must be woken up -- confirm
// against the queue implementation.
83  if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
84  Notify(workerId);
85  } catch (const Queue::Full &) {
// A full queue is reported and the message for that worker is not queued; the
// loop continues with the remaining workers.
86  debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
87  "queue overflow for kid" << workerId <<
88  " at " << queue->outSize(workerId) << " items");
89  // TODO: grow queue size
90  }
91  }
92 }
93 
// Kicks the worker identified by workerId by sending it an IPC message that
// carries this process's kid ID.
94 void
95 CollapsedForwarding::Notify(const int workerId)
96 {
97  // TODO: Count and report the total number of notifications, pops, pushes.
98  debugs(17, 7, "to kid" << workerId);
99  Ipc::TypedMsgHdr msg;
// NOTE(review): a line is missing from this listing here; presumably it sets
// the message type -- confirm.
// The payload is the sender's kid ID.
101  msg.putInt(KidIdentifier);
// NOTE(review): the declaration of `addr` is not visible in this listing;
// presumably it is the destination worker's IPC address -- confirm.
103  Ipc::SendMessage(addr, msg);
104 }
105 
// Handles new data messages in the IPC queue by popping until pop() reports
// nothing more.
// when: free-form text used only in the first debug line.
106 void
107 CollapsedForwarding::HandleNewData(const char *const when)
108 {
109  debugs(17, 4, "popping all " << when);
// NOTE(review): the declaration of `msg` is not visible in this listing.
111  int workerId;
112  int poppedCount = 0;
113  while (queue->pop(workerId, msg)) {
114  debugs(17, 3, "message from kid" << workerId);
// The ID reported by pop() should match the ID the sender wrote into the
// message; a mismatch is only logged and processing continues.
115  if (workerId != msg.sender) {
116  debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
117  " != " << msg.sender);
118  }
119 
120  debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
// NOTE(review): the statement that handles the entry is not visible in this
// listing; presumably it syncs the local entry for msg.xitIndex -- confirm.
122  debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
123 
124  // XXX: stop and schedule an async call to continue
125  ++poppedCount;
// Aborts if a single call pops SQUID_MAXFD or more messages.
126  assert(poppedCount < SQUID_MAXFD);
127  }
128 }
129 
// Handles a queue push notification.
// NOTE(review): the signature line is not visible in this listing (presumably
// CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg) --
// confirm).
130 void
132 {
// The message payload is a single integer: the notifier's ID.
133  const int from = msg.getInt();
134  debugs(17, 7, "from " << from);
// The queue must already exist when a notification arrives.
135  assert(queue.get());
// Clear the sender's reader signal before draining the queue.
136  queue->clearReaderSignal(from);
137  HandleNewData("after notification");
138 }
139 
// NOTE(review): the line that opens this class is not visible in this
// listing; the destructor name below shows it is CollapsedForwardingRr.
// Presumably this is the runner that initializes the shared queue used by
// CollapsedForwarding -- confirm.
142 {
143 public:
144  /* RegisteredRunner API */
// NOTE(review): a line is missing from this listing here (presumably the
// constructor -- confirm).
146  virtual ~CollapsedForwardingRr();
147 
148 protected:
// create(): called when the runner should create a new memory segment.
149  virtual void create();
150  virtual void open();
151 
152 private:
// NOTE(review): the member declared here is not visible in this listing; the
// method bodies below use an `owner` pointer that is deleted in the
// destructor -- confirm its declaration.
154 };
155 
157 
// NOTE(review): the header of this definition is not visible in this listing
// (presumably CollapsedForwardingRr::create() -- confirm).
159 {
// The segment owner must not exist yet. Must() throws instead of aborting.
160  Must(!owner);
// NOTE(review): the start of this call is not visible in this listing. The
// two visible arguments are the size of one queue item and the per-queue
// capacity; presumably the result is stored in `owner` -- confirm.
162  sizeof(CollapsedForwardingMsg),
163  QueueCapacity);
164 }
165 
// NOTE(review): both the header and the only statement of this definition
// are missing from this listing (presumably CollapsedForwardingRr::open() --
// confirm against the real source).
167 {
169 }
170 
// NOTE(review): the header of this definition is not visible in this listing
// (presumably the CollapsedForwardingRr destructor -- confirm).
172 {
// Releases the owner object; deleting a null pointer is a no-op.
173  delete owner;
174 }
175 
#define assert(EX)
Definition: assert.h:17
static const char *const ShmLabel
shared memory segment path to use for CollapsedForwarding queue
initializes shared queue used by CollapsedForwarding
void putInt(int n)
store an integer
Definition: TypedMsgHdr.cc:126
bool hasTransients() const
whether there is a corresponding locked transients table entry
Definition: Store.h:187
#define Must(condition)
Like assert() but throws an exception instead of aborting the process.
Definition: TextException.h:69
static void Notify(const int workerId)
kick worker with empty IPC queue
static void Broadcast(const StoreEntry &e, const bool includingThisWorker=false)
notify other workers about changes in entry state (e.g., new data)
Controller & Root()
safely access controller singleton
Definition: Controller.cc:877
XitTable xitTable
current [shared] memory caching state for the entry
Definition: MemObject.h:140
bool IamWorkerProcess()
whether the current process handles HTTP transactions and such
Definition: stub_tools.cc:49
int32_t index
entry position inside the in-transit table
Definition: MemObject.h:137
static void HandleNotification(const Ipc::TypedMsgHdr &msg)
handle queue push notifications from worker or disker
int transientReaders(const StoreEntry &) const
number of the transient entry readers some time ago
Definition: Controller.cc:613
static const int QueueCapacity
a single worker-to-worker queue capacity
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:124
virtual void create()
called when the runner should create a new memory segment
#define DBG_IMPORTANT
Definition: Debug.h:46
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:51
static void HandleNewData(const char *const when)
handle new data messages in IPC queue
int sender
kid ID of sending process
void * addr
Definition: membanger.c:46
Ipc::MultiQueue::Owner * owner
sfileno xitIndex
transients index, so that workers can find [private] entries to sync
signed_int32_t sfileno
Definition: forward.h:22
int getInt() const
load an integer
Definition: TypedMsgHdr.cc:118
MemObject * mem_obj
Definition: Store.h:197
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
bool UsingSmp()
Whether there should be more than one worker process running.
Definition: tools.cc:658
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:22
#define RunnerRegistrationEntry(Who)
convenience macro: register one RegisteredRunner kid as early as possible
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:342
int KidIdentifier
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:31
static void Init()
open shared memory segment
static std::unique_ptr< Queue > queue
IPC queue.
void setType(int aType)
sets message type; use MessageType enum
Definition: TypedMsgHdr.cc:107
class SquidConfig Config
Definition: SquidConfig.cc:12
#define NULL
Definition: types.h:166
void syncCollapsed(const sfileno)
Update local intransit entry after changes made by appending worker.
Definition: Controller.cc:734

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors