TcpLogger.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "comm.h"
11 #include "comm/Connection.h"
12 #include "comm/ConnOpener.h"
13 #include "comm/Loops.h"
14 #include "comm/Write.h"
15 #include "fatal.h"
16 #include "fde.h"
17 #include "globals.h" // for shutting_down
18 #include "log/CustomLog.h"
19 #include "log/File.h"
20 #include "log/TcpLogger.h"
21 #include "Parsing.h"
22 #include "sbuf/MemBlob.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25 
26 // a single I/O buffer should be large enough to store any access.log record
27 const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL;
28 
29 // We need at least two buffers because when we write the first buffer,
30 // we have to use the second buffer to accumulate new entries.
32 
33 #define MY_DEBUG_SECTION 50 /* Log file handling */
34 
36 
37 Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them):
38  AsyncJob("TcpLogger"),
39  dieOnError(dieOnErr),
40  bufferCapacity(bufCap),
41  bufferedSize(0),
42  flushDebt(0),
43  quitOnEmpty(false),
44  reconnectScheduled(false),
45  writeScheduled(false),
46  conn(NULL),
47  remote(them),
48  connectFailures(0),
49  drops(0)
50 {
53  "WARNING: tcp:" << remote << " logger configured buffer " <<
54  "size " << bufferCapacity << " is smaller than the " <<
55  BufferCapacityMin << "-byte" << " minimum. " <<
56  "Using the minimum instead.");
58  }
59 }
60 
62 {
63  // make sure Comm::Write does not have our buffer pointer
64  assert(!writeScheduled);
65 }
66 
67 void
69 {
70  doConnect();
71 }
72 
73 bool
75 {
76  debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty <<
77  " buffered: " << bufferedSize <<
78  " conn: " << conn << ' ' << connectFailures);
79 
80  // we do not quit unless we are told that we may
81  if (!quitOnEmpty)
82  return false;
83 
84  /* We were asked to quit after we are done writing buffers. Are we done? */
85 
86  // If we have records but are failing to connect, quit. Otherwise, we may
87  // be trying to connect forever due to a [since fixed] misconfiguration!
88  const bool failingToConnect = !conn && connectFailures;
89  if (bufferedSize && !failingToConnect)
90  return false;
91 
92  return AsyncJob::doneAll();
93 }
94 
95 void
97 {
98  disconnect(); // optional: refcounting should close/delete conn eventually
100 }
101 
102 void
104 {
105  // job call protection must end our job if we are done logging current bufs
106  assert(inCall != NULL);
107  quitOnEmpty = true;
108  flush();
109 }
110 
111 void
113 {
114  flushDebt = bufferedSize;
115  writeIfNeeded();
116 }
117 
118 void
119 Log::TcpLogger::logRecord(const char *buf, const size_t len)
120 {
121  appendRecord(buf, len);
122  writeIfNeeded();
123 }
124 
126 void
128 {
129  // write if an earlier flush command forces us to write or
130  // if we have filled at least one I/O buffer
131  if (flushDebt > 0 || buffers.size() > 1)
132  writeIfPossible();
133 }
134 
137 {
138  debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) <<
139  (bufferedSize > 0) << (conn != NULL) <<
140  (conn != NULL && !fd_table[conn->fd].closing()) << " buffered: " <<
141  bufferedSize << '/' << buffers.size());
142 
143  // XXX: Squid shutdown sequence starts closing our connection before
144  // calling LogfileClose, leading to loss of log records during shutdown.
145  if (!writeScheduled && bufferedSize > 0 && conn != NULL &&
146  !fd_table[conn->fd].closing()) {
147  debugs(MY_DEBUG_SECTION, 5, "writing first buffer");
148 
149  typedef CommCbMemFunT<TcpLogger, CommIoCbParams> WriteDialer;
151  const MemBlob::Pointer &buffer = buffers.front();
152  Comm::Write(conn, buffer->mem, buffer->size, callback, NULL);
153  writeScheduled = true;
154  }
155 }
156 
158 bool
159 Log::TcpLogger::canFit(const size_t len) const
160 {
161  // TODO: limit reporting frequency in addition to reporting only changes
162 
163  if (bufferedSize+len <= bufferCapacity) {
164  if (drops) {
165  // We can get here if a shorter record accidentally fits after we
166  // started dropping records. When that happens, the following
167  // DBG_IMPORTANT message will mislead admin into thinking that
168  // the problem was resolved (for a brief period of time, until
169  // another record comes in and overflows the buffer). It is
170  // difficult to prevent this without also creating the opposite
171  // problem: A huge record that does not fit and is dropped blocks
172  // subsequent regular records from being buffered until we write.
173  debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
174  " logger stops dropping records after " << drops << " drops" <<
175  "; current buffer use: " << (bufferedSize+len) <<
176  " out of " << bufferCapacity << " bytes");
177  }
178  return true;
179  }
180 
181  if (!drops || dieOnError) {
183  dieOnError ? DBG_CRITICAL : DBG_IMPORTANT,
184  "tcp:" << remote << " logger " << bufferCapacity << "-byte " <<
185  "buffer overflowed; cannot fit " <<
186  (bufferedSize+len-bufferCapacity) << " bytes");
187  }
188 
189  if (dieOnError)
190  fatal("tcp logger buffer overflowed");
191 
192  if (!drops) {
193  debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
194  " logger starts dropping records.");
195  }
196 
197  return false;
198 }
199 
201 void
202 Log::TcpLogger::appendRecord(const char *record, const size_t len)
203 {
204  // they should not happen, but to be safe, let's protect drop start/stop
205  // monitoring algorithm from empty records (which can never be dropped)
206  if (!len)
207  return;
208 
209  if (!canFit(len)) {
210  ++drops;
211  return;
212  }
213 
214  drops = 0;
215  // append without spliting buf, unless it exceeds IoBufSize
216  for (size_t off = 0; off < len; off += IoBufSize)
217  appendChunk(record + off, min(len - off, IoBufSize));
218 }
219 
221 void
222 Log::TcpLogger::appendChunk(const char *chunk, const size_t len)
223 {
224  Must(len <= IoBufSize);
225  // add a buffer if there is not one that can accomodate len bytes
226  bool addBuffer = buffers.empty() ||
227  (buffers.back()->size+len > IoBufSize);
228  // also add a buffer if there is only one and that one is being written
229  addBuffer = addBuffer || (writeScheduled && buffers.size() == 1);
230 
231  if (addBuffer) {
232  buffers.push_back(new MemBlob(IoBufSize));
233  debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size());
234  }
235 
236  Must(!buffers.empty());
237  buffers.back()->append(chunk, len);
238  bufferedSize += len;
239 }
240 
242 void
244 {
245  if (shutting_down)
246  return;
247 
248  debugs(MY_DEBUG_SECTION, 3, "connecting");
249  Must(!conn);
250 
251  Comm::ConnectionPointer futureConn = new Comm::Connection;
252  futureConn->remote = remote;
253  futureConn->local.setAnyAddr();
254  if (futureConn->remote.isIPv4())
255  futureConn->local.setIPv4();
256 
259  AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
260 }
261 
263 void
265 {
266  if (params.flag != Comm::OK) {
267  const double delay = 0.5; // seconds
268  if (connectFailures++ % 100 == 0) {
269  debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
270  " logger connection attempt #" << connectFailures <<
271  " failed. Will keep trying every " << delay << " seconds.");
272  }
273 
274  if (!reconnectScheduled) {
275  reconnectScheduled = true;
276  eventAdd("Log::TcpLogger::DelayedReconnect",
278  new Pointer(this), 0.5, 0, false);
279  }
280  } else {
281  if (connectFailures > 0) {
282  debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
283  " logger connectivity restored after " <<
284  (connectFailures+1) << " attempts.");
285  connectFailures = 0;
286  }
287 
288  Must(!conn);
289  conn = params.conn;
290 
291  Must(!closer);
293  closer = JobCallback(MY_DEBUG_SECTION, 4, Closer, this, Log::TcpLogger::handleClosure);
294  comm_add_close_handler(conn->fd, closer);
295 
296  writeIfNeeded();
297  }
298 }
299 
300 // XXX: Needed until eventAdd() starts accepting Async calls directly.
302 void
304 {
305  Pointer *ptr = static_cast<Pointer*>(data);
306  assert(ptr);
307  if (TcpLogger *logger = ptr->valid()) {
308  // Get back inside AsyncJob protections by scheduling another call.
309  typedef NullaryMemFunT<TcpLogger> Dialer;
311  logger,
313  ScheduleCallHere(call);
314  }
315  delete ptr;
316 }
317 
319 void
321 {
322  Must(reconnectScheduled);
323  Must(!conn);
324  reconnectScheduled = false;
325  doConnect();
326 }
327 
329 void
331 {
332  writeScheduled = false;
333  if (io.flag == Comm::ERR_CLOSING) {
334  debugs(MY_DEBUG_SECTION, 7, "closing");
335  // do nothing here -- our comm_close_handler will be called to clean up
336  } else if (io.flag != Comm::OK) {
337  debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno));
338  // keep the first buffer (the one we failed to write)
339  disconnect();
340  doConnect();
341  } else {
342  debugs(MY_DEBUG_SECTION, 5, "write successful");
343 
344  Must(!buffers.empty()); // we had a buffer to write
345  const MemBlob::Pointer &written = buffers.front();
346  const size_t writtenSize = static_cast<size_t>(written->size);
347  // and we wrote the whole buffer
348  Must(io.size == writtenSize);
349  Must(bufferedSize >= writtenSize);
350  bufferedSize -= writtenSize;
351 
352  buffers.pop_front();
353 
354  if (flushDebt > io.size)
355  flushDebt -= io.size;
356  else
357  flushDebt = 0; // wrote everything we owed (or more)
358 
359  writeIfNeeded();
360  }
361 }
362 
365 void
367 {
368  assert(inCall != NULL);
369  closer = NULL;
370  conn = NULL;
371  // in all current use cases, we should not try to reconnect
372  mustStop("Log::TcpLogger::handleClosure");
373 }
374 
376 void
378 {
379  if (conn != NULL) {
380  if (closer != NULL) {
381  comm_remove_close_handler(conn->fd, closer);
382  closer = NULL;
383  }
384  conn->close();
385  conn = NULL;
386  }
387 }
388 
393 {
394  if (Pointer *pptr = static_cast<Pointer*>(lf->data))
395  return pptr->get(); // may be nil
396  return NULL;
397 }
398 
399 void
401 {
402  if (TcpLogger *logger = StillLogging(lf))
403  logger->flush();
404 }
405 
406 void
407 Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len)
408 {
409  if (TcpLogger *logger = StillLogging(lf))
410  logger->logRecord(buf, len);
411 }
412 
413 void
415 {
416 }
417 
418 void
420 {
422  Flush(lf);
423 }
424 
425 void
427 {
428 }
429 
430 void
432 {
433  if (TcpLogger *logger = StillLogging(lf)) {
434  debugs(50, 3, "Closing " << logger);
435  typedef NullaryMemFunT<TcpLogger> Dialer;
436  Dialer dialer(logger, &Log::TcpLogger::endGracefully);
437  AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer);
438  ScheduleCallHere(call);
439  }
440  delete static_cast<Pointer*>(lf->data);
441  lf->data = NULL;
442 }
443 
444 /*
445  * This code expects the path to be //host:port
446  */
447 int
448 Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag)
449 {
450  assert(!StillLogging(lf));
451  debugs(5, 3, "Tcp Open called");
452 
454 
455  if (strncmp(path, "//", 2) == 0)
456  path += 2;
457  char *strAddr = xstrdup(path);
458  if (!GetHostWithPort(strAddr, &addr)) {
459  if (lf->flags.fatal) {
460  fatalf("Invalid TCP logging address '%s'\n", lf->path);
461  } else {
462  debugs(50, DBG_IMPORTANT, "Invalid TCP logging address '" << lf->path << "'");
463  safe_free(strAddr);
464  return FALSE;
465  }
466  }
467  safe_free(strAddr);
468 
469  TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr);
470  lf->data = new Pointer(logger);
471  lf->f_close = &Close;
472  lf->f_linewrite = &WriteLine;
473  lf->f_linestart = &StartLine;
474  lf->f_lineend = &EndLine;
475  lf->f_flush = &Flush;
476  lf->f_rotate = &Rotate;
477  AsyncJob::Start(logger);
478 
479  return 1;
480 }
481 
void connectDone(const CommConnectCbParams &conn)
Comm::ConnOpener callback.
Definition: TcpLogger.cc:264
bool setIPv4()
Definition: Address.cc:217
LOGFLUSH * f_flush
Definition: File.h:60
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:96
static const size_t IoBufSize
fixed I/O buffer size
Definition: TcpLogger.h:85
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
Definition: AsyncJobCalls.h:68
#define fd_table
Definition: fde.h:157
#define assert(EX)
Definition: assert.h:17
void const char HLPCB * callback
Definition: stub_helper.cc:16
LOGWRITE * f_linewrite
Definition: File.h:58
static void Rotate(Logfile *lf, const int16_t)
Definition: TcpLogger.cc:426
int buffered_logs
Definition: SquidConfig.h:280
static TcpLogger * StillLogging(Logfile *lf)
Definition: TcpLogger.cc:392
Definition: File.h:38
unsigned int fatal
Definition: File.h:49
#define xstrdup
#define safe_free(x)
Definition: xalloc.h:73
Definition: Flag.h:16
virtual bool doneAll() const
whether positive goal has been reached
Definition: TcpLogger.cc:74
#define DBG_CRITICAL
Definition: Debug.h:44
size_t bufferCapacity
bufferedSize limit
Definition: TcpLogger.h:94
int conn
the current server connection FD
Definition: Transport.cc:26
hbase_f Log
Definition: StatHist.cc:21
LOGLINEEND * f_lineend
Definition: File.h:59
AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
void fatalf(const char *fmt,...)
Definition: fatal.cc:79
void writeDone(const CommIoCbParams &io)
Comm::Write callback.
Definition: TcpLogger.cc:330
TcpLogger(size_t, bool, Ip::Address)
Definition: TcpLogger.cc:37
static Pointer Start(AsyncJob *job)
starts a freshly created job (i.e., makes the job asynchronous)
Definition: AsyncJob.cc:23
void * data
Definition: File.h:55
static CLCB Close
Definition: Ident.cc:65
int shutting_down
Definition: testAddress.cc:36
char path[MAXPATHLEN]
Definition: File.h:46
Comm::ConnectionPointer conn
Definition: CommCalls.h:85
const char * xstrerr(int error)
Definition: xstrerror.cc:83
void const char HLPCB void * data
Definition: stub_helper.cc:16
static void StartLine(Logfile *lf)
Definition: TcpLogger.cc:414
static int Open(Logfile *lf, const char *path, size_t bufSz, int fatalFlag)
Definition: TcpLogger.cc:448
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Debug.h:123
void writeIfPossible()
starts writing if possible
Definition: TcpLogger.cc:136
#define DBG_IMPORTANT
Definition: Debug.h:45
virtual void swanSong()
Definition: AsyncJob.h:55
void writeIfNeeded()
starts writing if and only if it is time to write accumulated records
Definition: TcpLogger.cc:127
int xerrno
The last errno to occur. non-zero if flag is Comm::COMM_ERROR.
Definition: CommCalls.h:88
virtual void swanSong()
Definition: TcpLogger.cc:96
Cbc * valid() const
was set and is valid
Definition: CbcPointer.h:41
void doConnect()
starts [re]connecting to the remote logger
Definition: TcpLogger.cc:243
void delayedReconnect()
"sleep a little before trying to connect again" event callback
Definition: TcpLogger.cc:320
static void WriteLine(Logfile *lf, const char *buf, size_t len)
Definition: TcpLogger.cc:407
void * addr
Definition: membanger.c:46
void fatal(const char *message)
Definition: fatal.cc:39
void logRecord(const char *buf, size_t len)
buffers record and possibly writes it to the remote logger
Definition: TcpLogger.cc:119
static const size_t BufferCapacityMin
minimum bufferCapacity value
Definition: TcpLogger.h:86
void appendRecord(const char *buf, size_t len)
buffer a record that might exceed IoBufSize
Definition: TcpLogger.cc:202
char * mem
raw allocated memory block
Definition: MemBlob.h:102
bool isIPv4() const
Definition: Address.cc:151
int unsigned int const char *desc STUB void int len
Definition: stub_fd.cc:20
void const char * buf
Definition: stub_helper.cc:16
struct SquidConfig::@112 onoff
#define Must(cond)
Definition: TextException.h:89
Ip::Address local
Definition: Connection.h:135
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:109
#define FALSE
Definition: std-includes.h:56
Ip::Address remote
Definition: Connection.h:138
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
virtual void start()
called by AsyncStart; do not call directly
Definition: TcpLogger.cc:68
LOGLINESTART * f_linestart
Definition: File.h:57
void setAnyAddr()
NOTE: Does NOT clear the Port stored. Ony the Address and Type.
Definition: Address.cc:170
Comm::Flag flag
comm layer result status.
Definition: CommCalls.h:87
struct Logfile::@85 flags
void appendChunk(const char *chunk, const size_t len)
buffer a record chunk without splitting it across buffers
Definition: TcpLogger.cc:222
bool GetHostWithPort(char *token, Ip::Address *ipa)
Definition: Parsing.cc:251
void disconnect()
close our connection now, without flushing
Definition: TcpLogger.cc:377
void handleClosure(const CommCloseCbParams &io)
Definition: TcpLogger.cc:366
LOGROTATE * f_rotate
Definition: File.h:61
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:961
#define MAX_URL
Definition: defines.h:118
static void DelayedReconnect(void *data)
Log::TcpLogger::delayedReconnect() wrapper.
Definition: TcpLogger.cc:303
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:988
bool canFit(const size_t len) const
whether len more bytes can be buffered
Definition: TcpLogger.cc:159
CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger)
size_type size
maximum allocated memory in use by callers
Definition: MemBlob.h:104
static void Close(Logfile *lf)
Definition: TcpLogger.cc:431
class SquidConfig Config
Definition: SquidConfig.cc:12
#define NULL
Definition: types.h:166
virtual ~TcpLogger()
Definition: TcpLogger.cc:61
#define MY_DEBUG_SECTION
Definition: TcpLogger.cc:33
A const & min(A const &lhs, A const &rhs)
static void Flush(Logfile *lf)
Definition: TcpLogger.cc:400
Ip::Address remote
where the remote logger expects our records
Definition: TcpLogger.h:103
#define false
Definition: GnuRegex.c:233
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:35
void endGracefully()
Definition: TcpLogger.cc:103
static void EndLine(Logfile *lf)
Definition: TcpLogger.cc:419
LOGCLOSE * f_close
Definition: File.h:62
void flush()
write all currently buffered records ASAP
Definition: TcpLogger.cc:112

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors