TcpLogger.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9#include "squid.h"
10#include "comm.h"
11#include "comm/Connection.h"
12#include "comm/ConnOpener.h"
13#include "comm/Loops.h"
14#include "comm/Write.h"
15#include "fatal.h"
16#include "fde.h"
17#include "globals.h" // for shutting_down
18#include "log/CustomLog.h"
19#include "log/File.h"
20#include "log/TcpLogger.h"
21#include "Parsing.h"
22#include "sbuf/MemBlob.h"
23#include "SquidConfig.h"
24
25// a single I/O buffer should be large enough to store any access.log record
26const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL;
27
28// We need at least two buffers because when we write the first buffer,
29// we have to use the second buffer to accumulate new entries.
31
32#define MY_DEBUG_SECTION 50 /* Log file handling */
33
35
36Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them):
37 AsyncJob("TcpLogger"),
38 dieOnError(dieOnErr),
39 bufferCapacity(bufCap),
40 bufferedSize(0),
41 flushDebt(0),
42 quitOnEmpty(false),
43 reconnectScheduled(false),
44 writeScheduled(false),
45 conn(nullptr),
46 remote(them),
47 connectFailures(0),
48 drops(0)
49{
52 "WARNING: tcp:" << remote << " logger configured buffer " <<
53 "size " << bufferCapacity << " is smaller than the " <<
54 BufferCapacityMin << "-byte" << " minimum. " <<
55 "Using the minimum instead.");
57 }
58}
59
61{
62 // make sure Comm::Write does not have our buffer pointer
63 assert(!writeScheduled);
64}
65
66void
68{
69 doConnect();
70}
71
72bool
74{
75 debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty <<
76 " buffered: " << bufferedSize <<
77 " conn: " << conn << ' ' << connectFailures);
78
79 // we do not quit unless we are told that we may
80 if (!quitOnEmpty)
81 return false;
82
83 /* We were asked to quit after we are done writing buffers. Are we done? */
84
85 // If we have records but are failing to connect, quit. Otherwise, we may
86 // be trying to connect forever due to a [since fixed] misconfiguration!
87 const bool failingToConnect = !conn && connectFailures;
88 if (bufferedSize && !failingToConnect)
89 return false;
90
91 return AsyncJob::doneAll();
92}
93
94void
96{
97 disconnect(); // optional: refcounting should close/delete conn eventually
99}
100
101void
103{
104 // job call protection must end our job if we are done logging current bufs
105 assert(inCall != nullptr);
106 quitOnEmpty = true;
107 flush();
108}
109
110void
112{
113 flushDebt = bufferedSize;
114 writeIfNeeded();
115}
116
117void
118Log::TcpLogger::logRecord(const char *buf, const size_t len)
119{
120 appendRecord(buf, len);
121 writeIfNeeded();
122}
123
125void
127{
128 // write if an earlier flush command forces us to write or
129 // if we have filled at least one I/O buffer
130 if (flushDebt > 0 || buffers.size() > 1)
131 writeIfPossible();
132}
133
136{
137 debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) <<
138 (bufferedSize > 0) << (conn != nullptr) <<
139 (conn != nullptr && !fd_table[conn->fd].closing()) << " buffered: " <<
140 bufferedSize << '/' << buffers.size());
141
142 // XXX: Squid shutdown sequence starts closing our connection before
143 // calling LogfileClose, leading to loss of log records during shutdown.
144 if (!writeScheduled && bufferedSize > 0 && conn != nullptr &&
145 !fd_table[conn->fd].closing()) {
146 debugs(MY_DEBUG_SECTION, 5, "writing first buffer");
147
150 const MemBlob::Pointer &buffer = buffers.front();
151 Comm::Write(conn, buffer->mem, buffer->size, callback, nullptr);
152 writeScheduled = true;
153 }
154}
155
157bool
158Log::TcpLogger::canFit(const size_t len) const
159{
160 // TODO: limit reporting frequency in addition to reporting only changes
161
162 if (bufferedSize+len <= bufferCapacity) {
163 if (drops) {
164 // We can get here if a shorter record accidentally fits after we
165 // started dropping records. When that happens, the following
166 // DBG_IMPORTANT message will mislead admin into thinking that
167 // the problem was resolved (for a brief period of time, until
168 // another record comes in and overflows the buffer). It is
169 // difficult to prevent this without also creating the opposite
170 // problem: A huge record that does not fit and is dropped blocks
171 // subsequent regular records from being buffered until we write.
172 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
173 " logger stops dropping records after " << drops << " drops" <<
174 "; current buffer use: " << (bufferedSize+len) <<
175 " out of " << bufferCapacity << " bytes");
176 }
177 return true;
178 }
179
180 if (!drops || dieOnError) {
182 "ERROR: tcp:" << remote << " logger " << bufferCapacity << "-byte " <<
183 "buffer overflowed; cannot fit " <<
184 (bufferedSize+len-bufferCapacity) << " bytes");
185 }
186
187 if (dieOnError)
188 fatal("tcp logger buffer overflowed");
189
190 if (!drops) {
191 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
192 " logger starts dropping records.");
193 }
194
195 return false;
196}
197
199void
200Log::TcpLogger::appendRecord(const char *record, const size_t len)
201{
202 // they should not happen, but to be safe, let's protect drop start/stop
203 // monitoring algorithm from empty records (which can never be dropped)
204 if (!len)
205 return;
206
207 if (!canFit(len)) {
208 ++drops;
209 return;
210 }
211
212 drops = 0;
213 // append without splitting buf, unless it exceeds IoBufSize
214 for (size_t off = 0; off < len; off += IoBufSize)
215 appendChunk(record + off, min(len - off, IoBufSize));
216}
217
219void
220Log::TcpLogger::appendChunk(const char *chunk, const size_t len)
221{
222 Must(len <= IoBufSize);
223 // add a buffer if there is not one that can accommodate len bytes
224 bool addBuffer = buffers.empty() ||
225 (buffers.back()->size+len > IoBufSize);
226 // also add a buffer if there is only one and that one is being written
227 addBuffer = addBuffer || (writeScheduled && buffers.size() == 1);
228
229 if (addBuffer) {
230 buffers.push_back(new MemBlob(IoBufSize));
231 debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size());
232 }
233
234 Must(!buffers.empty());
235 buffers.back()->append(chunk, len);
236 bufferedSize += len;
237}
238
240void
242{
243 if (shutting_down)
244 return;
245
246 debugs(MY_DEBUG_SECTION, 3, "connecting");
247 Must(!conn);
248
250 futureConn->remote = remote;
251 futureConn->local.setAnyAddr();
252 if (futureConn->remote.isIPv4())
253 futureConn->local.setIPv4();
254
257 const auto cs = new Comm::ConnOpener(futureConn, call, 2);
258 connWait.start(cs, call);
259}
260
262void
264{
265 connWait.finish();
266
267 if (params.flag != Comm::OK) {
268 const double delay = 0.5; // seconds
269 if (connectFailures++ % 100 == 0) {
270 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "ERROR: tcp:" << remote <<
271 " logger connection attempt #" << connectFailures <<
272 " failed. Will keep trying every " << delay << " seconds.");
273 }
274
275 if (!reconnectScheduled) {
276 reconnectScheduled = true;
277 eventAdd("Log::TcpLogger::DelayedReconnect",
279 new Pointer(this), 0.5, 0, false);
280 }
281 } else {
282 if (connectFailures > 0) {
283 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
284 " logger connectivity restored after " <<
285 (connectFailures+1) << " attempts.");
286 connectFailures = 0;
287 }
288
289 Must(!conn);
290 conn = params.conn;
291
292 Must(!closer);
295 comm_add_close_handler(conn->fd, closer);
296
297 writeIfNeeded();
298 }
299}
300
301// XXX: Needed until eventAdd() starts accepting Async calls directly.
303void
305{
306 Pointer *ptr = static_cast<Pointer*>(data);
307 assert(ptr);
308 if (TcpLogger *logger = ptr->valid()) {
309 // Get back inside AsyncJob protections by scheduling another call.
310 typedef NullaryMemFunT<TcpLogger> Dialer;
312 logger,
314 ScheduleCallHere(call);
315 }
316 delete ptr;
317}
318
320void
322{
323 Must(reconnectScheduled);
324 Must(!conn);
325 reconnectScheduled = false;
326 doConnect();
327}
328
330void
332{
333 writeScheduled = false;
334 if (io.flag == Comm::ERR_CLOSING) {
335 debugs(MY_DEBUG_SECTION, 7, "closing");
336 // do nothing here -- our comm_close_handler will be called to clean up
337 } else if (io.flag != Comm::OK) {
338 debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno));
339 // keep the first buffer (the one we failed to write)
340 disconnect();
341 doConnect();
342 } else {
343 debugs(MY_DEBUG_SECTION, 5, "write successful");
344
345 Must(!buffers.empty()); // we had a buffer to write
346 const MemBlob::Pointer &written = buffers.front();
347 const size_t writtenSize = static_cast<size_t>(written->size);
348 // and we wrote the whole buffer
349 Must(io.size == writtenSize);
350 Must(bufferedSize >= writtenSize);
351 bufferedSize -= writtenSize;
352
353 buffers.pop_front();
354
355 if (flushDebt > io.size)
356 flushDebt -= io.size;
357 else
358 flushDebt = 0; // wrote everything we owed (or more)
359
360 writeIfNeeded();
361 }
362}
363
366void
368{
369 assert(inCall != nullptr);
370 closer = nullptr;
371 if (conn) {
372 conn->noteClosure();
373 conn = nullptr;
374 }
375 // in all current use cases, we should not try to reconnect
376 mustStop("Log::TcpLogger::handleClosure");
377}
378
380void
382{
383 if (conn != nullptr) {
384 if (closer != nullptr) {
385 comm_remove_close_handler(conn->fd, closer);
386 closer = nullptr;
387 }
388 conn->close();
389 conn = nullptr;
390 }
391}
392
397{
398 if (Pointer *pptr = static_cast<Pointer*>(lf->data))
399 return pptr->get(); // may be nil
400 return nullptr;
401}
402
403void
405{
406 if (TcpLogger *logger = StillLogging(lf))
407 logger->flush();
408}
409
410void
411Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len)
412{
413 if (TcpLogger *logger = StillLogging(lf))
414 logger->logRecord(buf, len);
415}
416
417void
419{
420}
421
422void
424{
426 Flush(lf);
427}
428
429void
431{
432}
433
434void
436{
437 if (TcpLogger *logger = StillLogging(lf)) {
438 debugs(50, 3, "Closing " << logger);
439 typedef NullaryMemFunT<TcpLogger> Dialer;
440 Dialer dialer(logger, &Log::TcpLogger::endGracefully);
441 AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer);
442 ScheduleCallHere(call);
443 }
444 delete static_cast<Pointer*>(lf->data);
445 lf->data = nullptr;
446}
447
448/*
449 * This code expects the path to be //host:port
450 */
451int
452Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag)
453{
454 assert(!StillLogging(lf));
455 debugs(5, 3, "Tcp Open called");
456
457 Ip::Address addr;
458
459 if (strncmp(path, "//", 2) == 0)
460 path += 2;
461 char *strAddr = xstrdup(path);
462 if (!GetHostWithPort(strAddr, &addr)) {
463 if (lf->flags.fatal) {
464 fatalf("Invalid TCP logging address '%s'\n", lf->path);
465 } else {
466 debugs(50, DBG_IMPORTANT, "ERROR: Invalid TCP logging address '" << lf->path << "'");
467 safe_free(strAddr);
468 return FALSE;
469 }
470 }
471 safe_free(strAddr);
472
473 TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr);
474 lf->data = new Pointer(logger);
475 lf->f_close = &Close;
476 lf->f_linewrite = &WriteLine;
477 lf->f_linestart = &StartLine;
478 lf->f_lineend = &EndLine;
479 lf->f_flush = &Flush;
480 lf->f_rotate = &Rotate;
481 AsyncJob::Start(logger);
482
483 return 1;
484}
485
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
Definition: AsyncJobCalls.h:69
bool GetHostWithPort(char *token, Ip::Address *ipa)
Definition: Parsing.cc:257
class SquidConfig Config
Definition: SquidConfig.cc:12
CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger)
#define MY_DEBUG_SECTION
Definition: TcpLogger.cc:32
#define Must(condition)
Definition: TextException.h:75
int conn
the current server connection FD
Definition: Transport.cc:26
#define assert(EX)
Definition: assert.h:17
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:112
virtual void swanSong()
Definition: AsyncJob.h:61
int xerrno
The last errno to occur. non-zero if flag is Comm::COMM_ERROR.
Definition: CommCalls.h:83
Comm::Flag flag
comm layer result status.
Definition: CommCalls.h:82
Comm::ConnectionPointer conn
Definition: CommCalls.h:80
Ip::Address remote
Definition: Connection.h:149
Ip::Address local
Definition: Connection.h:146
bool setIPv4()
Definition: Address.cc:224
bool isIPv4() const
Definition: Address.cc:158
void setAnyAddr()
NOTE: Does NOT clear the Port stored. Only the Address and Type.
Definition: Address.cc:177
void swanSong() override
Definition: TcpLogger.cc:95
void writeDone(const CommIoCbParams &io)
Comm::Write callback.
Definition: TcpLogger.cc:331
void writeIfNeeded()
starts writing if and only if it is time to write accumulated records
Definition: TcpLogger.cc:126
void disconnect()
close our connection now, without flushing
Definition: TcpLogger.cc:381
void logRecord(const char *buf, size_t len)
buffers record and possibly writes it to the remote logger
Definition: TcpLogger.cc:118
void writeIfPossible()
starts writing if possible
Definition: TcpLogger.cc:135
static void EndLine(Logfile *lf)
Definition: TcpLogger.cc:423
bool canFit(const size_t len) const
whether len more bytes can be buffered
Definition: TcpLogger.cc:158
void appendChunk(const char *chunk, const size_t len)
buffer a record chunk without splitting it across buffers
Definition: TcpLogger.cc:220
static void Flush(Logfile *lf)
Definition: TcpLogger.cc:404
void delayedReconnect()
"sleep a little before trying to connect again" event callback
Definition: TcpLogger.cc:321
static void DelayedReconnect(void *data)
Log::TcpLogger::delayedReconnect() wrapper.
Definition: TcpLogger.cc:304
TcpLogger(size_t, bool, Ip::Address)
Definition: TcpLogger.cc:36
~TcpLogger() override
Definition: TcpLogger.cc:60
static TcpLogger * StillLogging(Logfile *lf)
Definition: TcpLogger.cc:396
void handleClosure(const CommCloseCbParams &io)
Definition: TcpLogger.cc:367
Ip::Address remote
where the remote logger expects our records
Definition: TcpLogger.h:109
static void WriteLine(Logfile *lf, const char *buf, size_t len)
Definition: TcpLogger.cc:411
static void Close(Logfile *lf)
Definition: TcpLogger.cc:435
void endGracefully()
Definition: TcpLogger.cc:102
void doConnect()
starts [re]connecting to the remote logger
Definition: TcpLogger.cc:241
static const size_t IoBufSize
fixed I/O buffer size
Definition: TcpLogger.h:91
static const size_t BufferCapacityMin
minimum bufferCapacity value
Definition: TcpLogger.h:92
void connectDone(const CommConnectCbParams &conn)
Comm::ConnOpener callback.
Definition: TcpLogger.cc:263
bool doneAll() const override
whether positive goal has been reached
Definition: TcpLogger.cc:73
void appendRecord(const char *buf, size_t len)
buffer a record that might exceed IoBufSize
Definition: TcpLogger.cc:200
static int Open(Logfile *lf, const char *path, size_t bufSz, int fatalFlag)
Definition: TcpLogger.cc:452
void flush()
write all currently buffered records ASAP
Definition: TcpLogger.cc:111
size_t bufferCapacity
bufferedSize limit
Definition: TcpLogger.h:100
static void Rotate(Logfile *lf, const int16_t)
Definition: TcpLogger.cc:430
static void StartLine(Logfile *lf)
Definition: TcpLogger.cc:418
void start() override
called by AsyncStart; do not call directly
Definition: TcpLogger.cc:67
Definition: File.h:39
LOGLINEEND * f_lineend
Definition: File.h:59
LOGFLUSH * f_flush
Definition: File.h:60
void * data
Definition: File.h:55
LOGCLOSE * f_close
Definition: File.h:62
char path[MAXPATHLEN]
Definition: File.h:46
struct Logfile::@79 flags
LOGLINESTART * f_linestart
Definition: File.h:57
LOGWRITE * f_linewrite
Definition: File.h:58
unsigned int fatal
Definition: File.h:49
LOGROTATE * f_rotate
Definition: File.h:61
char * mem
raw allocated memory block
Definition: MemBlob.h:112
size_type size
maximum allocated memory in use by callers
Definition: MemBlob.h:114
struct SquidConfig::@106 onoff
int buffered_logs
Definition: SquidConfig.h:284
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:949
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:978
A const & min(A const &lhs, A const &rhs)
#define DBG_IMPORTANT
Definition: Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
#define DBG_CRITICAL
Definition: Stream.h:37
#define MAX_URL
Definition: defines.h:78
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:107
void fatal(const char *message)
Definition: fatal.cc:28
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
#define fd_table
Definition: fde.h:189
int shutting_down
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
@ OK
Definition: Flag.h:16
@ ERR_CLOSING
Definition: Flag.h:24
static CLCB Close
Definition: Ident.cc:73
Definition: Config.h:18
SSL Connection
Definition: Session.h:45
#define xstrdup
#define FALSE
Definition: std-includes.h:56
#define safe_free(x)
Definition: xalloc.h:73
const char * xstrerr(int error)
Definition: xstrerror.cc:83

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors