PATCH: Rework base beta22 comm_incoming stuff to work better

From: Stewart Forster <slf@dont-contact.us>
Date: Fri, 05 Jun 1998 14:49:13 +1000

Hiya,

        As promised, here's the final patch (tested with both poll and select)
against the base 1.2beta22 code that makes the comm_incoming stuff work better.
See the code for comments.

        It splits HTTP and ICP incoming handling apart and targets an average
of one event per check of each set of FDs. This means that roughly 2/3 of
the time we will have activity on one of these sockets. This has been
tested and it works. System calls have dropped by 33% compared with
similarly loaded caches running the base beta22 code, and by 40% compared
with beta21 code.

        This patch supersedes and cancels my previous patch.

        Stew.

--- structs.h 1998/06/05 01:46:58 1.1
+++ structs.h 1998/06/05 01:47:55
@@ -1252,7 +1252,8 @@
     int select_loops;
     double cputime;
     struct timeval timestamp;
- StatHist comm_incoming;
+ StatHist comm_icp_incoming; /* histogram: events found per ICP incoming check */
+ StatHist comm_http_incoming; /* histogram: events found per HTTP incoming check */
 };
 
 struct _tlv {
--- comm.c 1998/06/04 06:06:49 1.1
+++ comm.c 1998/06/05 04:40:43
@@ -113,7 +113,7 @@
 #endif
 
 #if USE_ASYNC_IO
-#define MAX_POLL_TIME 50
+#define MAX_POLL_TIME 10
 #else
 #define MAX_POLL_TIME 1000
 #endif
@@ -133,7 +133,6 @@
 } ConnectStateData;
 
 /* STATIC */
-static int incame = 0;
 static int commBind(int s, struct in_addr, u_short port);
 #if !HAVE_POLL
 static int examine_select(fd_set *, fd_set *);
@@ -141,7 +140,6 @@
 static void checkTimeouts(void);
 static void commSetReuseAddr(int);
 static void commSetNoLinger(int);
-static void comm_incoming(void);
 static void CommWriteStateCallbackAndFree(int fd, int code);
 #ifdef TCP_NODELAY
 static void commSetTcpNoDelay(int);
@@ -169,47 +167,54 @@
  * of incoming ICP, then we need to check these sockets more than
  * if we just have HTTP.
  *
- * The variable 'incoming_interval' determines how many normal I/O
- * events to process before checking incoming sockets again.
- * Note we store the incoming_interval multipled by a factor
- * of 16 (e.g. <<4) to have some pseudo-floating point precision.
+ * The variables 'incoming_icp_interval' and 'incoming_http_interval'
+ * determine how many normal I/O events to process before checking
+ * the ICP or HTTP incoming sockets again. Each interval is stored
+ * multiplied by a factor of (2^INCOMING_FACTOR) to give some
+ * pseudo-floating point precision.
  *
- * The variable 'io_events' counts how many normal I/O events have
- * been processed. When io_events > incoming_interval, its time
- * to check incoming sockets.
+ * The variables 'icp_io_events' and 'http_io_events' count how many
+ * normal I/O events have been processed since the last check of the
+ * corresponding incoming sockets. When a counter exceeds its interval
+ * (shifted down by INCOMING_FACTOR), it's time to check those sockets.
  *
  * Every time we check incoming sockets, we count how many new messages
  * or connections were processed. This is used to adjust the
  * incoming_interval for the next iteration. The new incoming_interval
- * is calculated as the average of the current incoming_interval and
- * 32 divided by the number of incoming events just processed. e.g.
+ * is the current incoming_interval plus the target average number of
+ * events per check (1) minus the number of events just processed,
+ * clamped to the range [0, MAX_INCOMING_INTERVAL]:
  *
- * 1 1 32
- * incoming_interval = - incoming_interval + - -----------------
- * 2 2 incoming_events
+ * incoming_interval = incoming_interval + 1 - number_of_events_processed
+ * (applied to the scaled value, i.e. in steps of 1/2^INCOMING_FACTOR).
+ * ICP and HTTP sockets each keep their own interval and event counter.
  *
- * You can see the current value of incoming_interval, as well as
+ * You can see the current values of both incoming intervals, as well as
  * a histogram of 'incoming_events' by asking the cache manager
  * for 'comm_incoming', e.g.:
  *
  * % ./client mgr:comm_incoming
  *
- * Bugs:
- *
- * - We have 32 as a magic upper limit on incoming_interval.
- * - INCOMING_TOTAL_MAX = INCOMING_ICP_MAX + INCOMING_HTTP_MAX,
- * but this assumes only one ICP socket and one HTTP socket.
- * If there are multiple incoming HTTP sockets, the we could
- * conceivably process more than INCOMING_TOTAL_MAX events
- * in comm_incoming().
+ * Caveats:
  *
- * The 'invert32[]' array is a pre-calculated array of division for 32/i
+ * - MAX_INCOMING_INTEGER is a magic upper limit on the (unscaled)
+ *   interval for both types of sockets. The interval only grows toward
+ *   it while checks keep finding no events, i.e. when the sockets are idle.
  *
+ * - The higher the INCOMING_FACTOR, the slower the algorithm will
+ *   respond to load spikes/increases/decreases in demand. A value
+ *   between 3 and 8 is recommended.
  */
-static int io_events = 0;
-static int incoming_interval = 16 << 4;
-static int invert32[INCOMING_TOTAL_MAX];
-#define commCheckIncoming (++io_events > (incoming_interval>>4))
+
+#define MAX_INCOMING_INTEGER 256
+#define INCOMING_FACTOR 5
+#define MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR)
+static int icp_io_events = 0;
+static int http_io_events = 0;
+static int incoming_icp_interval = 16 << INCOMING_FACTOR;
+static int incoming_http_interval = 16 << INCOMING_FACTOR;
+#define commCheckICPIncoming (++icp_io_events > (incoming_icp_interval >> INCOMING_FACTOR))
+#define commCheckHTTPIncoming (++http_io_events > (incoming_http_interval >> INCOMING_FACTOR))
 
 static void
 CommWriteStateCallbackAndFree(int fd, int code)
@@ -763,35 +768,21 @@
     return F->defer_check(fd, F->defer_data);
 }
 
-static void
-comm_incoming(void)
-{
- int j;
- incame = 0;
- io_events = 0;
- if (theInIcpConnection > 0) {
- icpHandleUdp(theInIcpConnection, &incame);
- if (theInIcpConnection != theOutIcpConnection)
- icpHandleUdp(theOutIcpConnection, &incame);
- }
- for (j = 0; j < NHttpSockets; j++) {
- if (HttpSockets[j] < 0)
- continue;
- httpAccept(HttpSockets[j], &incame);
- }
- statHistCount(&Counter.comm_incoming, incame);
- if (incame < INCOMING_TOTAL_MAX)
- incoming_interval = (incoming_interval >> 1) + (invert32[incame] << 3);
-}

 static int
-fdIsHttpOrIcp(int fd)
+fdIsIcp(int fd) /* nonzero if fd is the inbound or outbound ICP socket */
 {
- int j;
     if (fd == theInIcpConnection)
         return 1;
     if (fd == theOutIcpConnection)
         return 1;
+ return 0;
+}
+
+static int
+fdIsHttp(int fd) /* nonzero if fd matches an entry in HttpSockets[] */
+{
+ int j;
     for (j = 0; j < NHttpSockets; j++) {
         if (fd == HttpSockets[j])
             return 1;
@@ -800,6 +791,135 @@
 }
 
 #if HAVE_POLL
+
+/*
+ * Poll the listening FDs in fds[0..nfds-1] once, without blocking, and
+ * run the handlers of those that are ready. Read handlers are passed
+ * &incame rather than their registered read_data: the incoming handlers
+ * (icpHandleUdp/httpAccept in the code this replaces) use it to count
+ * the messages or connections they handle. Write handlers receive their
+ * own write_data, as in comm_poll(). Returns the number counted.
+ */
+static int
+comm_check_incoming_poll_handlers(int nfds, int *fds)
+{
+    int i;
+    int fd;
+    int incame = 0;
+    PF *hdl = NULL;
+    int npfds;
+    struct pollfd pfds[3 + MAXHTTPPORTS];
+
+    for (i = npfds = 0; i < nfds; i++) {
+        int events;
+        fd = fds[i];
+        events = 0;
+        if (fd_table[fd].read_handler)
+            events |= POLLRDNORM;
+        if (fd_table[fd].write_handler)
+            events |= POLLWRNORM;
+        if (events) {
+            pfds[npfds].fd = fd;
+            pfds[npfds].events = events;
+            pfds[npfds].revents = 0;
+            npfds++;
+        }
+    }
+    /* No FD has a handler registered: don't waste a poll() system call. */
+    if (!npfds)
+        return incame;
+#if !ALARM_UPDATES_TIME
+    getCurrentTime();
+#endif
+    if (poll(pfds, npfds, 0) < 1)
+        return incame;
+    for (i = 0; i < npfds; i++) {
+        int revents;
+        if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
+            continue;
+        if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
+            if ((hdl = fd_table[fd].read_handler)) {
+                fd_table[fd].read_handler = NULL;
+                hdl(fd, &incame);
+            } else
+                debug(5, 1) ("comm_poll_incoming: NULL read handler\n");
+        }
+        if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
+            if ((hdl = fd_table[fd].write_handler)) {
+                fd_table[fd].write_handler = NULL;
+                hdl(fd, fd_table[fd].write_data);
+            } else
+                debug(5, 1) ("comm_poll_incoming: NULL write handler\n");
+        }
+    }
+    return incame;
+}
+
+/*
+ * Check the ICP sockets (the inbound one, plus the outbound one when it
+ * is a different FD) and adapt incoming_icp_interval to what was found.
+ */
+static void
+comm_poll_icp_incoming(void)
+{
+    int nfds = 0;
+    int fds[2];
+    int nevents;
+
+    icp_io_events = 0;
+    if (theInIcpConnection >= 0)
+        fds[nfds++] = theInIcpConnection;
+    if (theInIcpConnection != theOutIcpConnection && theOutIcpConnection >= 0)
+        fds[nfds++] = theOutIcpConnection;
+    if (nfds == 0)
+        return;
+    nevents = comm_check_incoming_poll_handlers(nfds, fds);
+    /* Aim for one event per check; the interval is scaled by 2^INCOMING_FACTOR. */
+    incoming_icp_interval = incoming_icp_interval + 1 - nevents;
+    if (incoming_icp_interval < 0)
+        incoming_icp_interval = 0;
+    if (incoming_icp_interval > MAX_INCOMING_INTERVAL)
+        incoming_icp_interval = MAX_INCOMING_INTERVAL;
+    /* The histogram's range ends at INCOMING_ICP_MAX. */
+    if (nevents > INCOMING_ICP_MAX)
+        nevents = INCOMING_ICP_MAX;
+    statHistCount(&Counter.comm_icp_incoming, nevents);
+}
+
+/*
+ * Check the HTTP listening sockets (skipping any for which commDeferRead()
+ * returns nonzero) and adapt incoming_http_interval to what was found.
+ */
+static void
+comm_poll_http_incoming(void)
+{
+    int nfds = 0;
+    int fds[MAXHTTPPORTS];
+    int j;
+    int nevents;
+
+    http_io_events = 0;
+    for (j = 0; j < NHttpSockets; j++) {
+        if (HttpSockets[j] < 0)
+            continue;
+        if (commDeferRead(HttpSockets[j]))
+            continue;
+        fds[nfds++] = HttpSockets[j];
+    }
+    nevents = comm_check_incoming_poll_handlers(nfds, fds);
+    /* Aim for one event per check; the interval is scaled by 2^INCOMING_FACTOR. */
+    incoming_http_interval = incoming_http_interval + 1 - nevents;
+    if (incoming_http_interval < 0)
+        incoming_http_interval = 0;
+    if (incoming_http_interval > MAX_INCOMING_INTERVAL)
+        incoming_http_interval = MAX_INCOMING_INTERVAL;
+    /* The histogram's range ends at INCOMING_HTTP_MAX. */
+    if (nevents > INCOMING_HTTP_MAX)
+        nevents = INCOMING_HTTP_MAX;
+    statHistCount(&Counter.comm_http_incoming, nevents);
+}
+
+
 /* poll all sockets; call handlers for those that are ready. */
 int
 comm_poll(int msec)
@@ -811,8 +911,8 @@
     int maxfd;
     unsigned long nfds;
     int num;
+ int callicp = 0, callhttp = 0; /* set when a listening ICP/HTTP FD is ready in this pass */
     static time_t last_timeout = 0;
- static int lastinc = 0;
     double timeout = current_dtime + (msec / 1000.0);
     do {
 #if !ALARM_UPDATES_TIME
@@ -833,7 +933,11 @@
 #if USE_ASYNC_IO
         aioCheckCallbacks();
 #endif
- comm_incoming();
+ if (commCheckICPIncoming) /* the macro also increments icp_io_events */
+ comm_poll_icp_incoming();
+ if (commCheckHTTPIncoming) /* the macro also increments http_io_events */
+ comm_poll_http_incoming();
+ callicp = callhttp = 0;
         nfds = 0;
         maxfd = Biggest_FD + 1;
         for (i = 0; i < maxfd; i++) {
@@ -884,16 +988,24 @@
             int revents;
             if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
                 continue;
- if (fdIsHttpOrIcp(fd))
+ if (fdIsIcp(fd)) { /* serviced after the loop by comm_poll_icp_incoming() */
+ callicp = 1;
+ continue;
+ }
+ if (fdIsHttp(fd)) { /* serviced after the loop by comm_poll_http_incoming() */
+ callhttp = 1;
                 continue;
+ }
             if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
                 debug(5, 6) ("comm_poll: FD %d ready for reading\n", fd);
                 if ((hdl = fd_table[fd].read_handler)) {
                     fd_table[fd].read_handler = NULL;
                     hdl(fd, fd_table[fd].read_data);
                 }
- if (commCheckIncoming)
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_poll_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_poll_http_incoming();
             }
             if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
                 debug(5, 5) ("comm_poll: FD %d ready for writing\n", fd);
@@ -901,8 +1013,10 @@
                     fd_table[fd].write_handler = NULL;
                     hdl(fd, fd_table[fd].write_data);
                 }
- if (commCheckIncoming)
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_poll_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_poll_http_incoming();
             }
             if (revents & POLLNVAL) {
                 close_handler *ch;
@@ -929,8 +1043,11 @@
                 if (F->open != 0)
                     fd_close(fd);
             }
- lastinc = incame;
         }
+ if(callicp) /* listening sockets skipped in the loop above are checked now */
+ comm_poll_icp_incoming();
+ if(callhttp)
+ comm_poll_http_incoming();
         return COMM_OK;
     } while (timeout > current_dtime);
     debug(5, 8) ("comm_poll: time out: %d.\n", squid_curtime);
@@ -939,6 +1056,134 @@
 
 #else
 
+/*
+ * Check the listening FDs in fds[0..nfds-1] once with a zero-timeout
+ * select() and run the handlers of those that are ready. Read handlers
+ * are passed &incame rather than their registered read_data: the incoming
+ * handlers (icpHandleUdp/httpAccept in the code this replaces) use it to
+ * count the messages or connections they handle. Write handlers receive
+ * their own write_data, as in comm_select(). Returns the number counted.
+ */
+static int
+comm_check_incoming_select_handlers(int nfds, int *fds)
+{
+    int i;
+    int fd;
+    int incame = 0;
+    int maxfd = -1;
+    PF *hdl = NULL;
+    fd_set read_mask;
+    fd_set write_mask;
+
+    FD_ZERO(&read_mask);
+    FD_ZERO(&write_mask);
+    for (i = 0; i < nfds; i++) {
+        fd = fds[i];
+        if (fd_table[fd].read_handler) {
+            FD_SET(fd, &read_mask);
+            if (fd > maxfd)
+                maxfd = fd;
+        }
+        if (fd_table[fd].write_handler) {
+            FD_SET(fd, &write_mask);
+            if (fd > maxfd)
+                maxfd = fd;
+        }
+    }
+    /* maxfd stays -1 only if no FD has a handler (FD 0 is a valid FD). */
+    if (maxfd < 0)
+        return incame;
+#if !ALARM_UPDATES_TIME
+    getCurrentTime();
+#endif
+    if (select(maxfd + 1, &read_mask, &write_mask, NULL, &zero_tv) < 1)
+        return incame;
+    for (i = 0; i < nfds; i++) {
+        fd = fds[i];
+        if (FD_ISSET(fd, &read_mask)) {
+            if ((hdl = fd_table[fd].read_handler) != NULL) {
+                fd_table[fd].read_handler = NULL;
+                hdl(fd, &incame);
+            } else {
+                debug(5, 1) ("comm_select_incoming: NULL read handler\n");
+            }
+        }
+        if (FD_ISSET(fd, &write_mask)) {
+            if ((hdl = fd_table[fd].write_handler) != NULL) {
+                fd_table[fd].write_handler = NULL;
+                hdl(fd, fd_table[fd].write_data);
+            } else {
+                debug(5, 1) ("comm_select_incoming: NULL write handler\n");
+            }
+        }
+    }
+    return incame;
+}
+
+/*
+ * Check the ICP sockets (the inbound one, plus the outbound one when it
+ * is a different FD) and adapt incoming_icp_interval to what was found.
+ */
+static void
+comm_select_icp_incoming(void)
+{
+    int nfds = 0;
+    int fds[2];
+    int nevents;
+
+    icp_io_events = 0;
+    if (theInIcpConnection >= 0)
+        fds[nfds++] = theInIcpConnection;
+    if (theInIcpConnection != theOutIcpConnection && theOutIcpConnection >= 0)
+        fds[nfds++] = theOutIcpConnection;
+    if (nfds == 0)
+        return;
+    nevents = comm_check_incoming_select_handlers(nfds, fds);
+    /* Aim for one event per check; the interval is scaled by 2^INCOMING_FACTOR. */
+    incoming_icp_interval = incoming_icp_interval + 1 - nevents;
+    if (incoming_icp_interval < 0)
+        incoming_icp_interval = 0;
+    if (incoming_icp_interval > MAX_INCOMING_INTERVAL)
+        incoming_icp_interval = MAX_INCOMING_INTERVAL;
+    /* The histogram's range ends at INCOMING_ICP_MAX. */
+    if (nevents > INCOMING_ICP_MAX)
+        nevents = INCOMING_ICP_MAX;
+    statHistCount(&Counter.comm_icp_incoming, nevents);
+}
+
+/*
+ * Check the HTTP listening sockets (skipping any for which commDeferRead()
+ * returns nonzero) and adapt incoming_http_interval to what was found.
+ */
+static void
+comm_select_http_incoming(void)
+{
+    int nfds = 0;
+    int fds[MAXHTTPPORTS];
+    int j;
+    int nevents;
+
+    http_io_events = 0;
+    for (j = 0; j < NHttpSockets; j++) {
+        if (HttpSockets[j] < 0)
+            continue;
+        if (commDeferRead(HttpSockets[j]))
+            continue;
+        fds[nfds++] = HttpSockets[j];
+    }
+    nevents = comm_check_incoming_select_handlers(nfds, fds);
+    /* Aim for one event per check; the interval is scaled by 2^INCOMING_FACTOR. */
+    incoming_http_interval = incoming_http_interval + 1 - nevents;
+    if (incoming_http_interval < 0)
+        incoming_http_interval = 0;
+    if (incoming_http_interval > MAX_INCOMING_INTERVAL)
+        incoming_http_interval = MAX_INCOMING_INTERVAL;
+    /* The histogram's range ends at INCOMING_HTTP_MAX. */
+    if (nevents > INCOMING_HTTP_MAX)
+        nevents = INCOMING_HTTP_MAX;
+    statHistCount(&Counter.comm_http_incoming, nevents);
+}
+
 /* Select on all sockets; call handlers for those that are ready. */
 int
 comm_select(int msec)
