CollapsedForwarding.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 17 Request Forwarding */
10
11#include "squid.h"
12#include "base/AsyncFunCalls.h"
13#include "CollapsedForwarding.h"
14#include "globals.h"
15#include "ipc/mem/Segment.h"
16#include "ipc/Messages.h"
17#include "ipc/Port.h"
18#include "ipc/TypedMsgHdr.h"
19#include "MemObject.h"
20#include "SquidConfig.h"
21#include "Store.h"
22#include "store_key_md5.h"
23#include "tools.h"
24
/// shared memory segment path to use for CollapsedForwarding queue
26static const char *const ShmLabel = "cf";
/// a single worker-to-worker queue capacity
28// TODO: make configurable or compute from squid.conf settings if possible
29static const int QueueCapacity = 1024;
30
/// IPC queue; the code in this file treats a null pointer as "no queue"
31std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
32
/// Body of the message class stored in the CollapsedForwarding IPC queue.
// NOTE(review): the class header line and some member lines are missing from
// this listing; presumably this is CollapsedForwardingMsg (the type passed to
// queue->stat<>() elsewhere in this file) and it also declares the xitIndex
// member printed by stat() -- confirm against the real file.
35{
36public:
38
 /// prints message parameters; suitable for cache manager reports
40 void stat(std::ostream &);
41
42public:
 /// kid ID of sending process
43 int sender;
44
47};
48
/// Prints message parameters (sender and xitIndex) to the given stream;
/// suitable for cache manager reports.
// NOTE(review): the signature line is missing from this listing; presumably
// this is CollapsedForwardingMsg::stat(std::ostream &os) -- confirm.
49void
51{
52 os << "sender: " << sender << ", xitIndex: " << xitIndex;
53}
54
55// CollapsedForwarding
56
/// Opens the shared memory segment: creates the IPC queue for this process.
// NOTE(review): the signature line is missing from this listing; presumably
// this is CollapsedForwarding::Init() -- confirm.
57void
59{
 // the queue must not exist yet
60 Must(!queue.get());
 // a queue is created only in SMP mode and only by worker processes
61 if (UsingSmp() && IamWorkerProcess()) {
62 queue.reset(new Queue(ShmLabel, KidIdentifier));
 // NOTE(review): the line completing this asyncCall() expression (its dialer
 // argument) is not visible here; judging by the call name, the scheduled
 // call presumably runs HandleNewDataAtStart() -- confirm.
63 AsyncCall::Pointer callback = asyncCall(17, 4, "CollapsedForwarding::HandleNewDataAtStart",
65 ScheduleCallHere(callback);
66 }
67}
68
/// Notifies other workers about changes in entry state (e.g., new data).
/// \param e the entry whose state changed
/// \param includingThisWorker passed unchanged to the index-based Broadcast()
69void
70CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
71{
 // Nothing to broadcast for an entry without a transients table entry.
 // NOTE(review): the second half of this condition is missing from this
 // listing; the debug message suggests it checks that the entry has no
 // readers -- confirm against the real file.
72 if (!e.hasTransients() ||
74 debugs(17, 7, "nobody reads " << e);
75 return;
76 }
77
78 debugs(17, 5, e);
 // delegate to the overload keyed by the entry position in the in-transit table
79 Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
80}
81
/// Queues a message carrying the given transients index for each worker and
/// notifies a worker whenever the push reports that a notification is due
/// (push() returns true). Does nothing when this process has no queue.
/// \param index transients index stored in the message as xitIndex
/// \param includingThisWorker whether the current process (KidIdentifier)
///        also receives the message
82void
83CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
84{
 // no queue means there is nobody to message
85 if (!queue.get())
86 return;
87
 // NOTE(review): the lines declaring msg (and any other fields set on it) are
 // missing from this listing.
90 msg.xitIndex = index;
91
92 debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
93
94 // TODO: send only to workers who are waiting for data
 // worker IDs are iterated from 1 through Config.workers, inclusive
95 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
96 try {
 // skip ourselves unless the caller asked to include this worker
97 if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
98 Notify(workerId);
99 } catch (const Queue::Full &) {
 // a full queue is reported but does not stop the loop: the remaining
 // workers are still tried
100 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
101 "queue overflow for kid" << workerId <<
102 " at " << queue->outSize(workerId) << " items");
103 // TODO: grow queue size
104 }
105 }
106}
107
/// Kicks the worker with an empty IPC queue by sending it an IPC message.
// NOTE(review): the signature line is missing from this listing; presumably
// this is CollapsedForwarding::Notify(const int workerId) -- confirm.
108void
110{
111 // TODO: Count and report the total number of notifications, pops, pushes.
112 debugs(17, 7, "to kid" << workerId);
 // NOTE(review): the lines that declare and fill msg are missing from this
 // listing.
 // the destination is the strand address computed for the given worker ID
116 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
117 Ipc::SendMessage(addr, msg);
118}
119
/// Handles new data messages in the IPC queue: pops messages until pop()
/// reports there are none left. `when` is used only in the debugging output.
// NOTE(review): the signature line is missing from this listing; presumably
// this is CollapsedForwarding::HandleNewData(const char *const when).
120void
122{
123 debugs(17, 4, "popping all " << when);
 // NOTE(review): the declaration of msg is missing from this listing.
125 int workerId;
126 int poppedCount = 0;
127 while (queue->pop(workerId, msg)) {
128 debugs(17, 3, "message from kid" << workerId);
 // the queue-reported sender and the sender recorded inside the message
 // are expected to agree; a mismatch is only logged
129 if (workerId != msg.sender) {
130 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
131 " != " << msg.sender);
132 }
133
 // NOTE(review): the statement between the two debugs() lines below is
 // missing from this listing; presumably it syncs the local entry for
 // msg.xitIndex -- confirm against the real file.
134 debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
136 debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
137
138 // XXX: stop and schedule an async call to continue
139 ++poppedCount;
 // aborts if a single call pops SQUID_MAXFD or more messages
140 assert(poppedCount < SQUID_MAXFD);
141 }
142}
143
/// Handles queue push notifications from worker or disker: reads the sender
/// ID from the message, clears that sender's reader signal, and then
/// processes the queued messages.
// NOTE(review): the signature line is missing from this listing; presumably
// this is CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg).
144void
146{
147 const int from = msg.getInt();
148 debugs(17, 7, "from " << from);
 // a notification is only expected in a process that has a queue
149 assert(queue.get());
150 queue->clearReaderSignal(from);
151 HandleNewData("after notification");
152}
153
/// Clears all reader signals on the queue and then processes whatever
/// messages are already queued.
// NOTE(review): the signature line and one body line are missing from this
// listing; presumably this is CollapsedForwarding::HandleNewDataAtStart().
156void
158{
160 queue->clearAllReaderSignals();
161 HandleNewData("at start");
162}
163
/// Prints IPC message queue state; suitable for cache manager reports.
/// Prints nothing when this process has no queue.
// NOTE(review): the signature line is missing from this listing; presumably
// this is CollapsedForwarding::StatQueue(std::ostream &os).
164void
166{
167 if (queue.get()) {
168 os << "Transients queues:\n";
169 queue->stat<CollapsedForwardingMsg>(os);
170 }
171}
172
/// Initializes shared queue used by CollapsedForwarding.
// NOTE(review): the class header line, the line before the destructor
// declaration, and the private member line are missing from this listing;
// the function bodies following this class use a member named owner.
175{
176public:
177 /* RegisteredRunner API */
179 ~CollapsedForwardingRr() override;
180
181protected:
 /// called when the runner should create a new memory segment
182 void create() override;
183 void open() override;
184
185private:
187};
188
190
// NOTE(review): the signature line and the statements after Must() are
// missing from this listing; by declaration order and the use of owner this
// is presumably CollapsedForwardingRr::create() -- confirm against the real file.
192{
 // owner must not have been set before this runs
193 Must(!owner);
197}
198
// NOTE(review): both the signature line and the body of this function are
// missing from this listing; by declaration order it may be
// CollapsedForwardingRr::open() -- confirm against the real file.
200{
202}
203
// NOTE(review): the signature line is missing from this listing; since the
// body releases owner, this is presumably the CollapsedForwardingRr
// destructor -- confirm against the real file.
205{
206 delete owner;
207}
208
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
static const int QueueCapacity
a single worker-to-worker queue capacity
static const char *const ShmLabel
shared memory segment path to use for CollapsedForwarding queue
DefineRunnerRegistrator(CollapsedForwardingRr)
class SquidConfig Config
Definition: SquidConfig.cc:12
#define Must(condition)
Definition: TextException.h:75
#define assert(EX)
Definition: assert.h:17
int sender
kid ID of sending process
sfileno xitIndex
transients index, so that workers can find [private] entries to sync
void stat(std::ostream &)
prints message parameters; suitable for cache manager reports
CollapsedForwardingRr: initializes shared queue used by CollapsedForwarding
Ipc::MultiQueue::Owner * owner
void create() override
called when the runner should create a new memory segment
static void Notify(const int workerId)
kick worker with empty IPC queue
static void StatQueue(std::ostream &)
prints IPC message queue state; suitable for cache manager reports
static std::unique_ptr< Queue > queue
IPC queue.
static void Broadcast(const StoreEntry &e, const bool includingThisWorker=false)
notify other workers about changes in entry state (e.g., new data)
static void HandleNewData(const char *const when)
handle new data messages in IPC queue
static void Init()
open shared memory segment
static void HandleNotification(const Ipc::TypedMsgHdr &msg)
handle queue push notifications from worker or disker
static void HandleNewDataAtStart()
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:368
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:52
Ipc::TypedMsgHdr: struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:35
void putInt(int n)
store an integer
Definition: TypedMsgHdr.cc:119
void setType(int aType)
sets message type; use MessageType enum
Definition: TypedMsgHdr.cc:100
int getInt() const
load an integer
Definition: TypedMsgHdr.cc:111
int32_t index
entry position inside the in-transit table
Definition: MemObject.h:196
XitTable xitTable
current [shared] memory caching state for the entry
Definition: MemObject.h:199
Calls a function without arguments. See also: NullaryMemFunT.
Definition: AsyncFunCalls.h:18
MemObject * mem_obj
Definition: Store.h:221
bool hasTransients() const
whether there is a corresponding locked transients table entry
Definition: Store.h:211
int transientReaders(const StoreEntry &) const
number of the transient entry readers some time ago
Definition: Controller.cc:632
void syncCollapsed(const sfileno)
Update local intransit entry after changes made by appending worker.
Definition: Controller.cc:773
#define DBG_IMPORTANT
Definition: Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
int KidIdentifier
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:23
@ mtCollapsedForwardingNotification
Definition: Messages.h:33
Controller & Root()
safely access controller singleton
Definition: Controller.cc:938
signed_int32_t sfileno
Definition: forward.h:22
bool IamWorkerProcess()
whether the current process handles HTTP transactions and such
Definition: stub_tools.cc:47
bool UsingSmp()
Whether there should be more than one worker process running.
Definition: tools.cc:696

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors