StoreToCommWriter.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 16 Cache Manager API */
10 
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/TextException.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Write.h"
17 #include "CommCalls.h"
18 #include "ipc/FdNotes.h"
19 #include "mgr/StoreToCommWriter.h"
20 #include "Store.h"
21 #include "StoreClient.h"
22 
// Constructor fragment. NOTE(review): the signature line and the tail of the
// asyncCall() statement below are absent from this listing (the embedded
// numbering jumps 22 -> 24 and 28 -> 31); restore them from the real file.
// Names the job "Mgr::StoreToCommWriter", stores the given connection and
// entry, and starts with no store client, a zero write offset, and no closer.
24  AsyncJob("Mgr::StoreToCommWriter"),
25  clientConnection(conn), entry(anEntry), sc(nullptr), writeOffset(0), closer(nullptr)
26 {
27  debugs(16, 6, clientConnection);
// create the connection-closure callback; judging by the call name it
// presumably dials noteCommClosed() (the dialer argument is not shown here)
28  closer = asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommClosed",
31 }
32 
// Presumably the destructor body (its signature line, 33, is absent from
// this listing -- confirm against the real file).
// Expects the Store entry and the store client to be gone already, then
// makes sure the client connection is closed.
34 {
35  debugs(16, 6, MYNAME);
36  assert(!entry); // the entry must have been released before destruction
37  assert(!sc); // ditto for the store client registration
38  close();
39 }
40 
// Presumably StoreToCommWriter::close() (the name line, 43, is absent from
// this listing). Closes the connection to the HTTP client, if it is open.
// Does nothing when the connection is not open.
42 void
44 {
45  if (Comm::IsConnOpen(clientConnection)) {
// drop our registered close handler (if any) before closing the connection
46  if (closer != nullptr) {
47  comm_remove_close_handler(clientConnection->fd, closer);
48  closer = nullptr;
49  }
50  clientConnection->close();
51  }
52 }
53 
// Presumably StoreToCommWriter::start() (the name line, 55, is absent from
// this listing). Requires an open client connection and a non-nil entry
// (Must() fails otherwise), registers an abort callback with the entry,
// registers this job as a Store client, and requests the first Store copy.
54 void
56 {
57  debugs(16, 6, MYNAME);
58  Must(Comm::IsConnOpen(clientConnection));
59  Must(entry != nullptr);
// have the entry call HandleStoreAbort(this) through the registered callback
60  AsyncCall::Pointer call = asyncCall(16, 4, "StoreToCommWriter::Abort", cbdataDialer(&StoreToCommWriter::HandleStoreAbort, this));
61  entry->registerAbortCallback(call);
// become a client of the entry; sc is needed for every storeClientCopy() call
62  sc = storeClientListAdd(entry, this);
63  Must(sc != nullptr);
64 
65  // initiate the receive-from-store, write-to-comm sequence
66  scheduleStoreCopy();
67 }
68 
// Presumably StoreToCommWriter::scheduleStoreCopy() (the name line, 70, is
// absent from this listing). Requests more action results from the Store.
// Requires a non-nil entry and store client; NoteStoreCopied() is passed as
// the callback, with this job as its data.
69 void
71 {
72  debugs(16, 6, MYNAME);
73  Must(entry != nullptr);
74  Must(sc != nullptr);
// describe the read: built from sizeof(buffer), writeOffset, and buffer
// (presumably length, offset, and destination -- confirm in StoreIOBuffer.h)
75  StoreIOBuffer readBuf(sizeof(buffer), writeOffset, buffer);
76  storeClientCopy(sc, entry, readBuf, &NoteStoreCopied, this);
77 }
78 
// Presumably the static StoreToCommWriter::NoteStoreCopied(void *data,
// StoreIOBuffer ioBuf) callback (the name line, 80, is absent from this
// listing). Converts the Store callback into an async call of
// noteStoreCopied(ioBuf) on the job that `data` points to.
// NOTE(review): the line that declares MyDialer (85) is also missing here.
79 void
81 {
82  Must(data != nullptr);
83  // make sync Store call async to get async call protections and features
// data is the pointer that scheduleStoreCopy() handed to storeClientCopy()
84  StoreToCommWriter* writer = static_cast<StoreToCommWriter*>(data);
86  AsyncCall::Pointer call =
87  asyncCall(16, 5, "Mgr::StoreToCommWriter::noteStoreCopied",
88  MyDialer(writer, &StoreToCommWriter::noteStoreCopied, ioBuf));
89  ScheduleCallHere(call);
90 }
91 
// Presumably StoreToCommWriter::noteStoreCopied(StoreIOBuffer ioBuf) (the
// name line, 93, is absent from this listing). Receives some action results
// from the Store: a non-empty buffer is scheduled for writing to the client,
// while an empty one is accepted only if doneAll() is already true.
// A Store error flag makes Must() fail.
92 void
94 {
95  debugs(16, 6, MYNAME);
96  Must(!ioBuf.flags.error);
97  if (ioBuf.length > 0)
98  scheduleCommWrite(ioBuf); // write received action results to client
99  else
100  Must(doneAll()); // otherwise, why would Store call us with no data?
101 }
102 
// Presumably StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer &ioBuf)
// (the name line, 104, is absent from this listing). Tells Comm to write
// ioBuf.length bytes of ioBuf.data to the client connection, with
// noteCommWrote() as the completion callback.
// Requires an open connection and a non-nil data pointer.
// NOTE(review): the line that declares MyDialer (110) is also missing here.
103 void
105 {
106  debugs(16, 6, MYNAME);
107  Must(Comm::IsConnOpen(clientConnection));
108  Must(ioBuf.data != nullptr);
109  // write filled buffer
111  AsyncCall::Pointer writer =
112  asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
113  MyDialer(this, &StoreToCommWriter::noteCommWrote));
// the last argument (free_func) is nullptr
114  Comm::Write(clientConnection, ioBuf.data, ioBuf.length, writer, nullptr);
115 }
116 
// Presumably StoreToCommWriter::noteCommWrote(const CommIoCbParams &params)
// (the name line, 118, is absent from this listing). Called by Comm after
// the action results are written. Checks that the write succeeded, was for
// our connection, and wrote something; then advances writeOffset and asks
// the Store for the next portion unless doneAll() is already true.
117 void
119 {
120  debugs(16, 6, MYNAME);
121  Must(params.flag == Comm::OK);
122  Must(clientConnection != nullptr && params.fd == clientConnection->fd);
123  Must(params.size != 0);
// account for the bytes just written; the next Store copy starts here
124  writeOffset += params.size;
125  if (!doneAll())
126  scheduleStoreCopy(); // retrieve the next data portion
127 }
128 
// Presumably StoreToCommWriter::noteCommClosed(const CommCloseCbParams &)
// (the name line, 130, is absent from this listing). Called by Comm if the
// client socket got closed. Notifies the connection object, drops our
// references to the connection and the close handler, and stops the job.
129 void
131 {
132  debugs(16, 6, MYNAME);
133  if (clientConnection) {
134  clientConnection->noteClosure();
135  clientConnection = nullptr;
136  }
// clearing closer means close() will not try to remove the handler later
137  closer = nullptr;
138  mustStop("commClosed");
139 }
140 
// Presumably StoreToCommWriter::swanSong() (the name line, 142, is absent
// from this listing; the unlock() context string below names it).
// Releases the Store entry, if any: unregisters the store client, removes
// the abort callback, and unlocks the entry. Then closes the client
// connection via close().
141 void
143 {
144  debugs(16, 6, MYNAME);
145  if (entry != nullptr) {
// the store client is unregistered before the entry is unlocked
146  if (sc != nullptr) {
147  storeUnregister(sc, entry, this);
148  sc = nullptr;
149  }
150  entry->unregisterAbortCallback("StoreToCommWriter done");
151  entry->unlock("Mgr::StoreToCommWriter::swanSong");
152  entry = nullptr;
153  }
154  close();
155 }
156 
// Presumably StoreToCommWriter::doneAll() const (the name line, 158, is
// absent from this listing). Reports whether the positive goal has been
// reached: there is an entry, its store_status is STORE_OK, and writeOffset
// has reached the entry's object length. Evaluates to false without an entry.
157 bool
159 {
160  return entry &&
161  entry->store_status == STORE_OK && // the action is over
162  writeOffset >= entry->objectLen(); // we wrote all the results
163 }
164 
// Presumably the static StoreToCommWriter::HandleStoreAbort() callback
// registered in start() (the name line, 166, is absent from this listing, so
// the mgrWriter parameter declaration is not visible here). Called by Store
// if the entry is no longer usable; closes the writer's client connection if
// it is still open.
165 void
167 {
168  if (Comm::IsConnOpen(mgrWriter->clientConnection))
169  mgrWriter->clientConnection->close();
170 }
171 
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:952
Comm::ConnectionPointer clientConnection
HTTP client descriptor.
struct StoreIOBuffer::@130 flags
static void NoteStoreCopied(void *data, StoreIOBuffer ioBuf)
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
StoreToCommWriter(const Comm::ConnectionPointer &conn, StoreEntry *anEntry)
AsyncCall::Pointer closer
comm_close handler
int fd
FD which the call was about. Set by the async call creator.
Definition: CommCalls.h:85
void noteCommWrote(const CommIoCbParams &params)
called by Comm after the action results are written
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
@ OK
Definition: Flag.h:16
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
void noteStoreCopied(StoreIOBuffer ioBuf)
receive some action results from the store
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
void close()
closes the local connection to the HTTP client, if any
void scheduleCommWrite(const StoreIOBuffer &ioBuf)
tell Comm to write action results
unsigned error
Definition: StoreIOBuffer.h:55
#define assert(EX)
Definition: assert.h:17
bool doneAll() const override
whether positive goal has been reached
Comm::Flag flag
comm layer result status.
Definition: CommCalls.h:82
static int sc[16]
Definition: smbdes.c:121
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
@ STORE_OK
Definition: enums.h:45
void start() override
called by AsyncStart; do not call directly
void noteCommClosed(const CommCloseCbParams &params)
called by Comm if the client socket got closed
#define Must(condition)
Definition: TextException.h:75
static void HandleStoreAbort(StoreToCommWriter *param)
called by Store if the entry is no longer usable
void scheduleStoreCopy()
request more action results from the store
#define MYNAME
Definition: Stream.h:219
int storeUnregister(store_client *sc, StoreEntry *e, void *data)
void storeClientCopy(store_client *sc, StoreEntry *e, StoreIOBuffer copyInto, STCB *callback, void *data)
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:981
store_client * storeClientListAdd(StoreEntry *e, void *data)

 

Introduction

Documentation

Support

Miscellaneous