@@ -951,9 +1176,9 @@
     int maxfd;
     int nfds;
     int num;
+ int callicp = 0, callhttp = 0; /* set when a listening ICP/HTTP FD is ready in this pass */
     static time_t last_timeout = 0;
     struct timeval poll_time;
- static int lastinc;
     double timeout = current_dtime + (msec / 1000.0);
 
     do {
@@ -979,7 +1204,11 @@
             else
                 setSocketShutdownLifetimes(1);
         }
- comm_incoming();
+ if (commCheckICPIncoming) /* the macro also increments icp_io_events */
+ comm_select_icp_incoming();
+ if (commCheckHTTPIncoming) /* the macro also increments http_io_events */
+ comm_select_http_incoming();
+ callicp = callhttp = 0;
         nfds = 0;
         maxfd = Biggest_FD + 1;
         for (i = 0; i < maxfd; i++) {
@@ -1035,8 +1264,14 @@
         for (fd = 0; fd < maxfd; fd++) {
             if (!FD_ISSET(fd, &readfds) && !FD_ISSET(fd, &writefds))
                 continue;
- if (fdIsHttpOrIcp(fd))
+ if (fdIsIcp(fd)) { /* serviced after the loop by comm_select_icp_incoming() */
+ callicp = 1;
+ continue;
+ }
+ if (fdIsHttp(fd)) { /* serviced after the loop by comm_select_http_incoming() */
+ callhttp = 1;
                 continue;
+ }
             if (FD_ISSET(fd, &readfds)) {
                 debug(5, 6) ("comm_select: FD %d ready for reading\n", fd);
                 if (fd_table[fd].read_handler) {
@@ -1044,8 +1279,10 @@
                     fd_table[fd].read_handler = NULL;
                     hdl(fd, fd_table[fd].read_data);
                 }
- if (commCheckIncoming)
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_select_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_select_http_incoming();
             }
             if (FD_ISSET(fd, &writefds)) {
                 debug(5, 5) ("comm_select: FD %d ready for writing\n", fd);
@@ -1054,11 +1291,16 @@
                     fd_table[fd].write_handler = NULL;
                     hdl(fd, fd_table[fd].write_data);
                 }
- if (commCheckIncoming)
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_select_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_select_http_incoming();
             }
- lastinc = incame;
         }
+ if(callicp) /* listening sockets skipped in the loop above are checked now */
+ comm_select_icp_incoming();
+ if(callhttp)
+ comm_select_http_incoming();
         return COMM_OK;
     } while (timeout > current_dtime);
     debug(5, 8) ("comm_select: time out: %d\n", (int) squid_curtime);
@@ -1200,9 +1443,6 @@
     RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
     zero_tv.tv_sec = 0;
     zero_tv.tv_usec = 0;
- invert32[0] = 32;
- for (i = 1; i < INCOMING_TOTAL_MAX; i++)
- invert32[i] = (int) (32.0 / (double) i + 0.5);
     cachemgrRegister("comm_incoming",
         "comm_incoming() stats",
         commIncomingStats, 0);
@@ -1419,10 +1659,23 @@
 commIncomingStats(StoreEntry * sentry)
 {
     StatCounters *f = &Counter;
- storeAppendPrintf(sentry, "Current incoming_interval: %d\n",
- incoming_interval >> 4);
+    /* The intervals are stored scaled by 2^INCOMING_FACTOR; report them unscaled. */
+    storeAppendPrintf(sentry, "Current incoming_icp_interval: %d\n",
+        incoming_icp_interval >> INCOMING_FACTOR);
+    storeAppendPrintf(sentry, "Current incoming_http_interval: %d\n",
+        incoming_http_interval >> INCOMING_FACTOR);
     storeAppendPrintf(sentry, "\n");
- storeAppendPrintf(sentry, "Histogram of number of incoming sockets or\n");
- storeAppendPrintf(sentry, "Messages handled per comm_incoming() call:\n");
- statHistDump(&f->comm_incoming, sentry, statHistIntDumper);
+    storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");
+#if HAVE_POLL
+    storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_icp_incoming() call:\n");
+#else
+    storeAppendPrintf(sentry, "ICP Messages handled per comm_select_icp_incoming() call:\n");
+#endif
+    statHistDump(&f->comm_icp_incoming, sentry, statHistIntDumper);
+#if HAVE_POLL
+    storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_http_incoming() call:\n");
+#else
+    storeAppendPrintf(sentry, "HTTP Messages handled per comm_select_http_incoming() call:\n");
+#endif
+    statHistDump(&f->comm_http_incoming, sentry, statHistIntDumper);
 }
--- stat.c 1998/06/05 02:26:41 1.1
+++ stat.c 1998/06/05 02:29:01
@@ -883,7 +883,8 @@
      * Cache Digest Stuff
      */
     statHistEnumInit(&C->cd.on_xition_count, CacheDigestHashFuncCount);
- statHistEnumInit(&C->comm_incoming, INCOMING_TOTAL_MAX);
+ statHistEnumInit(&C->comm_icp_incoming, INCOMING_ICP_MAX); /* comm.c clamps ICP counts to INCOMING_ICP_MAX */
+ statHistEnumInit(&C->comm_http_incoming, INCOMING_HTTP_MAX); /* comm.c clamps HTTP counts to INCOMING_HTTP_MAX */
 }
 
 /* add special cases here as they arrive */
@@ -899,7 +900,8 @@
     statHistClean(&C->icp.reply_svc_time);
     statHistClean(&C->dns.svc_time);
     statHistClean(&C->cd.on_xition_count);
- statHistClean(&C->comm_incoming);
+ statHistClean(&C->comm_icp_incoming);
+ statHistClean(&C->comm_http_incoming);
 }
 
 /* add special cases here as they arrive */
@@ -921,7 +923,8 @@
     statHistCopy(&dest->icp.reply_svc_time, &orig->icp.reply_svc_time);
     statHistCopy(&dest->dns.svc_time, &orig->dns.svc_time);
     statHistCopy(&dest->cd.on_xition_count, &orig->cd.on_xition_count);
- statHistCopy(&dest->comm_incoming, &orig->comm_incoming);
+ statHistCopy(&dest->comm_icp_incoming, &orig->comm_icp_incoming);
+ statHistCopy(&dest->comm_http_incoming, &orig->comm_http_incoming);
 }
 
 static void
Received on Tue Jul 29 2003 - 13:15:50 MDT

This archive was generated by hypermail pre-2.1.9 : Tue Dec 09 2003 - 16:11:48 MST