Client-side bandwidth limit (a.k.a., quota or delay pool) implementation. In mobile environments, Squid may need to limit Squid-to-client bandwidth available to individual users, identified by their IP addresses. The IP address pool can be as large as a /10 IPv4 network (4 million unique IP addresses) and even larger in IPv6 environments. On the other hand, the code should support thousands of connections coming from a single IP (e.g., a child proxy). The implementation is based on storing bandwidth-related "bucket" information in the existing "client database" hash (client_db.cc). The old code already assigned each client IP a single ClientInfo object, which satisfies the client-side IP-based bandwidth pooling requirements. The old hash size is increased to support up to 32K concurrent clients if needed. Client-side pools are configured similarly to server-side ones, but there is only one pool class. See client_delay_pools, client_delay_initial_bucket_level, client_delay_parameters, and client_delay_access in squid.conf. The client_delay_access matches the client with delay parameters. It does not pool clients from different IP addresses together. Special care is taken to provide fair distribution of bandwidth among clients sharing the same bucket (i.e., clients coming from the same IP address). Multiple same-IP clients competing for bandwidth are queued using FIFO algorithm. If a bucket becomes empty, the first client among those sharing the bucket is delayed by 1 second before it can attempt to receive more response data from Squid. This delay may need to be lowered in high-bandwidth environments. This feature has been documented at http://wiki.squid-cache.org/Features/ClientBandwidthLimit === modified file 'include/Array.h' --- include/Array.h 2009-07-13 01:20:26 +0000 +++ include/Array.h 2010-10-11 15:47:38 +0000 @@ -80,40 +80,42 @@ public: typedef E value_type; typedef E* pointer; typedef VectorIteratorBase > iterator; typedef VectorIteratorBase const> const_iterator; void *operator new (size_t); void operator delete (void *); Vector(); ~Vector(); Vector(Vector const &); Vector &operator = (Vector const &); void clean(); void reserve (size_t capacity); void push_back (E); Vector &operator += (E item) {push_back(item); return *this;}; void insert (E); + const E &front() const; + E &front(); E &back(); E pop_back(); E shift(); // aka pop_front void prune(E); void preAppend(int app_count); bool empty() const; size_t size() const; iterator begin(); const_iterator begin () const; iterator end(); const_iterator end () const; E& operator [] (unsigned i); const E& operator [] (unsigned i) const; /* Do not change these, until the entry C struct is removed */ size_t capacity; size_t count; E *items; }; @@ -231,40 +233,56 @@ template E Vector::pop_back() { assert (size()); value_type result = items[--count]; items[count] = value_type(); return result; } template E & Vector::back() { assert (size()); return items[size() - 1]; } template +const E & +Vector::front() const +{ + assert (size()); + return items[0]; +} + +template +E & +Vector::front() +{ + assert (size()); + return items[0]; +} + +template void Vector::prune(E item) { unsigned int n = 0; for (unsigned int i = 0; i < count; i++) { if (items[i] != item) { if (i != n) items[n] = items[i]; n++; } } count = n; } /* if you are going to append a known and large number of items, call this first */ template void Vector::preAppend(int app_count) { === modified file 'src/ClientInfo.h' --- src/ClientInfo.h 2010-05-02 19:32:42 +0000 +++ src/ClientInfo.h 2010-10-11 15:47:38 +0000 @@ -1,33 +1,95 @@ #ifndef SQUID__SRC_CLIENTINFO_H #define SQUID__SRC_CLIENTINFO_H #include "ip/Address.h" #include "hash.h" #include "enums.h" #include "typedefs.h" +#include "cbdata.h" +#include "Array.h" + +#if DELAY_POOLS +class CommQuotaQueue; +#endif class ClientInfo { public: hash_link hash; /* must be first */ Ip::Address addr; struct { int result_hist[LOG_TYPE_MAX]; int n_requests; kb_t kbytes_in; kb_t kbytes_out; kb_t hit_kbytes_out; } Http, Icp; struct { time_t time; int n_req; int n_denied; } cutoff; int n_established; /* number of current established connections */ time_t last_seen; +#if DELAY_POOLS + double writeSpeedLimit;/* Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client */ + double prevTime;/* previous time when we checked */ + double bucketSize; ///< how much can be written now + double bucketSizeLimit; ///< maximum bucket size + bool writeLimitingActive; /* Is write limiter active */ + bool firstTimeConnection;/* is this first time connection for this client */ + + CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota + int rationedQuota; ///< precomputed quota preserving fairness among clients + int rationedCount; ///< number of clients that will receive rationedQuota + bool selectWaiting; ///< is between commSetSelect and commHandleWrite + bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire + + // all those functions access Comm fd_table and are defined in comm.cc + bool hasQueue() const; ///< whether any clients are waiting for write quota + bool hasQueue(const CommQuotaQueue*) const; ///< has a given queue + unsigned int quotaEnqueue(int fd); ///< client starts waiting in queue; create the queue if necessary + int quotaPeekFd() const; ///< retuns the next fd reservation + unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop + void quotaDequeue(); ///< pops queue head from queue + void kickQuotaQueue(); ///< schedule commHandleWriteHelper call + int quotaForDequed(); ///< allocate quota for a just dequeued client + void refillBucket(); ///< adds bytes to bucket based on rate and time + + void quotaDumpQueue(); ///< dumps quota queue for debugging +#endif +}; + +#if DELAY_POOLS +// a queue of Comm clients waiting for I/O quota controlled by delay pools +class CommQuotaQueue +{ +public: + CommQuotaQueue(ClientInfo *info); + ~CommQuotaQueue(); + + bool empty() const { return fds.empty(); } + size_t size() const { return fds.size(); } + int front() const { return fds.front(); } + unsigned int enqueue(int fd); + void dequeue(); + + ClientInfo *clientInfo; ///< bucket responsible for quota maintenance + + // these counters might overflow; that is OK because they are for IDs only + int ins; ///< number of enqueue calls, used to generate a "reservation" ID + int outs; ///< number of dequeue calls, used to check the "reservation" ID + +private: + // TODO: optimize using a Ring- or List-based store? + typedef Vector Store; + Store fds; ///< descriptor queue + + CBDATA_CLASS2(CommQuotaQueue); }; +#endif /* DELAY_POOLS */ #endif === modified file 'src/Makefile.am' --- src/Makefile.am 2010-10-05 11:34:01 +0000 +++ src/Makefile.am 2010-10-11 15:47:38 +0000 @@ -58,41 +58,45 @@ delay_pools.cc \ DelayId.cc \ DelayId.h \ DelayIdComposite.h \ DelayBucket.cc \ DelayBucket.h \ DelayConfig.cc \ DelayConfig.h \ DelayPool.cc \ DelayPool.h \ DelayPools.h \ DelaySpec.cc \ DelaySpec.h \ DelayTagged.cc \ DelayTagged.h \ DelayUser.cc \ DelayUser.h \ DelayVector.cc \ DelayVector.h \ NullDelayId.cc \ - NullDelayId.h + NullDelayId.h \ + \ + ClientDelayConfig.cc \ + ClientDelayConfig.h + if USE_DELAY_POOLS DELAY_POOL_SOURCE = $(DELAY_POOL_ALL_SOURCE) else DELAY_POOL_SOURCE = endif if ENABLE_XPROF_STATS XPROF_STATS_SOURCE = ProfStats.cc else XPROF_STATS_SOURCE = endif if ENABLE_HTCP HTCPSOURCE = htcp.cc htcp.h endif if MAKE_LEAKFINDER LEAKFINDERSOURCE = LeakFinder.cc else LEAKFINDERSOURCE = === modified file 'src/cache_cf.cc' --- src/cache_cf.cc 2010-10-06 13:03:11 +0000 +++ src/cache_cf.cc 2010-10-11 15:51:06 +0000 @@ -1465,40 +1465,82 @@ static void parse_delay_pool_class(DelayConfig * cfg) { cfg->parsePoolClass(); } static void parse_delay_pool_rates(DelayConfig * cfg) { cfg->parsePoolRates(); } static void parse_delay_pool_access(DelayConfig * cfg) { cfg->parsePoolAccess(LegacyParser); } #endif +#if DELAY_POOLS +#include "ClientDelayConfig.h" +/* do nothing - free_client_delay_pool_count is the magic free function. + * this is why client_delay_pool_count isn't just marked TYPE: ushort + */ + +#define free_client_delay_pool_access(X) +#define free_client_delay_pool_rates(X) +#define dump_client_delay_pool_access(X, Y, Z) +#define dump_client_delay_pool_rates(X, Y, Z) + +static void +free_client_delay_pool_count(ClientDelayConfig * cfg) +{ + cfg->freePoolCount(); +} + +static void +dump_client_delay_pool_count(StoreEntry * entry, const char *name, ClientDelayConfig &cfg) +{ + cfg.dumpPoolCount (entry, name); +} + +static void +parse_client_delay_pool_count(ClientDelayConfig * cfg) +{ + cfg->parsePoolCount(); +} + +static void +parse_client_delay_pool_rates(ClientDelayConfig * cfg) +{ + cfg->parsePoolRates(); +} + +static void +parse_client_delay_pool_access(ClientDelayConfig * cfg) +{ + cfg->parsePoolAccess(LegacyParser); +} +#endif + #if USE_HTTP_VIOLATIONS static void dump_http_header_access(StoreEntry * entry, const char *name, header_mangler header[]) { int i; for (i = 0; i < HDR_ENUM_END; i++) { if (header[i].access_list != NULL) { storeAppendPrintf(entry, "%s ", name); dump_acl_access(entry, httpHeaderNameById(i), header[i].access_list); } } } static void parse_http_header_access(header_mangler header[]) { int id, i; char *t = NULL; === modified file 'src/cf.data.depend' --- src/cf.data.depend 2010-10-06 03:50:45 +0000 +++ src/cf.data.depend 2010-10-11 15:47:38 +0000 @@ -2,40 +2,43 @@ access_log acl logformat acl external_acl_type auth_param acl_access acl acl_address acl acl_b_size_t acl acl_tos acl acl_nfmark acl address authparam b_int64_t b_size_t cachedir cache_replacement_policy cachemgrpasswd ConfigAclTos CpuAffinityMap debug delay_pool_access acl delay_class delay_pool_class delay_pools delay_pool_count delay_pool_rates delay_class +client_delay_pool_access acl +client_delay_pool_count +client_delay_pool_rates denyinfo acl eol externalAclHelper auth_param HelperChildConfig hostdomain cache_peer hostdomaintype cache_peer http_header_access acl http_header_replace http_port_list https_port_list adaptation_access_type adaptation_service_set adaptation_service_chain acl icap_service icap_class adaptation_service_set_type icap_service ecap_service adaptation_service_chain_type icap_service ecap_service icap_access_type icap_class acl icap_class_type icap_service icap_service_type icap_service_failure_limit ecap_service_type int kb_int64_t === modified file 'src/cf.data.pre' --- src/cf.data.pre 2010-10-06 03:50:45 +0000 +++ src/cf.data.pre 2010-10-11 15:47:38 +0000 @@ -4778,40 +4778,135 @@ be limited to 128Kb no matter how many workstations they are logged into.: delay_parameters 4 32000/32000 8000/8000 600/64000 16000/16000 DOC_END NAME: delay_initial_bucket_level COMMENT: (percent, 0-100) TYPE: ushort DEFAULT: 50 IFDEF: DELAY_POOLS LOC: Config.Delay.initial DOC_START The initial bucket percentage is used to determine how much is put in each bucket when squid starts, is reconfigured, or first notices a host accessing it (in class 2 and class 3, individual hosts and networks only have buckets associated with them once they have been "seen" by squid). DOC_END COMMENT_START + CLIENT DELAY POOL PARAMETERS + ----------------------------------------------------------------------------- +COMMENT_END + +NAME: client_delay_pools +TYPE: client_delay_pool_count +DEFAULT: 0 +IFDEF: DELAY_POOLS +LOC: Config.ClientDelay +DOC_START + This option specifies the number of client delay pools used. It must + preceed other client_delay_* options. + +Example: + client_delay_pools 2 +DOC_END + +NAME: client_delay_initial_bucket_level +COMMENT: (percent, 0-no_limit) +TYPE: ushort +DEFAULT: 50 +IFDEF: DELAY_POOLS +LOC: Config.ClientDelay.initial +DOC_START + This option determines the initial bucket size as a percentage of + max_bucket_size from client_delay_parameters. Buckets are created + at the time of the "first" connection from the matching IP. Idle + buckets are periodically deleted up. + + You can specify more than 100 percent but note that such "oversized" + buckets are not refilled until their size goes down to max_bucket_size + from client_delay_parameters. + +Example: + client_delay_initial_bucket_level 50 +DOC_END + +NAME: client_delay_parameters +TYPE: client_delay_pool_rates +DEFAULT: none +IFDEF: DELAY_POOLS +LOC: Config.ClientDelay +DOC_START + + This option configures client-side bandwidth limits using the + following format: + + client_delay_parameters pool speed_limit max_bucket_size + + pool is an integer ID used for client_delay_access matching. + + speed_limit is bytes added to the bucket per second. + + max_bucket_size is the maximum size of a bucket, enforced after any + speed_limit additions. + + Please see the delay_parameters option for more information and + examples. + +Example: + client_delay_parameters 1 1024 2048 + client_delay_parameters 2 51200 16384 +DOC_END + +NAME: client_delay_access +TYPE: client_delay_pool_access +DEFAULT: none +IFDEF: DELAY_POOLS +LOC: Config.ClientDelay +DOC_START + + This option determines the client-side delay pool for the + request: + + client_delay_access pool_ID allow|deny acl_name + + All client_delay_access options are checked in their pool ID + order, starting with pool 1. The first checked pool with allowed + request is selected for the request. If no ACL matches or there + are no client_delay_access options, the request bandwidth is not + limited. + + The ACL-selected pool is then used to find the + client_delay_parameters for the request. Client-side pools are + not used to aggregate clients. Clients are always aggregated + based on their source IP addresses (one bucket per source IP). + + Please see delay_access for more examples. + +Example: + client_delay_access 1 allow low_rate_network + client_delay_access 2 allow vips_network +DOC_END + +COMMENT_START WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS ----------------------------------------------------------------------------- COMMENT_END NAME: wccp_router TYPE: address LOC: Config.Wccp.router DEFAULT: any_addr IFDEF: USE_WCCP DOC_START Use this option to define your WCCP ``home'' router for Squid. wccp_router supports a single WCCP(v1) router wccp2_router supports multiple WCCPv2 routers only one of the two may be used at the same time and defines which version of WCCP to use. DOC_END === modified file 'src/client_db.cc' --- src/client_db.cc 2010-07-25 08:10:12 +0000 +++ src/client_db.cc 2010-10-11 16:04:11 +0000 @@ -32,86 +32,169 @@ * */ #include "squid.h" #include "event.h" #include "CacheManager.h" #include "ClientInfo.h" #include "ip/Address.h" #include "SquidMath.h" #include "SquidTime.h" #include "Store.h" static hash_table *client_table = NULL; static ClientInfo *clientdbAdd(const Ip::Address &addr); static FREE clientdbFreeItem; static void clientdbStartGC(void); static void clientdbScheduledGC(void *); +#if DELAY_POOLS +static int max_clients = 32768; +#else static int max_clients = 32; +#endif + static int cleanup_running = 0; static int cleanup_scheduled = 0; static int cleanup_removed; +#if DELAY_POOLS +#define CLIENT_DB_HASH_SIZE 65357 +#else #define CLIENT_DB_HASH_SIZE 467 +#endif static ClientInfo * clientdbAdd(const Ip::Address &addr) { ClientInfo *c; char *buf = new char[MAX_IPSTRLEN]; c = (ClientInfo *)memAllocate(MEM_CLIENT_INFO); c->hash.key = addr.NtoA(buf,MAX_IPSTRLEN); c->addr = addr; +#if DELAY_POOLS + /* setup default values for client write limiter */ + c->writeLimitingActive=false; + c->writeSpeedLimit=0; + c->bucketSize = 0; + c->firstTimeConnection=true; + c->quotaQueue = NULL; + c->rationedQuota = 0; + c->rationedCount = 0; + c->selectWaiting = false; + c->eventWaiting = false; + + /* get current time */ + getCurrentTime(); + c->prevTime=current_dtime;/* put current time to have something sensible here */ +#endif hash_join(client_table, &c->hash); statCounter.client_http.clients++; if ((statCounter.client_http.clients > max_clients) && !cleanup_running && cleanup_scheduled < 2) { cleanup_scheduled++; eventAdd("client_db garbage collector", clientdbScheduledGC, NULL, 90, 0); } return c; } +#if DELAY_POOLS +/* Configure client write limiting for this client(note:"client" here means - IP) + info must be got using clientdbGetInfo (so we avoid another hash lookup + writeSpeedLimit is speed limit configured in config for this pool + initialBurst is initial bucket size to use for this client(i.e. client can burst at first) + (it's current configured by http_accept in client_side.cc using writeSpeedLimit and configured value(50% by default) + highWatermark is maximum bucket value + */ +void clientdbSetWriteLimiter(ClientInfo * info, const int writeSpeedLimit,const double initialBurst,const double highWatermark) +{ + assert(info); + debugs(33,5, HERE << "Write limits for " << (const char*)info->hash.key << + " speed=" << writeSpeedLimit << " burst=" << initialBurst << + " highwatermark=" << highWatermark); + + // set or possibly update traffic shaping parameters + info->writeLimitingActive = true; + info->writeSpeedLimit = writeSpeedLimit; + info->bucketSizeLimit = highWatermark; + + // but some members should only be set once for a newly activated bucket + if (info->firstTimeConnection) { + info->firstTimeConnection = false; + + assert(!info->selectWaiting); + assert(!info->quotaQueue); + info->quotaQueue = new CommQuotaQueue(info); + cbdataReference(info->quotaQueue); + + info->bucketSize = initialBurst; + info->prevTime = current_dtime; + } +} +#endif + static void clientdbRegisterWithCacheManager(void) { CacheManager::GetInstance()-> registerAction("client_list", "Cache Client List", clientdbDump, 0, 1); } void clientdbInit(void) { clientdbRegisterWithCacheManager(); if (client_table) return; client_table = hash_create((HASHCMP *) strcmp, CLIENT_DB_HASH_SIZE, hash_string); - } +#if DELAY_POOLS +/* returns ClientInfo for given IP addr + Returns NULL if no such client (or clientdb turned off) + (it is assumed that clientdbEstablished will be called before and create client record if needed) +*/ +ClientInfo * clientdbGetInfo(const Ip::Address &addr) +{ + char key[MAX_IPSTRLEN]; + ClientInfo *c; + + if (!Config.onoff.client_db) + return NULL; + + addr.NtoA(key,MAX_IPSTRLEN); + + c = (ClientInfo *) hash_lookup(client_table, key); + if (c==NULL) + { + debugs(77,1,"Client db does not contain information for given IP address "<<(const char*)key); + return NULL; + } + return c; +} +#endif void clientdbUpdate(const Ip::Address &addr, log_type ltype, protocol_t p, size_t size) { char key[MAX_IPSTRLEN]; ClientInfo *c; if (!Config.onoff.client_db) return; addr.NtoA(key,MAX_IPSTRLEN); c = (ClientInfo *) hash_lookup(client_table, key); if (c == NULL) c = clientdbAdd(addr); if (c == NULL) debug_trap("clientdbUpdate: Failed to add entry"); if (p == PROTO_HTTP) { @@ -282,40 +365,49 @@ log_tags[l], c->Http.result_hist[l], Math::intPercent(c->Http.result_hist[l], c->Http.n_requests)); } storeAppendPrintf(sentry, "\n"); } storeAppendPrintf(sentry, "TOTALS\n"); storeAppendPrintf(sentry, "ICP : %d Queries, %d Hits (%3d%%)\n", icp_total, icp_hits, Math::intPercent(icp_hits, icp_total)); storeAppendPrintf(sentry, "HTTP: %d Requests, %d Hits (%3d%%)\n", http_total, http_hits, Math::intPercent(http_hits, http_total)); } static void clientdbFreeItem(void *data) { ClientInfo *c = (ClientInfo *)data; safe_free(c->hash.key); + +#if DELAY_POOLS + if (CommQuotaQueue *q = c->quotaQueue) { + q->clientInfo = NULL; + delete q; // invalidates cbdata, cancelling any pending kicks + cbdataReferenceDone(q); + } +#endif + memFree(c, MEM_CLIENT_INFO); } void clientdbFreeMemory(void) { hashFreeItems(client_table, clientdbFreeItem); hashFreeMemory(client_table); client_table = NULL; } static void clientdbScheduledGC(void *unused) { cleanup_scheduled = 0; clientdbStartGC(); } static void clientdbGC(void *unused) === modified file 'src/client_side.cc' --- src/client_side.cc 2010-10-01 00:41:19 +0000 +++ src/client_side.cc 2010-10-11 15:47:38 +0000 @@ -94,40 +94,44 @@ #include "comm.h" #include "comm/ListenStateData.h" #include "base/TextException.h" #include "ConnectionDetail.h" #include "eui/Config.h" #include "fde.h" #include "HttpHdrContRange.h" #include "HttpReply.h" #include "HttpRequest.h" #include "ident/Config.h" #include "ident/Ident.h" #include "ip/Intercept.h" #include "ipc/StartListening.h" #include "MemBuf.h" #include "MemObject.h" #include "ProtoPort.h" #include "rfc1738.h" #include "SquidTime.h" #include "Store.h" +#if DELAY_POOLS +#include "ClientInfo.h" +#endif + #if LINGERING_CLOSE #define comm_close comm_lingering_close #endif /// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb { public: typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg); ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg): handler(aHandler), portCfg(aPortCfg) {} virtual void print(std::ostream &os) const { startPrint(os) << ", port=" << (void*)portCfg << ')'; } virtual bool canDial(AsyncCall &) const { return true; } virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); } @@ -3125,40 +3129,82 @@ #endif #if USE_SQUID_EUI if (Eui::TheConfig.euiLookup) { if (details->peer.IsIPv4()) { connState->peer_eui48.lookup(details->peer); } else if (details->peer.IsIPv6()) { connState->peer_eui64.lookup(details->peer); } } #endif if (s->tcp_keepalive.enabled) { commSetTcpKeepalive(newfd, s->tcp_keepalive.idle, s->tcp_keepalive.interval, s->tcp_keepalive.timeout); } connState->readSomeData(); clientdbEstablished(details->peer, 1); +#if DELAY_POOLS + fd_table[newfd].clientInfo = NULL; + + ClientDelayPools & pools(Config.ClientDelay.pools); + if (Config.onoff.client_db) + { + /* it was said several times that client write limiter does not work if client_db is disabled */ + + for (unsigned int pool = 0; pool < pools.size(); pool++) { + + /* pools require explicit 'allow' to assign a client into them */ + if (!pools[pool].access) + continue; // warned in ClientDelayConfig::Finalize() + + ACLFilledChecklist ch(pools[pool].access, NULL, NULL); + + // TODO: we check early to limit error response bandwith but we + // should recheck when we can honor delay_pool_uses_indirect + + ch.src_addr = details->peer; + ch.my_addr = details->me; + + if (ch.fastCheck()) { + + /* request client information from db after we did all checks + this will save hash lookup if client failed checks */ + ClientInfo * cli = clientdbGetInfo(details->peer); + assert(cli); + + /* put client info in FDE */ + fd_table[newfd].clientInfo = cli; + + /* setup write limiter for this request */ + const double burst = floor(0.5 + + (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0); + clientdbSetWriteLimiter(cli, pools[pool].rate, + burst, pools[pool].highwatermark); + break; + } + } + } +#endif incoming_sockets_accepted++; } #if USE_SSL /** Create SSL connection structure and update fd_table */ static SSL * httpsCreate(int newfd, ConnectionDetail *details, SSL_CTX *sslContext) { SSL *ssl = SSL_new(sslContext); if (!ssl) { const int ssl_error = ERR_get_error(); debugs(83, 1, "httpsAccept: Error allocating handle: " << ERR_error_string(ssl_error, NULL) ); comm_close(newfd); return NULL; } SSL_set_fd(ssl, newfd); fd_table[newfd].ssl = ssl; === modified file 'src/comm.cc' --- src/comm.cc 2010-10-06 03:50:45 +0000 +++ src/comm.cc 2010-10-13 17:45:11 +0000 @@ -36,74 +36,87 @@ #include "StoreIOBuffer.h" #include "comm.h" #include "event.h" #include "fde.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" #include "comm/ListenStateData.h" #include "CommIO.h" #include "CommRead.h" #include "ConnectionDetail.h" #include "MemBuf.h" #include "pconn.h" #include "SquidTime.h" #include "CommCalls.h" #include "DescriptorSet.h" #include "icmp/net_db.h" #include "ip/Address.h" #include "ip/Intercept.h" #include "ip/QosConfig.h" #include "ip/tools.h" +#include "ClientInfo.h" +#include "cbdata.h" #if defined(_SQUID_CYGWIN_) #include #endif #ifdef HAVE_NETINET_TCP_H #include #endif /* * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything. */ typedef enum { IOCB_NONE, IOCB_READ, IOCB_WRITE } iocb_type; static void commStopHalfClosedMonitor(int fd); static IOCB commHalfClosedReader; static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI); static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI); +#if DELAY_POOLS +CBDATA_CLASS_INIT(CommQuotaQueue); + +static void commHandleWriteHelper(void * data); +#endif + +static void commSelectOrQueueWrite(const int fd); struct comm_io_callback_t { iocb_type type; int fd; AsyncCall::Pointer callback; char *buf; FREE *freefunc; int size; int offset; comm_err_t errcode; int xerrno; +#if DELAY_POOLS + unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue +#endif + bool active() const { return callback != NULL; } }; struct _comm_fd { int fd; comm_io_callback_t readcb; comm_io_callback_t writecb; }; typedef struct _comm_fd comm_fd_t; comm_fd_t *commfd_table; // TODO: make this a comm_io_callback_t method? bool commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb) { assert(ccb->fd == fd); assert(ccb->type == type); return ccb->active(); } @@ -124,45 +137,50 @@ AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size) { assert(!ccb->active()); assert(ccb->type == type); assert(cb != NULL); ccb->fd = fd; ccb->callback = cb; ccb->buf = buf; ccb->freefunc = freefunc; ccb->size = size; ccb->offset = 0; } // Schedule the callback call and clear the callback static void commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno) { debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" << code << ", " << xerrno << ")"); + assert(ccb->active()); assert(ccb->fd == fd); ccb->errcode = code; ccb->xerrno = xerrno; +#if DELAY_POOLS + ccb->quotaQueueReserv = 0; +#endif + comm_io_callback_t cb = *ccb; /* We've got a copy; blow away the real one */ /* XXX duplicate code from commio_cancel_callback! */ ccb->xerrno = 0; ccb->callback = NULL; // cb has it /* free data */ if (cb.freefunc) { cb.freefunc(cb.buf); cb.buf = NULL; } if (cb.callback != NULL) { typedef CommIoCbParams Params; Params ¶ms = GetCommParams(cb.callback); params.fd = cb.fd; params.buf = cb.buf; params.size = cb.offset; params.flag = cb.errcode; @@ -170,40 +188,44 @@ ScheduleCallHere(cb.callback); } } /* * Cancel the given callback * * Remember that the data is cbdataRef'ed. */ // TODO: make this a comm_io_callback_t method static void commio_cancel_callback(int fd, comm_io_callback_t *ccb) { debugs(5, 3, "commio_cancel_callback: called for FD " << fd); assert(ccb->fd == fd); assert(ccb->active()); ccb->xerrno = 0; ccb->callback = NULL; + +#if DELAY_POOLS + ccb->quotaQueueReserv = 0; +#endif } /* * Call the given comm callback; assumes the callback is valid. * * @param ccb io completion callback */ void commio_call_callback(comm_io_callback_t *ccb) { } class ConnectStateData { public: void *operator new (size_t); void operator delete (void *); static void Connect (int fd, void *me); void connect(); @@ -1572,40 +1594,50 @@ typedef CommCloseCbParams Params; Params &startParams = GetCommParams(startCall); startParams.fd = fd; ScheduleCallHere(startCall); // a half-closed fd may lack a reader, so we stop monitoring explicitly if (commHasHalfClosedMonitor(fd)) commStopHalfClosedMonitor(fd); commSetTimeout(fd, -1, NULL, NULL); // notify read/write handlers after canceling select reservations, if any if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) { commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0); commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno); } if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) { commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno); } +#if DELAY_POOLS + if (ClientInfo *clientInfo = F->clientInfo) { + if (clientInfo->selectWaiting) { + clientInfo->selectWaiting = false; + // kick queue or it will get stuck as commWriteHandle is not called + clientInfo->kickQuotaQueue(); + } + } +#endif + commCallCloseHandlers(fd); if (F->pconn.uses) F->pconn.pool->count(F->pconn.uses); comm_empty_os_read_buffers(fd); AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete", CommCloseCbPtrFun(comm_close_complete, NULL)); Params &completeParams = GetCommParams(completeCall); completeParams.fd = fd; // must use async call to wait for all callbacks // scheduled before comm_close() to finish ScheduleCallHere(completeCall); PROF_stop(comm_close); } /* Send a udp datagram to specified TO_ADDR. */ @@ -1918,137 +1950,408 @@ * after accepting a client but before it opens a socket or a file. * Since Squid_MaxFD can be as high as several thousand, don't waste them */ RESERVED_FD = min(100, Squid_MaxFD / 4); conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler)); TheHalfClosed = new DescriptorSet; } void comm_exit(void) { delete TheHalfClosed; TheHalfClosed = NULL; safe_free(fd_table); safe_free(fdd_table); safe_free(commfd_table); } +#if DELAY_POOLS +// called when the queue is done waiting for the client bucket to fill +static void +commHandleWriteHelper(void * data) +{ + CommQuotaQueue *queue = static_cast(data); + assert(queue); + + ClientInfo *clientInfo = queue->clientInfo; + // ClientInfo invalidates queue if freed, so if we got here through, + // evenAdd cbdata protections, everything should be valid and consistent + assert(clientInfo); + assert(clientInfo->hasQueue()); + assert(clientInfo->hasQueue(queue)); + assert(!clientInfo->selectWaiting); + assert(clientInfo->eventWaiting); + clientInfo->eventWaiting = false; + + do { + // check that the head descriptor is still relevant + const int head = clientInfo->quotaPeekFd(); + comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head); + + if (fd_table[head].clientInfo == clientInfo && + clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv && + !fd_table[head].closing()) { + + // wait for the head descriptor to become ready for writing + commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0); + clientInfo->selectWaiting = true; + return; + } + + clientInfo->quotaDequeue(); // remove the no longer relevant descriptor + // and continue looking for a relevant one + } while (clientInfo->hasQueue()); + + debugs(77,3, HERE << "emptied queue"); +} + +bool +ClientInfo::hasQueue() const +{ + assert(quotaQueue); + return !quotaQueue->empty(); +} + +bool +ClientInfo::hasQueue(const CommQuotaQueue *q) const +{ + assert(quotaQueue); + return quotaQueue == q; +} + +/// returns the first descriptor to be dequeued +int +ClientInfo::quotaPeekFd() const +{ + assert(quotaQueue); + return quotaQueue->front(); +} + +/// returns the reservation ID of the first descriptor to be dequeued +unsigned int +ClientInfo::quotaPeekReserv() const +{ + assert(quotaQueue); + return quotaQueue->outs + 1; +} + +/// queues a given fd, creating the queue if necessary; returns reservation ID +unsigned int +ClientInfo::quotaEnqueue(int fd) +{ + assert(quotaQueue); + return quotaQueue->enqueue(fd); +} + +/// removes queue head +void +ClientInfo::quotaDequeue() +{ + assert(quotaQueue); + quotaQueue->dequeue(); +} + +void +ClientInfo::kickQuotaQueue() +{ + if (!eventWaiting && !selectWaiting && hasQueue()) { + // wait at least a second if the bucket is empty + const double delay = (bucketSize < 1.0) ? 1.0 : 0.0; + eventAdd("commHandleWriteHelper", &commHandleWriteHelper, + quotaQueue, delay, 0, true); + eventWaiting = true; + } +} + +/// calculates how much to write for a single dequeued client +int +ClientInfo::quotaForDequed() +{ + /* If we have multiple clients and give full bucketSize to each client then + * clt1 may often get a lot more because clt1->clt2 time distance in the + * select(2) callback order may be a lot smaller than cltN->clt1 distance. + * We divide quota evenly to be more fair. */ + + if (!rationedCount) { + rationedCount = quotaQueue->size() + 1; + + // The delay in ration recalculation _temporary_ deprives clients from + // bytes that should have trickled in while rationedCount was positive. + refillBucket(); + + // Rounding errors do not accumulate here, but we round down to avoid + // negative bucket sizes after write with rationedCount=1. + rationedQuota = static_cast(floor(bucketSize/rationedCount)); + debugs(77,5, HERE << "new rationedQuota: " << rationedQuota << + '*' << rationedCount); + } + + --rationedCount; + debugs(77,7, HERE << "rationedQuota: " << rationedQuota << + " rations remaining: " << rationedCount); + + // update 'last seen' time to prevent clientdb GC from dropping us + last_seen = squid_curtime; + return rationedQuota; +} + +///< adds bytes to the quota bucket based on the rate and passed time +void +ClientInfo::refillBucket() +{ + // all these times are in seconds, with double precision + const double currTime = current_dtime; + const double timePassed = currTime - prevTime; + + // Calculate allowance for the time passed. Use double to avoid + // accumulating rounding errors for small intervals. For example, always + // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error. + const double gain = timePassed * writeSpeedLimit; + + debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " << + bucketSize << " + (" << timePassed << " * " << writeSpeedLimit << + " = " << gain << ')'); + + // to further combat error accumulation during micro updates, + // quit before updating time if we cannot add at least one byte + if (gain < 1.0) + return; + + prevTime = currTime; + + // for "first" connections, drain initial fat before refilling but keep + // updating prevTime to avoid bursts after the fat is gone + if (bucketSize > bucketSizeLimit) { + debugs(77,4, HERE << "not refilling while draining initial fat"); + return; + } + + bucketSize += gain; + + // obey quota limits + if (bucketSize > bucketSizeLimit) + bucketSize = bucketSizeLimit; +} + +CommQuotaQueue::CommQuotaQueue(ClientInfo *info): clientInfo(info), + ins(0), outs(0) +{ + assert(clientInfo); +} + +CommQuotaQueue::~CommQuotaQueue() +{ + assert(!clientInfo); // ClientInfo should clear this before destroying us +} + +/// places the given fd at the end of the queue; returns reservation ID +unsigned int +CommQuotaQueue::enqueue(int fd) +{ + debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key << + ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size()); + fds.push_back(fd); + return ++ins; +} + +/// removes queue head +void +CommQuotaQueue::dequeue() +{ + assert(!fds.empty()); + debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key << + ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' << + fds.size()); + fds.shift(); + ++outs; +} + +#endif + /* Write to FD. */ static void commHandleWrite(int fd, void *data) { comm_io_callback_t *state = (comm_io_callback_t *)data; int len = 0; int nleft; - assert(state == COMMIO_FD_WRITECB(fd)); + assert(state==COMMIO_FD_WRITECB(fd)); PROF_start(commHandleWrite); debugs(5, 5, "commHandleWrite: FD " << fd << ": off " << (long int) state->offset << ", sz " << (long int) state->size << "."); nleft = state->size - state->offset; + +#if DELAY_POOLS + ClientInfo * clientInfo=fd_table[fd].clientInfo; + + if (clientInfo && !clientInfo->writeLimitingActive) + clientInfo = NULL; // we only care about quota limits here + + if (clientInfo) { + assert(clientInfo->selectWaiting); + clientInfo->selectWaiting = false; + + assert(clientInfo->hasQueue()); + assert(clientInfo->quotaPeekFd() == fd); + clientInfo->quotaDequeue(); // we will write or requeue below + + if (nleft > 0) { + const int quota = clientInfo->quotaForDequed(); + if (!quota) { // if no write quota left, queue this fd + state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); + clientInfo->kickQuotaQueue(); + PROF_stop(commHandleWrite); + return; + } + + const int nleft_corrected = min(nleft, quota); + if (nleft != nleft_corrected) { + debugs(5, 5, HERE << "FD " << fd << " writes only " << + nleft_corrected << " out of " << nleft); + nleft = nleft_corrected; + } + + } + } + +#endif + + /* actually WRITE data */ len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); debugs(5, 5, "commHandleWrite: write() returns " << len); + +#if DELAY_POOLS + if (clientInfo) { + if (len > 0) { + /* we wrote data - drain them from bucket */ + clientInfo->bucketSize -= len; + if (clientInfo->bucketSize < 0.0) + { + debugs(5,1, HERE << "drained too much"); // should not happen + clientInfo->bucketSize = 0; + } + } + + // even if we wrote nothing, we were served; give others a chance + clientInfo->kickQuotaQueue(); + } +#endif + fd_bytes(fd, len, FD_WRITE); statCounter.syscalls.sock.writes++; // After each successful partial write, // reset fde::writeStart to the current time. fd_table[fd].writeStart = squid_curtime; if (len == 0) { /* Note we even call write if nleft == 0 */ /* We're done */ if (nleft != 0) debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining."); commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno); } else if (len < 0) { /* An error */ if (fd_table[fd].flags.socket_eof) { debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << "."); commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno); } else if (ignoreErrno(errno)) { debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << "."); - commSetSelect(fd, - COMM_SELECT_WRITE, - commHandleWrite, - state, - 0); + commSelectOrQueueWrite(fd); } else { debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << "."); commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno); } } else { /* A successful write, continue */ state->offset += len; if (state->offset < state->size) { /* Not done, reinstall the write handler and write some more */ - commSetSelect(fd, - COMM_SELECT_WRITE, - commHandleWrite, - state, - 0); + commSelectOrQueueWrite(fd); } else { commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno); } } PROF_stop(commHandleWrite); } /* * Queue a write. handler/handler_data are called when the write * completes, on error, or on file descriptor close. * * free_func is used to free the passed buffer when the write has completed. */ void comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func) { AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander", CommIoCbPtrFun(handler, handler_data)); comm_write(fd, buf, size, call, free_func); } void comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) { debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": asynCall " << callback); /* Make sure we are open, not closing, and not writing */ assert(isOpen(fd)); assert(!fd_table[fd].closing()); comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd); assert(!ccb->active()); fd_table[fd].writeStart = squid_curtime; /* Queue the write */ commio_set_callback(fd, IOCB_WRITE, ccb, callback, (char *)buf, free_func, size); + + commSelectOrQueueWrite(fd); +} + +// called when fd needs to write but may need to wait in line for its quota +static void +commSelectOrQueueWrite(const int fd) +{ + comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd); + +#if DELAY_POOLS + // stand in line if there is one + if (ClientInfo *clientInfo = fd_table[fd].clientInfo) { + if (clientInfo->writeLimitingActive) { + ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd); + clientInfo->kickQuotaQueue(); + return; + } + } +#endif + commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0); } /* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */ void comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data) { comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc()); } void comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback) { comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc()); } /* * hm, this might be too general-purpose for all the places we'd === modified file 'src/fde.h' --- src/fde.h 2010-10-06 13:03:11 +0000 +++ src/fde.h 2010-10-12 11:30:27 +0000 @@ -16,40 +16,43 @@ * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #ifndef SQUID_FDE_H #define SQUID_FDE_H #include "comm.h" #include "ip/Address.h" +#if DELAY_POOLS +class ClientInfo; +#endif class PconnPool; class fde { public: fde() { clear(); }; /// True if comm_close for this fd has been called bool closing() { return flags.close_request; } /* NOTE: memset is used on fdes today. 20030715 RBC */ static void DumpStats (StoreEntry *); char const *remoteAddr() const; void dumpStats (StoreEntry &, int); bool readPending(int); void noteUse(PconnPool *); public: @@ -72,40 +75,43 @@ unsigned int socket_eof:1; unsigned int nolinger:1; unsigned int nonblocking:1; unsigned int ipc:1; unsigned int called_connect:1; unsigned int nodelay:1; unsigned int close_on_exec:1; unsigned int read_pending:1; unsigned int write_pending:1; unsigned int transparent:1; } flags; int64_t bytes_read; int64_t bytes_written; struct { int uses; /* ie # req's over persistent conn */ PconnPool *pool; } pconn; +#if DELAY_POOLS + ClientInfo * clientInfo;/* pointer to client info used in client write limiter or NULL if not present */ +#endif unsigned epoll_state; struct _fde_disk disk; PF *read_handler; void *read_data; PF *write_handler; void *write_data; AsyncCall::Pointer timeoutHandler; time_t timeout; time_t writeStart; void *lifetime_data; AsyncCall::Pointer closeHandler; AsyncCall::Pointer halfClosedReader; /// read handler for half-closed fds CommWriteStateData *wstate; /* State data for comm_write */ READ_HANDLER *read_method; WRITE_HANDLER *write_method; #if USE_SSL SSL *ssl; #endif #ifdef _SQUID_MSWIN_ @@ -123,40 +129,41 @@ nfmarkToServer in that this is the value we *receive* from the, connection, whereas nfmarkToServer is the value to set on packets *leaving* Squid. */ private: /** Clear the fde class back to NULL equivalent. */ inline void clear() { type = 0; remote_port = 0; local_addr.SetEmpty(); tosToServer = '\0'; nfmarkToServer = 0; sock_family = 0; memset(ipaddr, '\0', MAX_IPSTRLEN); memset(desc,'\0',FD_DESC_SZ); memset(&flags,0,sizeof(_fde_flags)); bytes_read = 0; bytes_written = 0; pconn.uses = 0; pconn.pool = NULL; + clientInfo = NULL; epoll_state = 0; memset(&disk, 0, sizeof(_fde_disk)); read_handler = NULL; read_data = NULL; write_handler = NULL; write_data = NULL; timeoutHandler = NULL; timeout = 0; writeStart = 0; lifetime_data = NULL; closeHandler = NULL; halfClosedReader = NULL; wstate = NULL; read_method = NULL; write_method = NULL; #if USE_SSL ssl = NULL; #endif #ifdef _SQUID_MSWIN_ win32.handle = NULL; === modified file 'src/main.cc' --- src/main.cc 2010-10-07 13:07:12 +0000 +++ src/main.cc 2010-10-11 15:47:38 +0000 @@ -63,40 +63,44 @@ #include "ip/tools.h" #if USE_EPOLL #include "comm_epoll.h" #endif #if USE_KQUEUE #include "comm_kqueue.h" #endif #if USE_POLL #include "comm_poll.h" #endif #if defined(USE_SELECT) || defined(USE_SELECT_WIN32) #include "comm_select.h" #endif #include "SquidTime.h" #include "SwapDir.h" #include "forward.h" #include "MemPool.h" #include "icmp/IcmpSquid.h" #include "icmp/net_db.h" +#if DELAY_POOLS +#include "ClientDelayConfig.h" +#endif + #if USE_LOADABLE_MODULES #include "LoadableModules.h" #endif #if ICAP_CLIENT #include "adaptation/icap/Config.h" #endif #if USE_ECAP #include "adaptation/ecap/Config.h" #endif #if USE_ADAPTATION #include "adaptation/Config.h" #endif #if USE_SQUID_ESI #include "esi/Module.h" #endif #include "fs/Module.h" @@ -822,40 +826,44 @@ if (IamPrimaryProcess()) { #if USE_WCCP wccpInit(); #endif #if USE_WCCPv2 wccp2Init(); #endif } serverConnectionsOpen(); neighbors_init(); storeDirOpenSwapLogs(); mimeInit(Config.mimeTablePathname); +#if DELAY_POOLS + Config.ClientDelay.finalize(); +#endif + if (Config.onoff.announce) { if (!eventFind(start_announce, NULL)) eventAdd("start_announce", start_announce, NULL, 3600.0, 1); } else { if (eventFind(start_announce, NULL)) eventDelete(start_announce, NULL); } writePidFile(); /* write PID file */ debugs(1, 1, "Ready to serve requests."); reconfiguring = 0; } static void mainRotate(void) { icmpEngine.Close(); #if USE_DNSSERVERS @@ -1144,40 +1152,44 @@ bool enableAdaptation = false; // We can remove this dependency on specific adaptation mechanisms // if we create a generic Registry of such mechanisms. Should we? #if ICAP_CLIENT Adaptation::Icap::TheConfig.finalize(); enableAdaptation = Adaptation::Icap::TheConfig.onoff || enableAdaptation; #endif #if USE_ECAP Adaptation::Ecap::TheConfig.finalize(); // must be after we load modules enableAdaptation = Adaptation::Ecap::TheConfig.onoff || enableAdaptation; #endif // must be the last adaptation-related finalize Adaptation::Config::Finalize(enableAdaptation); #endif #if USE_SQUID_ESI Esi::Init(); #endif +#if DELAY_POOLS + Config.ClientDelay.finalize(); +#endif + debugs(1, 1, "Ready to serve requests."); if (!configured_once) { eventAdd("storeMaintain", Store::Maintain, NULL, 1.0, 1); if (Config.onoff.announce) eventAdd("start_announce", start_announce, NULL, 3600.0, 1); eventAdd("ipcache_purgelru", ipcache_purgelru, NULL, 10.0, 1); eventAdd("fqdncache_purgelru", fqdncache_purgelru, NULL, 15.0, 1); #if USE_XPROF_STATS eventAdd("cpuProfiling", xprof_event, NULL, 1.0, 1); #endif eventAdd("memPoolCleanIdlePools", Mem::CleanIdlePools, NULL, 15.0, 1); } === modified file 'src/protos.h' --- src/protos.h 2010-10-06 03:50:45 +0000 +++ src/protos.h 2010-10-11 15:57:56 +0000 @@ -28,40 +28,43 @@ */ #ifndef SQUID_PROTOS_H #define SQUID_PROTOS_H /* included for routines that have not moved out to their proper homes * yet. */ #include "Packer.h" /* for routines still in this file that take CacheManager parameters */ #include "ip/Address.h" /* for parameters that still need these */ #include "enums.h" /* some parameters stil need this */ #include "wordlist.h" /* for parameters that still need these */ #include "lookup_t.h" class HttpRequestMethod; +#if DELAY_POOLS +class ClientInfo; +#endif #if USE_FORW_VIA_DB SQUIDCEXTERN void fvdbCountVia(const char *key); SQUIDCEXTERN void fvdbCountForw(const char *key); #endif #if HEADERS_LOG SQUIDCEXTERN void headersLog(int cs, int pq, const HttpRequestMethod& m, void *data); #endif SQUIDCEXTERN char *log_quote(const char *header); SQUIDCEXTERN int logTypeIsATcpHit(log_type); /* * cache_cf.c */ SQUIDCEXTERN void configFreeMemory(void); class MemBuf; SQUIDCEXTERN void wordlistCat(const wordlist *, MemBuf * mb); SQUIDCEXTERN void self_destruct(void); SQUIDCEXTERN void add_http_port(char *portspec); @@ -72,40 +75,44 @@ /* extra functions from cache_cf.c useful for lib modules */ SQUIDCEXTERN void parse_int(int *var); SQUIDCEXTERN void parse_onoff(int *var); SQUIDCEXTERN void parse_eol(char *volatile *var); SQUIDCEXTERN void parse_wordlist(wordlist ** list); SQUIDCEXTERN void requirePathnameExists(const char *name, const char *path); SQUIDCEXTERN void parse_time_t(time_t * var); /* client_side.c - FD related client side routines */ SQUIDCEXTERN void clientdbInit(void); SQUIDCEXTERN void clientdbUpdate(const Ip::Address &, log_type, protocol_t, size_t); SQUIDCEXTERN int clientdbCutoffDenied(const Ip::Address &); void clientdbDump(StoreEntry *); SQUIDCEXTERN void clientdbFreeMemory(void); SQUIDCEXTERN int clientdbEstablished(const Ip::Address &, int); +#if DELAY_POOLS +SQUIDCEXTERN void clientdbSetWriteLimiter(ClientInfo * info, const int writeSpeedLimit,const double initialBurst,const double highWatermark); +SQUIDCEXTERN ClientInfo * clientdbGetInfo(const Ip::Address &addr); +#endif SQUIDCEXTERN void clientOpenListenSockets(void); SQUIDCEXTERN void clientHttpConnectionsClose(void); SQUIDCEXTERN void httpRequestFree(void *); extern void clientAccessCheck(void *); #include "Debug.h" /* see debug.c for info on context-based debugging */ SQUIDCEXTERN Ctx ctx_enter(const char *descr); SQUIDCEXTERN void ctx_exit(Ctx ctx); SQUIDCEXTERN void _db_set_syslog(const char *facility); SQUIDCEXTERN void _db_init(const char *logfile, const char *options); SQUIDCEXTERN void _db_rotate_log(void); /* packs, then prints an object using debugs() */ SQUIDCEXTERN void debugObj(int section, int level, const char *label, void *obj, ObjPackMethod pm); /* disk.c */ === modified file 'src/structs.h' --- src/structs.h 2010-10-06 03:50:45 +0000 +++ src/structs.h 2010-10-11 15:47:38 +0000 @@ -109,40 +109,41 @@ struct acl_size_t { acl_size_t *next; ACLList *aclList; int64_t size; }; struct ushortlist { u_short i; ushortlist *next; }; struct relist { char *pattern; regex_t regex; relist *next; }; #if DELAY_POOLS #include "DelayConfig.h" +#include "ClientDelayConfig.h" #endif #if USE_ICMP #include "icmp/IcmpConfig.h" #endif #include "HelperChildConfig.h" /* forward decl for SquidConfig, see RemovalPolicy.h */ class CpuAffinityMap; class RemovalPolicySettings; class external_acl; class Store; struct SquidConfig { struct { /* These should be for the Store::Root instance. * this needs pluggable parsing to be done smoothly. @@ -518,40 +519,41 @@ int use_short_names; } icons; char *errorDirectory; #if USE_ERR_LOCALES char *errorDefaultLanguage; int errorLogMissingLanguages; #endif char *errorStylesheet; struct { int maxtries; int onerror; } retry; struct { int64_t limit; } MemPools; #if DELAY_POOLS DelayConfig Delay; + ClientDelayConfig ClientDelay; #endif struct { int icp_average; int dns_average; int http_average; int icp_min_poll; int dns_min_poll; int http_min_poll; } comm_incoming; int max_open_disk_fds; int uri_whitespace; acl_size_t *rangeOffsetLimit; #if MULTICAST_MISS_STREAM struct { Ip::Address addr; int ttl; unsigned short port;