Honor ICAP Max-Connections limits. When the ICAP service connection limit is reached, and a new transaction is about to start, Squid either terminates the transaction with an error, suspends the transaction until a connection slot becomes available, ignores the limit, or bypasses the service, depending on the on-overload service option set for icap_service in squid.conf. The service-specified limit (if any) can be ignored in favor of the max-conn option value set for icap_service in squid.conf. OPTIONS transactions bypass the limit to avoid the situation where the old limit prevents Squid from getting fresh OPTIONS (with a new limit) and similar oddities. OPTIONS transactions should be relatively rare so this exception should not be a real problem. When the limit is reached for the first time, Squid logs a warning to the general log. With N SMP workers, each will assume the service limit is Max-Connections/N, even though not all workers may use a given service. === modified file 'src/adaptation/Elements.h' --- src/adaptation/Elements.h 2008-03-24 04:40:39 +0000 +++ src/adaptation/Elements.h 2011-04-07 14:40:53 +0000 @@ -1,18 +1,19 @@ #ifndef SQUID_ADAPTATION__ELEMENTS_H #define SQUID_ADAPTATION__ELEMENTS_H // widely used adaptation primitives namespace Adaptation { typedef enum { methodNone, methodReqmod, methodRespmod, methodOptions } Method; typedef enum { pointNone, pointPreCache, pointPostCache } VectPoint; +typedef enum { srvBlock, srvBypass, srvWait, srvForce} SrvBehaviour; extern const char *crlf; extern const char *methodStr(Method); // TODO: make into a stream operator? extern const char *vectPointStr(VectPoint); // TODO: make into a stream op? 
} // namespace Adaptation #endif /* SQUID_ADAPTATION_ELEMENTS_H */ === modified file 'src/adaptation/ServiceConfig.cc' --- src/adaptation/ServiceConfig.cc 2011-03-08 23:56:22 +0000 +++ src/adaptation/ServiceConfig.cc 2011-04-27 10:15:43 +0000 @@ -1,32 +1,33 @@ /* * DEBUG: section 93 Adaptation */ #include "squid.h" #include "ConfigParser.h" #include "adaptation/ServiceConfig.h" #include "ip/tools.h" Adaptation::ServiceConfig::ServiceConfig(): port(-1), method(methodNone), point(pointNone), - bypass(false), routing(false), ipv6(false) + bypass(false), maxConn(-1), onOverload(srvWait), + routing(false), ipv6(false) {} const char * Adaptation::ServiceConfig::methodStr() const { return Adaptation::methodStr(method); } const char * Adaptation::ServiceConfig::vectPointStr() const { return Adaptation::vectPointStr(point); } Adaptation::Method Adaptation::ServiceConfig::parseMethod(const char *str) const { if (!strncasecmp(str, "REQMOD", 6)) return Adaptation::methodReqmod; @@ -53,78 +54,88 @@ return Adaptation::pointNone; } bool Adaptation::ServiceConfig::parse() { String method_point; ConfigParser::ParseString(&key); ConfigParser::ParseString(&method_point); method = parseMethod(method_point.termedBuf()); point = parseVectPoint(method_point.termedBuf()); // reset optional parameters in case we are reconfiguring bypass = routing = false; // handle optional service name=value parameters const char *lastOption = NULL; bool grokkedUri = false; + bool onOverloadSet = false; while (char *option = strtok(NULL, w_space)) { if (strcmp(option, "0") == 0) { // backward compatibility bypass = false; continue; } if (strcmp(option, "1") == 0) { // backward compatibility bypass = true; continue; } const char *name = option; char *value = strstr(option, "="); if (!value) { lastOption = option; break; } *value = '\0'; // terminate option name ++value; // skip '=' // TODO: warn if option is set twice? 
bool grokked = false; - if (strcmp(name, "bypass") == 0) + if (strcmp(name, "bypass") == 0) { grokked = grokBool(bypass, name, value); - else if (strcmp(name, "routing") == 0) + } else if (strcmp(name, "routing") == 0) grokked = grokBool(routing, name, value); else if (strcmp(name, "uri") == 0) grokked = grokkedUri = grokUri(value); else if (strcmp(name, "ipv6") == 0) { grokked = grokBool(ipv6, name, value); if (grokked && ipv6 && !Ip::EnableIpv6) debugs(3, DBG_IMPORTANT, "WARNING: IPv6 is disabled. ICAP service option ignored."); + } else if (strcmp(name, "max-conn") == 0) + grokked = grokLong(maxConn, name, value); + else if (strcmp(name, "on-overload") == 0) { + grokked = grokOnOverload(onOverload, value); + onOverloadSet = true; } else grokked = grokExtension(name, value); if (!grokked) return false; } + // set default on-overload value if needed + if (!onOverloadSet) + onOverload = bypass ? srvBypass : srvWait; + // what is left must be the service URI if (!grokkedUri && !grokUri(lastOption)) return false; // there should be nothing else left if (const char *tail = strtok(NULL, w_space)) { debugs(3, 0, cfg_filename << ':' << config_lineno << ": " << "garbage after adaptation service URI: " << tail); return false; } debugs(3,5, cfg_filename << ':' << config_lineno << ": " << "adaptation_service " << key << ' ' << methodStr() << "_" << vectPointStr() << ' ' << bypass << routing << ' ' << uri); return true; } @@ -230,28 +241,63 @@ bool Adaptation::ServiceConfig::grokBool(bool &var, const char *name, const char *value) { if (!strcmp(value, "0") || !strcmp(value, "off")) var = false; else if (!strcmp(value, "1") || !strcmp(value, "on")) var = true; else { debugs(3, 0, HERE << cfg_filename << ':' << config_lineno << ": " << "wrong value for boolean " << name << "; " << "'0', '1', 'on', or 'off' expected but got: " << value); return false; } return true; } bool +Adaptation::ServiceConfig::grokLong(long &var, const char *name, const char *value) +{ + char *bad = NULL; 
+ const long p = strtol(value, &bad, 0); + if (p < 0 || bad == value) { + debugs(3, 0, HERE << cfg_filename << ':' << config_lineno << ": " << + "wrong value for " << name << "; " << + "a non-negative integer expected but got: " << value); + return false; + } + var = p; + return true; +} + +bool +Adaptation::ServiceConfig::grokOnOverload(SrvBehaviour &var, const char *value) +{ + if (strcmp(value, "block") == 0) + var = srvBlock; + else if (strcmp(value, "bypass") == 0) + var = srvBypass; + else if (strcmp(value, "wait") == 0) + var = srvWait; + else if (strcmp(value, "force") == 0) + var = srvForce; + else { + debugs(3, DBG_CRITICAL, HERE << cfg_filename << ':' << config_lineno << ": " << + "wrong value for on-overload; " << + "'block', 'bypass', 'wait' or 'force' expected but got: " << value); + return false; + } + return true; +} + +bool Adaptation::ServiceConfig::grokExtension(const char *name, const char *value) { // we do not accept extensions by default debugs(3, DBG_CRITICAL, cfg_filename << ':' << config_lineno << ": " << "ERROR: unknown adaptation service option: " << name << '=' << value); return false; } === modified file 'src/adaptation/ServiceConfig.h' --- src/adaptation/ServiceConfig.h 2010-12-18 00:31:53 +0000 +++ src/adaptation/ServiceConfig.h 2011-04-27 10:24:19 +0000 @@ -15,37 +15,44 @@ ServiceConfig(); const char *methodStr() const; const char *vectPointStr() const; bool parse(); public: String key; // service_configConfig name in the configuration file String uri; // service_configConfig URI // service_configConfig URI components String protocol; String host; String resource; int port; Method method; // what is being adapted (REQMOD vs RESPMOD) VectPoint point; // where the adaptation happens (pre- or post-cache) bool bypass; + + // options + long maxConn; ///< maximum number of concurrent service transactions + SrvBehaviour onOverload; ///< how to handle Max-Connections feature bool routing; ///< whether this service may determine the next 
service(s) bool ipv6; ///< whether this service uses IPv6 transport (default IPv4) protected: Method parseMethod(const char *buf) const; VectPoint parseVectPoint(const char *buf) const; /// interpret parsed values bool grokBool(bool &var, const char *name, const char *value); bool grokUri(const char *value); + bool grokLong(long &var, const char *name, const char *value); + /// handle on-overload configuration option + bool grokOnOverload(SrvBehaviour &var, const char *value); /// handle name=value configuration option with name unknown to Squid virtual bool grokExtension(const char *name, const char *value); }; } // namespace Adaptation #endif /* SQUID_ADAPTATION__SERVICE_CONFIG_H */ === modified file 'src/adaptation/icap/ModXact.cc' --- src/adaptation/icap/ModXact.cc 2011-04-07 12:42:02 +0000 +++ src/adaptation/icap/ModXact.cc 2011-04-26 14:38:06 +0000 @@ -69,70 +69,115 @@ icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class? debugs(93,7, HERE << "initialized." << status()); } // initiator wants us to start void Adaptation::Icap::ModXact::start() { Adaptation::Icap::Xaction::start(); // reserve an adaptation history slot (attempts are known at this time) Adaptation::History::Pointer ah = virginRequest().adaptLogHistory(); if (ah != NULL) adaptHistoryId = ah->recordXactStart(service().cfg().key, icap_tr_start, attempts > 1); estimateVirginBody(); // before virgin disappears! 
canStartBypass = service().cfg().bypass; // it is an ICAP violation to send request to a service w/o known OPTIONS - - if (service().up()) + // and the service may be too busy for us: honor Max-Connections and such + if (service().up() && service().availableForNew()) startWriting(); else waitForService(); } void Adaptation::Icap::ModXact::waitForService() { + const char *comment; Must(!state.serviceWaiting); - debugs(93, 7, HERE << "will wait for the ICAP service" << status()); - typedef NullaryMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(93,5, - Dialer, this, Adaptation::Icap::ModXact::noteServiceReady); - service().callWhenReady(call); + + if (!service().up()) { + AsyncCall::Pointer call = JobCallback(93,5, + ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceReady); + + service().callWhenReady(call); + comment = "to be up"; + } else { + //The service is unavailable because of max-connection or other reason + + if (service().cfg().onOverload != srvWait) { + // The service is overloaded, but waiting to be available prohibited by + // user configuration (onOverload is set to "block" or "bypass") + if (service().cfg().onOverload == srvBlock) + disableBypass("not available", true); + else //if (service().cfg().onOverload == srvBypass) + canStartBypass = true; + + disableRetries(); + disableRepeats("ICAP service is not available"); + + debugs(93, 7, HERE << "will not wait for the service to be available" << + status()); + + throw TexcHere("ICAP service is not available"); + } + + AsyncCall::Pointer call = JobCallback(93,5, + ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceAvailable); + service().callWhenAvailable(call, state.waitedForService); + comment = "to be available"; + } + + debugs(93, 7, HERE << "will wait for the service " << comment << status()); state.serviceWaiting = true; // after callWhenReady() which may throw + state.waitedForService = true; } void Adaptation::Icap::ModXact::noteServiceReady() { 
Must(state.serviceWaiting); state.serviceWaiting = false; - if (service().up()) { - startWriting(); - } else { + if (!service().up()) { disableRetries(); disableRepeats("ICAP service is unusable"); throw TexcHere("ICAP service is unusable"); } + + if (service().availableForOld()) + startWriting(); + else + waitForService(); +} + +void Adaptation::Icap::ModXact::noteServiceAvailable() +{ + Must(state.serviceWaiting); + state.serviceWaiting = false; + + if (service().up() && service().availableForOld()) + startWriting(); + else + waitForService(); } void Adaptation::Icap::ModXact::startWriting() { state.writing = State::writingConnect; decideOnPreview(); // must be decided before we decideOnRetries decideOnRetries(); openConnection(); } // connection with the ICAP service established void Adaptation::Icap::ModXact::handleCommConnected() { Must(state.writing == State::writingConnect); startReading(); // wait for early errors from the ICAP server MemBuf requestBuf; === modified file 'src/adaptation/icap/ModXact.h' --- src/adaptation/icap/ModXact.h 2010-10-21 08:13:41 +0000 +++ src/adaptation/icap/ModXact.h 2011-04-26 14:38:06 +0000 @@ -140,40 +140,41 @@ virtual ~ModXact(); // BodyProducer methods virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); virtual void noteBodyConsumerAborted(BodyPipe::Pointer); // BodyConsumer methods virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer); virtual void noteBodyProductionEnded(BodyPipe::Pointer); virtual void noteBodyProducerAborted(BodyPipe::Pointer); // comm handlers virtual void handleCommConnected(); virtual void handleCommWrote(size_t size); virtual void handleCommRead(size_t size); void handleCommWroteHeaders(); void handleCommWroteBody(); // service waiting void noteServiceReady(); + void noteServiceAvailable(); public: InOut virgin; InOut adapted; // bypasses exceptions if needed and possible virtual void callException(const std::exception &e); /// record error detail in the virgin request if possible virtual 
void detailError(int errDetail); private: virtual void start(); /// locates the request, either as a cause or as a virgin message itself const HttpRequest &virginRequest() const; // Must always be available void estimateVirginBody(); void makeAdaptedBodyPipe(const char *what); @@ -286,40 +287,41 @@ * size of dechunked HTTP body in ICAP reply or -1 if there is not any * encapsulated message data */ int64_t replyHttpBodySize; int adaptHistoryId; ///< adaptation history slot reservation class State { public: State(); public: bool serviceWaiting; // waiting for ICAP service options bool allowedPostview204; // mmust handle 204 No Content outside preview bool allowedPostview206; // must handle 206 Partial Content outside preview bool allowedPreview206; // must handle 206 Partial Content inside preview bool readyForUob; ///< got a 206 response and expect a use-origin-body + bool waitedForService; ///< true if was queued at least once // will not write anything [else] to the ICAP server connection bool doneWriting() const { return writing == writingReallyDone; } // will not use virgin.body_pipe bool doneConsumingVirgin() const { return writing >= writingAlmostDone && ((sending == sendingAdapted && !readyForUob) || sending == sendingDone); } // parsed entire ICAP response from the ICAP server bool doneParsing() const { return parsing == psDone; } // is parsing ICAP or HTTP headers read from the ICAP server bool parsingHeaders() const { return parsing == psIcapHeader || parsing == psHttpHeader; } === modified file 'src/adaptation/icap/Options.cc' --- src/adaptation/icap/Options.cc 2010-10-13 00:14:42 +0000 +++ src/adaptation/icap/Options.cc 2011-04-26 14:31:14 +0000 @@ -81,40 +81,42 @@ if (h->hasByNameListMember("Methods", "REQMOD", ',')) cfgMethod(ICAP::methodReqmod); if (h->hasByNameListMember("Methods", "RESPMOD", ',')) cfgMethod(ICAP::methodRespmod); service = h->getByName("Service"); serviceId = h->getByName("ServiceId"); istag = h->getByName("ISTag"); if 
(h->getByName("Opt-body-type").size()) { // TODO: add a class to rate-limit such warnings using FadingCounter debugs(93,DBG_IMPORTANT, "WARNING: Ignoring unsupported ICAP " << "OPTIONS body; type: " << h->getByName("Opt-body-type")); // Do not set error, assuming the response headers are valid. } cfgIntHeader(h, "Max-Connections", max_connections); + if (max_connections == 0) + debugs(93, DBG_IMPORTANT, "WARNING: Max-Connections is set to zero! "); cfgIntHeader(h, "Options-TTL", theTTL); theTimestamp = h->getTime(HDR_DATE); if (theTimestamp < 0) theTimestamp = squid_curtime; if (h->hasListMember(HDR_ALLOW, "204", ',')) allow204 = true; if (h->hasListMember(HDR_ALLOW, "206", ',')) allow206 = true; cfgIntHeader(h, "Preview", preview); cfgTransferList(h, theTransfers.preview); cfgTransferList(h, theTransfers.ignore); cfgTransferList(h, theTransfers.complete); } === modified file 'src/adaptation/icap/ServiceRep.cc' --- src/adaptation/icap/ServiceRep.cc 2011-03-11 23:02:23 +0000 +++ src/adaptation/icap/ServiceRep.cc 2011-04-27 09:51:49 +0000 @@ -1,46 +1,53 @@ /* * DEBUG: section 93 ICAP (RFC 3507) Client */ #include "squid.h" #include "adaptation/Answer.h" #include "adaptation/icap/Config.h" #include "adaptation/icap/ModXact.h" #include "adaptation/icap/Options.h" #include "adaptation/icap/OptXact.h" #include "adaptation/icap/ServiceRep.h" #include "base/TextException.h" #include "ConfigParser.h" #include "HttpReply.h" #include "SquidTime.h" +#include "fde.h" CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, ServiceRep); Adaptation::Icap::ServiceRep::ServiceRep(const ServiceConfigPointer &svcCfg): AsyncJob("Adaptation::Icap::ServiceRep"), Adaptation::Service(svcCfg), theOptions(NULL), theOptionsFetcher(0), theLastUpdate(0), + theBusyConns(0), + theAllWaiters(0), + connOverloadReported(false), + theIdleConns("ICAP Service"), isSuspended(0), notifying(false), updateScheduled(false), wasAnnouncedUp(true), // do not announce an "up" service at startup isDetached(false) -{} +{ 
+ setMaxConnections(); +} Adaptation::Icap::ServiceRep::~ServiceRep() { Must(!theOptionsFetcher); delete theOptions; } void Adaptation::Icap::ServiceRep::finalize() { Adaptation::Service::finalize(); // use /etc/services or default port if needed const bool have_port = cfg().port >= 0; if (!have_port) { struct servent *serv = getservbyname("icap", "tcp"); if (serv) { writeableCfg().port = htons(serv->s_port); } else { @@ -55,67 +62,228 @@ void Adaptation::Icap::ServiceRep::noteFailure() { const int failures = theSessionFailures.count(1); debugs(93,4, HERE << " failure " << failures << " out of " << TheConfig.service_failure_limit << " allowed in " << TheConfig.oldest_service_failure << "sec " << status()); if (isSuspended) return; if (TheConfig.service_failure_limit >= 0 && failures > TheConfig.service_failure_limit) suspend("too many failures"); // TODO: Should bypass setting affect how much Squid tries to talk to // the ICAP service that is currently unusable and is likely to remain // so for some time? The current code says "no". Perhaps the answer // should be configurable. 
} +// returns a persistent or brand new connection; negative int on failures +int Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) +{ + Ip::Address anyAddr; + int connection = theIdleConns.pop(cfg().host.termedBuf(), cfg().port, NULL, anyAddr, + retriableXact); + + reused = connection >= 0; // reused a persistent connection + + if (!reused) // need a new connection + connection = comm_open(SOCK_STREAM, 0, anyAddr, COMM_NONBLOCKING, cfg().uri.termedBuf()); + + if (connection >= 0) + ++theBusyConns; + + return connection; +} + +// pools connection if it is reusable or closes it +void Adaptation::Icap::ServiceRep::putConnection(int fd, bool isReusable, const char *comment) +{ + Must(fd >= 0); + // do not pool an idle connection if we owe connections + if (isReusable && excessConnections() == 0) { + debugs(93, 3, HERE << "pushing pconn" << comment); + commSetTimeout(fd, -1, NULL, NULL); + Ip::Address anyAddr; + theIdleConns.push(fd, cfg().host.termedBuf(), cfg().port, NULL, anyAddr); + } else { + debugs(93, 3, HERE << "closing pconn" << comment); + // comm_close will clear timeout + comm_close(fd); + } + + Must(theBusyConns > 0); + --theBusyConns; + // a connection slot released. Check if there are waiters.... 
+ busyCheckpoint(); +} + +// a wrapper to avoid exposing theIdleConns +void Adaptation::Icap::ServiceRep::noteConnectionUse(int fd) +{ + Must(fd >= 0); + fd_table[fd].noteUse(&theIdleConns); +} + +void Adaptation::Icap::ServiceRep::setMaxConnections() +{ + if (cfg().maxConn >= 0) + theMaxConnections = cfg().maxConn; + else if (theOptions && theOptions->max_connections >= 0) + theMaxConnections = theOptions->max_connections; + else { + theMaxConnections = -1; + return; + } + + if (::Config.workers > 1 ) + theMaxConnections /= ::Config.workers; +} + +int Adaptation::Icap::ServiceRep::availableConnections() const +{ + if (theMaxConnections < 0) + return -1; + + // we are available if we can open or reuse connections + // in other words, if we will not create debt + int available = max(0, theMaxConnections - theBusyConns); + + if (!available && !connOverloadReported) { + debugs(93, DBG_IMPORTANT, "WARNING: ICAP Max-Connections limit " << + "exceeded for service " << cfg().uri << ". Open connections now: " << + theBusyConns + theIdleConns.count() << ", including " << + theIdleConns.count() << " idle persistent connections."); + connOverloadReported = true; + } + + if (cfg().onOverload == srvForce) + return -1; + + return available; +} + +// The number of connections which excess the Max-Connections limit +int Adaptation::Icap::ServiceRep::excessConnections() const +{ + if (theMaxConnections < 0) + return 0; + + // Waiters affect the number of needed connections but a needed + // connection may still be excessive from Max-Connections p.o.v. + // so we should not account for waiting transaction needs here. 
+ const int debt = theBusyConns + theIdleConns.count() - theMaxConnections; + if (debt > 0) + return debt; + else + return 0; +} + +void Adaptation::Icap::ServiceRep::noteGoneWaiter() +{ + theAllWaiters--; + + // in case the notified transaction did not take the connection slot + busyCheckpoint(); +} + +// called when a connection slot may become available +void Adaptation::Icap::ServiceRep::busyCheckpoint() +{ + if (theNotificationWaiters.empty()) // nobody is waiting for a slot + return; + + int freed = 0; + int available = availableConnections(); + + if (available < 0) { + // It is possible to have waiters when no limit on connections exist in + // case of reconfigure or because new Options received. + // In this case, notify all waiting transactions. + freed = theNotificationWaiters.size(); + } else { + // avoid notifying more waiters than there will be available slots + const int notifiedWaiters = theAllWaiters - theNotificationWaiters.size(); + freed = available - notifiedWaiters; + } + + debugs(93,7, HERE << "Available connections: " << available << + " freed slots: " << freed << + " waiting in queue: " << theNotificationWaiters.size()); + + while (freed > 0 && !theNotificationWaiters.empty()) { + Client i = theNotificationWaiters.front(); + theNotificationWaiters.pop_front(); + ScheduleCallHere(i.callback); + i.callback = NULL; + --freed; + } +} + void Adaptation::Icap::ServiceRep::suspend(const char *reason) { if (isSuspended) { debugs(93,4, HERE << "keeping suspended, also for " << reason); } else { isSuspended = reason; debugs(93,1, "suspending ICAP service for " << reason); scheduleUpdate(squid_curtime + TheConfig.service_revival_delay); announceStatusChange("suspended", true); } } bool Adaptation::Icap::ServiceRep::probed() const { return theLastUpdate != 0; } bool Adaptation::Icap::ServiceRep::hasOptions() const { return theOptions && theOptions->valid() && theOptions->fresh(); } bool Adaptation::Icap::ServiceRep::up() const { return !isSuspended && 
hasOptions(); } +bool Adaptation::Icap::ServiceRep::availableForNew() const +{ + Must(up()); + int available = availableConnections(); + if (available < 0) + return true; + else + return (available - theAllWaiters > 0); +} + +bool Adaptation::Icap::ServiceRep::availableForOld() const +{ + Must(up()); + + int available = availableConnections(); + return (available != 0); // it is -1 (no limit) or has available slots +} + + bool Adaptation::Icap::ServiceRep::wantsUrl(const String &urlPath) const { Must(hasOptions()); return theOptions->transferKind(urlPath) != Adaptation::Icap::Options::xferIgnore; } bool Adaptation::Icap::ServiceRep::wantsPreview(const String &urlPath, size_t &wantedSize) const { Must(hasOptions()); if (theOptions->preview < 0) return false; if (theOptions->transferKind(urlPath) != Adaptation::Icap::Options::xferPreview) return false; wantedSize = theOptions->preview; return true; } @@ -170,40 +338,58 @@ void Adaptation::Icap::ServiceRep::noteTimeToNotify() { Must(!notifying); notifying = true; debugs(93,7, HERE << "notifies " << theClients.size() << " clients " << status()); // note: we must notify even if we are invalidated Pointer us = NULL; while (!theClients.empty()) { Client i = theClients.pop_back(); ScheduleCallHere(i.callback); i.callback = 0; } notifying = false; } +void Adaptation::Icap::ServiceRep::callWhenAvailable(AsyncCall::Pointer &cb, bool priority) +{ + debugs(93,8, "ICAPServiceRep::callWhenAvailable"); + Must(cb!=NULL); + Must(up()); + Must(!theIdleConns.count()); // or we should not be waiting + + Client i; + i.service = Pointer(this); + i.callback = cb; + if (priority) + theNotificationWaiters.push_front(i); + else + theNotificationWaiters.push_back(i); + + busyCheckpoint(); +} + void Adaptation::Icap::ServiceRep::callWhenReady(AsyncCall::Pointer &cb) { Must(cb!=NULL); debugs(93,5, HERE << "Adaptation::Icap::Service is asked to call " << *cb << " when ready " << status()); Must(!broken()); // we do not wait for a broken service 
Client i; i.service = Pointer(this); // TODO: is this really needed? i.callback = cb; theClients.push_back(i); if (theOptionsFetcher.set() || notifying) return; // do nothing, we will be picked up in noteTimeToNotify() if (needNewOptions()) startGettingOptions(); else @@ -334,40 +520,51 @@ } // we (a) must keep trying to get OPTIONS and (b) are RefCounted so we // must keep our job alive (XXX: until nobody needs us) void Adaptation::Icap::ServiceRep::callException(const std::exception &e) { clearAdaptation(theOptionsFetcher); debugs(93,2, "ICAP probably failed to fetch options (" << e.what() << ")" << status()); handleNewOptions(0); } void Adaptation::Icap::ServiceRep::handleNewOptions(Adaptation::Icap::Options *newOptions) { // new options may be NULL changeOptions(newOptions); debugs(93,3, HERE << "got new options and is now " << status()); scheduleUpdate(optionsFetchTime()); + + setMaxConnections(); + const int excess = excessConnections(); + // if we owe connections and have idle pconns, close the latter + if (excess && theIdleConns.count() > 0) { + const int n = min(excess, theIdleConns.count()); + debugs(93,5, HERE << "closing " << n << " pconns to relief debt"); + Ip::Address anyAddr; + theIdleConns.closeN(n, cfg().host.termedBuf(), cfg().port, NULL, anyAddr); + } + scheduleNotification(); } void Adaptation::Icap::ServiceRep::startGettingOptions() { Must(!theOptionsFetcher); debugs(93,6, HERE << "will get new options " << status()); // XXX: "this" here is "self"; works until refcounting API changes theOptionsFetcher = initiateAdaptation( new Adaptation::Icap::OptXactLauncher(this)); // TODO: timeout in case Adaptation::Icap::OptXact never calls us back? // Such a timeout should probably be a generic AsyncStart feature. 
} void Adaptation::Icap::ServiceRep::scheduleUpdate(time_t when) { if (updateScheduled) { debugs(93,7, HERE << "reschedules update"); // XXX: check whether the event is there because AR saw @@ -469,20 +666,39 @@ if (const int failures = theSessionFailures.remembered()) buf.Printf(",fail%d", failures); buf.append("]", 1); buf.terminate(); return buf.content(); } void Adaptation::Icap::ServiceRep::detach() { debugs(93,3, HERE << "detaching ICAP service: " << cfg().uri << ' ' << status()); isDetached = true; } bool Adaptation::Icap::ServiceRep::detached() const { return isDetached; } + +Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const CbcPointer &xact, + Adaptation::Icap::ConnWaiterDialer::Parent::Method aHandler): + Parent(xact, aHandler) +{ + theService = &xact->service(); + theService->noteNewWaiter(); +} + +Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter): Parent(aConnWaiter) +{ + theService = aConnWaiter.theService; + theService->noteNewWaiter(); +} + +Adaptation::Icap::ConnWaiterDialer::~ConnWaiterDialer() +{ + theService->noteGoneWaiter(); +} === modified file 'src/adaptation/icap/ServiceRep.h' --- src/adaptation/icap/ServiceRep.h 2011-03-08 23:56:22 +0000 +++ src/adaptation/icap/ServiceRep.h 2011-04-27 11:20:11 +0000 @@ -23,41 +23,44 @@ * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. 
* */ #ifndef SQUID_ICAPSERVICEREP_H #define SQUID_ICAPSERVICEREP_H #include "cbdata.h" #include "FadingCounter.h" #include "adaptation/Service.h" #include "adaptation/forward.h" #include "adaptation/Initiator.h" #include "adaptation/icap/Elements.h" - +#include "base/AsyncJobCalls.h" +#include "comm.h" +#include "pconn.h" +#include <deque> namespace Adaptation { namespace Icap { class Options; class OptXact; /* The ICAP service representative maintains information about a single ICAP service that Squid communicates with. The representative initiates OPTIONS requests to the service to keep cached options fresh. One ICAP server may host many ICAP services. */ /* * A service with a fresh cached OPTIONS response and without many failures * is an "up" service. All other services are "down". A service is "probed" * if we tried to get an OPTIONS response from it and succeeded or failed. * A probed down service is called "broken". * @@ -77,100 +80,150 @@ * not try to fetch fresh options for such a service. It should be * auto-destroyed by refcounting when no longer used. 
*/ class ServiceRep : public RefCountable, public Adaptation::Service, public Adaptation::Initiator { public: typedef RefCount Pointer; public: explicit ServiceRep(const ServiceConfigPointer &aConfig); virtual ~ServiceRep(); virtual void finalize(); virtual bool probed() const; // see comments above virtual bool up() const; // see comments above + bool availableForNew() const; ///< a new transaction may start communicating with the service + bool availableForOld() const; ///< a transaction notified about connection slot availability may start communicating with the service virtual Initiate *makeXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause); + void callWhenAvailable(AsyncCall::Pointer &cb, bool priority = false); void callWhenReady(AsyncCall::Pointer &cb); // the methods below can only be called on an up() service bool wantsUrl(const String &urlPath) const; bool wantsPreview(const String &urlPath, size_t &wantedSize) const; bool allows204() const; bool allows206() const; + int getConnection(bool isRetriable, bool &isReused); + void putConnection(int fd, bool isReusable, const char *comment); + void noteConnectionUse(int fd); void noteFailure(); // called by transactions to report service failure + void noteNewWaiter() {theAllWaiters++;} ///< New xaction waiting for service to be up or available + void noteGoneWaiter(); ///< An xaction is not waiting any more for service to be available + bool existWaiters() const {return (theAllWaiters > 0);} ///< if there are xactions waiting for the service to be available + //AsyncJob virtual methods virtual bool doneAll() const { return Adaptation::Initiator::doneAll() && false;} virtual void callException(const std::exception &e); virtual void detach(); virtual bool detached() const; public: // treat these as private, they are for callbacks only void noteTimeToUpdate(); void noteTimeToNotify(); // receive either an ICAP OPTIONS response header or an abort message virtual void noteAdaptationAnswer(const Answer 
&answer); private: // stores Prepare() callback info struct Client { Pointer service; // one for each client to preserve service AsyncCall::Pointer callback; }; typedef Vector Clients; + // TODO: rename to theUpWaiters Clients theClients; // all clients waiting for a call back Options *theOptions; CbcPointer theOptionsFetcher; // pending ICAP OPTIONS transaction time_t theLastUpdate; // time the options were last updated + /// FIFO queue of xactions waiting for a connection slot and not yet notified + /// about it; xaction is removed when notification is scheduled + std::deque theNotificationWaiters; + int theBusyConns; ///< number of connections given to active transactions + /// number of xactions waiting for a connection slot (notified and not) + /// the number is decreased after the xaction receives notification + int theAllWaiters; + int theMaxConnections; ///< the maximum allowed connections to the service + // TODO: use a better type like the FadingCounter for connOverloadReported + mutable bool connOverloadReported; ///< whether we reported exceeding theMaxConnections + PconnPool theIdleConns; ///< idle persistent connection pool + FadingCounter theSessionFailures; const char *isSuspended; // also stores suspension reason for debugging bool notifying; // may be true in any state except for the initial bool updateScheduled; // time-based options update has been scheduled private: ICAP::Method parseMethod(const char *) const; ICAP::VectPoint parseVectPoint(const char *) const; void suspend(const char *reason); bool hasOptions() const; bool needNewOptions() const; time_t optionsFetchTime() const; void scheduleUpdate(time_t when); void scheduleNotification(); void startGettingOptions(); void handleNewOptions(Options *newOptions); void changeOptions(Options *newOptions); void checkOptions(); void announceStatusChange(const char *downPhrase, bool important) const; + /// Set the maximum allowed connections for the service + void setMaxConnections(); + /// The 
number of connections which exceed the Max-Connections limit + int excessConnections() const; + /** + * The available connection slots to the ICAP server + \return the available slots, or -1 if there is no limit on allowed connections + */ + int availableConnections() const; + /** + * If there are xactions waiting for the service to be available, notify + * as many xactions as there are available connection slots. + */ + void busyCheckpoint(); + const char *status() const; mutable bool wasAnnouncedUp; // prevent sequential same-state announcements bool isDetached; CBDATA_CLASS2(ServiceRep); }; +class ModXact; +/// Custom dialer to call Service::noteNewWaiter and noteGoneWaiter +/// to maintain the Service's idea of waiting and being-notified transactions. +class ConnWaiterDialer: public NullaryMemFunT +{ +public: + typedef NullaryMemFunT Parent; + ServiceRep::Pointer theService; + ConnWaiterDialer(const CbcPointer &xact, Parent::Method aHandler); + ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter); + ~ConnWaiterDialer(); +}; } // namespace Icap } // namespace Adaptation #endif /* SQUID_ICAPSERVICEREP_H */ === modified file 'src/adaptation/icap/Xaction.cc' --- src/adaptation/icap/Xaction.cc 2011-04-07 12:42:02 +0000 +++ src/adaptation/icap/Xaction.cc 2011-04-26 14:38:06 +0000 @@ -4,42 +4,40 @@ #include "squid.h" #include "comm.h" #include "comm/Write.h" #include "CommCalls.h" #include "HttpMsg.h" #include "adaptation/icap/Xaction.h" #include "adaptation/icap/Launcher.h" #include "adaptation/icap/Config.h" #include "base/TextException.h" #include "pconn.h" #include "HttpRequest.h" #include "HttpReply.h" #include "ip/tools.h" #include "acl/FilledChecklist.h" #include "icap_log.h" #include "fde.h" #include "SquidTime.h" #include "err_detail_type.h" -static PconnPool *icapPconnPool = new PconnPool("ICAP Servers"); - //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction); Adaptation::Icap::Xaction::Xaction(const char *aTypeName,
Adaptation::Icap::ServiceRep::Pointer &aService): AsyncJob(aTypeName), Adaptation::Initiate(aTypeName), icapRequest(NULL), icapReply(NULL), attempts(0), connection(-1), theService(aService), commBuf(NULL), commBufSize(0), commEof(false), reuseConnection(true), isRetriable(true), isRepeatable(true), ignoreLastWrite(false), connector(NULL), reader(NULL), writer(NULL), closer(NULL) { @@ -77,165 +75,148 @@ isRepeatable = false; } void Adaptation::Icap::Xaction::start() { Adaptation::Initiate::start(); readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF); commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize); // make sure maximum readBuf space does not exceed commBuf size Must(static_cast(readBuf.potentialSpaceSize()) <= commBufSize); } // TODO: obey service-specific, OPTIONS-reported connection limit void Adaptation::Icap::Xaction::openConnection() { Ip::Address client_addr; Must(connection < 0); - const Adaptation::Service &s = service(); + Adaptation::Icap::ServiceRep &s = service(); if (!TheConfig.reuse_connections) disableRetries(); // this will also safely drain pconn pool - // TODO: check whether NULL domain is appropriate here - connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable); - if (connection >= 0) { - debugs(93,3, HERE << "reused pconn FD " << connection); + bool wasReused = false; + connection = s.getConnection(isRetriable, wasReused); + if (connection < 0) + dieOnConnectionFailure(); // throws + + if (wasReused) { + // Set comm Close handler + typedef CommCbMemFunT CloseDialer; + closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", + CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); + comm_add_close_handler(connection, closer); // fake the connect callback // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead? 
typedef CommCbMemFunT Dialer; CbcPointer self(this); Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); dialer.params.fd = connection; dialer.params.flag = COMM_OK; // fake other parameters by copying from the existing connection connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); ScheduleCallHere(connector); return; } disableRetries(); // we only retry pconn failures - Ip::Address outgoing; - if (!Ip::EnableIpv6 && !outgoing.SetIPv4()) { - debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoing << " is not an IPv4 address."); - dieOnConnectionFailure(); // throws - } - /* split-stack for now requires default IPv4-only socket */ - if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoing.IsAnyAddr() && !s.cfg().ipv6) { - outgoing.SetIPv4(); - } - connection = comm_open(SOCK_STREAM, 0, outgoing, - COMM_NONBLOCKING, s.cfg().uri.termedBuf()); - - if (connection < 0) - dieOnConnectionFailure(); // throws - - debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port); + debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port); // TODO: service bypass status may differ from that of a transaction typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(93, 5, - TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout); + AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", + TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); + commSetTimeout(connection, TheConfig.connect_timeout( service().cfg().bypass), timeoutCall); typedef CommCbMemFunT CloseDialer; - closer = JobCallback(93, 5, - CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed); + closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", + CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); comm_add_close_handler(connection, closer); typedef CommCbMemFunT 
ConnectDialer; - connector = JobCallback(93,3, - ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); + connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", + ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected)); commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector); } /* * This event handler is necessary to work around the no-rentry policy * of Adaptation::Icap::Xaction::callStart() */ #if 0 void Adaptation::Icap::Xaction::reusedConnection(void *data) { debugs(93, 5, HERE << "reused connection"); Adaptation::Icap::Xaction *x = (Adaptation::Icap::Xaction*)data; x->noteCommConnected(COMM_OK); } #endif void Adaptation::Icap::Xaction::closeConnection() { if (connection >= 0) { if (closer != NULL) { comm_remove_close_handler(connection, closer); closer = NULL; } cancelRead(); // may not work if (reuseConnection && !doneWithIo()) { //status() adds leading spaces. debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status()); reuseConnection = false; } - if (reuseConnection) { - Ip::Address client_addr; - //status() adds leading spaces. - debugs(93,3, HERE << "pushing pconn" << status()); - AsyncCall::Pointer call = NULL; - commSetTimeout(connection, -1, call); - icapPconnPool->push(connection, theService->cfg().host.termedBuf(), - theService->cfg().port, NULL, client_addr); + if (reuseConnection) disableRetries(); - } else { - //status() adds leading spaces. 
- debugs(93,3, HERE << "closing pconn" << status()); - // comm_close will clear timeout - comm_close(connection); - } + + Adaptation::Icap::ServiceRep &s = service(); + s.putConnection(connection, reuseConnection, status()); writer = NULL; reader = NULL; connector = NULL; connection = -1; } } // connection with the ICAP service established void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io) { Must(connector != NULL); connector = NULL; if (io.flag != COMM_OK) dieOnConnectionFailure(); // throws - fd_table[connection].noteUse(icapPconnPool); + service().noteConnectionUse(connection); handleCommConnected(); } void Adaptation::Icap::Xaction::dieOnConnectionFailure() { debugs(93, 2, HERE << typeName << " failed to connect to " << service().cfg().uri); detailError(ERR_DETAIL_ICAP_XACT_START); throw TexcHere("cannot connect to the ICAP service"); } void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf) { // comm module will free the buffer typedef CommCbMemFunT Dialer; writer = JobCallback(93,3, Dialer, this, Adaptation::Icap::Xaction::noteCommWrote); Comm::Write(connection, &buf, writer); === modified file 'src/adaptation/icap/Xaction.h' --- src/adaptation/icap/Xaction.h 2010-10-21 08:13:41 +0000 +++ src/adaptation/icap/Xaction.h 2011-04-26 14:31:14 +0000 @@ -121,40 +121,41 @@ virtual void swanSong(); // returns a temporary string depicting transaction status, for debugging virtual const char *status() const; virtual void fillPendingStatus(MemBuf &buf) const; virtual void fillDoneStatus(MemBuf &buf) const; // useful for debugging virtual bool fillVirginHttpHeader(MemBuf&) const; public: // custom exception handling and end-of-call checks virtual void callException(const std::exception &e); virtual void callEnd(); protected: // logging void setOutcome(const XactOutcome &xo); virtual void finalizeLogInfo(); +public: ServiceRep &service(); private: void tellQueryAborted(); void maybeLog(); protected: int connection; // FD of the ICAP server 
connection Adaptation::Icap::ServiceRep::Pointer theService; /* * We have two read buffers. We would prefer to read directly * into the MemBuf, but since comm_read isn't MemBuf-aware, and * uses event-delayed callbacks, it leaves the MemBuf in an * inconsistent state. There would be data in the buffer, but * MemBuf.size won't be updated until the (delayed) callback * occurs. To avoid that situation we use a plain buffer * (commBuf) and then copy (append) its contents to readBuf in * the callback. If comm_read ever becomes MemBuf-aware, we * can eliminate commBuf and this extra buffer copy. === modified file 'src/cf.data.pre' --- src/cf.data.pre 2011-04-20 07:11:27 +0000 +++ src/cf.data.pre 2011-04-27 10:11:43 +0000 @@ -6529,40 +6529,60 @@ routing=on|off|1|0 If set to 'on' or '1', the ICAP service is allowed to dynamically change the current message adaptation plan by returning a chain of services to be used next. The services are specified using the X-Next-Services ICAP response header value, formatted as a comma-separated list of service names. Each named service should be configured in squid.conf and should have the same method and vectoring point as the current ICAP transaction. Services violating these rules are ignored. An empty X-Next-Services value results in an empty plan which ends the current adaptation. Routing is not allowed by default: the ICAP X-Next-Services response header is ignored. ipv6=on|off Only has effect on split-stack systems. The default on those systems is to use IPv4-only connections. When set to 'on' this option will make Squid use IPv6-only connections to contact this ICAP service. 
+ on-overload=block|bypass|wait|force + If the service Max-Connections limit has been reached, do + one of the following for each new ICAP transaction: + * block: send an HTTP error response to the client + * bypass: ignore the "over-connected" ICAP service + * wait: wait (in a FIFO queue) for an ICAP connection slot + * force: proceed, ignoring the Max-Connections limit + + In SMP mode with N workers, each worker assumes the service + connection limit is Max-Connections/N, even though not all + workers may use a given service. + + The default value is "bypass" if service is bypassable, + otherwise it is set to "wait". + + + max-conn=number + Use the given number as the Max-Connections limit, regardless + of the Max-Connections value given by the service, if any. + Older icap_service format without optional named parameters is deprecated but supported for backward compatibility. Example: icap_service svcBlocker reqmod_precache bypass=0 icap://icap1.mydomain.net:1344/reqmod icap_service svcLogger reqmod_precache routing=on icap://icap2.mydomain.net:1344/respmod DOC_END NAME: icap_class TYPE: icap_class_type IFDEF: ICAP_CLIENT LOC: none DEFAULT: none DOC_START This deprecated option was documented to define an ICAP service chain, even though it actually defined a set of similar, redundant services, and the chains were not supported. To define a set of redundant services, please use the adaptation_service_set directive. 
For service chains, use === modified file 'src/comm.cc' --- src/comm.cc 2011-01-28 07:58:53 +0000 +++ src/comm.cc 2011-03-23 15:55:49 +0000 @@ -1473,41 +1473,41 @@ COMMIO_FD_WRITECB(fd)->finish(COMM_ERR_CLOSING, errno); } if (COMMIO_FD_READCB(fd)->active()) { Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); COMMIO_FD_READCB(fd)->finish(COMM_ERR_CLOSING, errno); } #if USE_DELAY_POOLS if (ClientInfo *clientInfo = F->clientInfo) { if (clientInfo->selectWaiting) { clientInfo->selectWaiting = false; // kick queue or it will get stuck as commWriteHandle is not called clientInfo->kickQuotaQueue(); } } #endif commCallCloseHandlers(fd); if (F->pconn.uses) - F->pconn.pool->count(F->pconn.uses); + F->pconn.pool->noteUses(F->pconn.uses); comm_empty_os_read_buffers(fd); AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete", CommCloseCbPtrFun(comm_close_complete, NULL)); Params &completeParams = GetCommParams(completeCall); completeParams.fd = fd; // must use async call to wait for all callbacks // scheduled before comm_close() to finish ScheduleCallHere(completeCall); PROF_stop(comm_close); } /* Send a udp datagram to specified TO_ADDR. 
*/ int comm_udp_sendto(int fd, const Ip::Address &to_addr, const void *buf, === modified file 'src/pconn.cc' --- src/pconn.cc 2010-12-14 14:01:14 +0000 +++ src/pconn.cc 2011-04-26 14:38:06 +0000 @@ -77,69 +77,75 @@ if (fds[index] == fd) return index; } return -1; } void IdleConnList::removeFD(int fd) { int index = findFDIndex(fd); if (index < 0) { debugs(48, 2, "IdleConnList::removeFD: FD " << fd << " NOT FOUND!"); return; } debugs(48, 3, "IdleConnList::removeFD: found FD " << fd << " at index " << index); for (; index < nfds - 1; index++) fds[index] = fds[index + 1]; + if (parent) + parent->noteConnectionRemoved(); + if (--nfds == 0) { debugs(48, 3, "IdleConnList::removeFD: deleting " << hashKeyStr(&hash)); delete this; } } void IdleConnList::clearHandlers(int fd) { comm_read_cancel(fd, IdleConnList::read, this); commSetTimeout(fd, -1, NULL, NULL); } void IdleConnList::push(int fd) { if (nfds == nfds_alloc) { debugs(48, 3, "IdleConnList::push: growing FD array"); nfds_alloc <<= 1; int *old = fds; fds = (int *)xmalloc(nfds_alloc * sizeof(int)); memcpy(fds, old, nfds * sizeof(int)); if (nfds == PCONN_FDS_SZ) pconn_fds_pool->freeOne(old); else xfree(old); } + if (parent) + parent->noteConnectionAdded(); + fds[nfds++] = fd; comm_read(fd, fakeReadBuf, sizeof(fakeReadBuf), IdleConnList::read, this); commSetTimeout(fd, Config.Timeout.pconn, IdleConnList::timeout, this); } /* * XXX this routine isn't terribly efficient - if there's a pending * read event (which signifies the fd will close in the next IO loop!) * we ignore the FD and move onto the next one. This means, as an example, * if we have a lot of FDs open to a very popular server and we get a bunch * of requests JUST as they timeout (say, it shuts down) we'll be wasting * quite a bit of CPU. Just keep it in mind. 
*/ int IdleConnList::findUseableFD() { assert(nfds); for (int i=nfds-1; i>=0; i--) { if (!comm_has_pending_read_callback(fds[i])) { @@ -213,41 +219,42 @@ storeAppendPrintf(e, "\t%4d %9d\n", i, hist[i]); } } void PconnPool::dumpHash(StoreEntry *e) { int i; hash_link *walker = NULL; hash_table *hid = table; hash_first(hid); for (i = 0, walker = hid->next; walker; walker = hash_next(hid)) { storeAppendPrintf(e, "\t item %5d: %s\n", i++, (char *)(walker->key)); } } /* ========== PconnPool PUBLIC FUNCTIONS ============================================ */ -PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr) +PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr), + theCount(0) { int i; table = hash_create((HASHCMP *) strcmp, 229, hash_string); for (i = 0; i < PCONN_HIST_SZ; i++) hist[i] = 0; PconnModule::GetInstance()->add(this); } PconnPool::~PconnPool() { descr = NULL; hashFreeMemory(table); } void PconnPool::push(int fd, const char *host, u_short port, const char *domain, Ip::Address &client_address) { IdleConnList *list; @@ -274,77 +281,88 @@ hash_join(table, &list->hash); } else { debugs(48, 3, "PconnPool::push: found IdleConnList for {" << hashKeyStr(&list->hash) << "}" ); } list->push(fd); assert(!comm_has_incomplete_write(fd)); snprintf(desc, FD_DESC_SZ, "%s idle connection", host); fd_note(fd, desc); debugs(48, 3, "PconnPool::push: pushed FD " << fd << " for " << aKey); } /** * Return a pconn fd for host:port if available and retriable. * Otherwise, return -1. * * We close available persistent connection if the caller transaction is not * retriable to avoid having a growing number of open connections when many * transactions create persistent connections but are not retriable. + * PconnPool::closeN() relies on that behavior as well. 
*/ int PconnPool::pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool isRetriable) { const char * aKey = key(host, port, domain, client_address); IdleConnList *list = (IdleConnList *)hash_lookup(table, aKey); if (list == NULL) { debugs(48, 3, "PconnPool::pop: lookup for key {" << aKey << "} failed."); return -1; } else { debugs(48, 3, "PconnPool::pop: found " << hashKeyStr(&list->hash) << (isRetriable?"(to use)":"(to kill)") ); } int fd = list->findUseableFD(); // search from the end. skip pending reads. if (fd >= 0) { list->clearHandlers(fd); list->removeFD(fd); /* might delete list */ if (!isRetriable) { comm_close(fd); return -1; } } return fd; } void -PconnPool::unlinkList(IdleConnList *list) const +PconnPool::closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address) +{ + // TODO: optimize: we can probably do hash_lookup just once + for (int i = 0; i < n; ++i) + pop(host, port, domain, client_address, false); // may fail! 
+} + +void +PconnPool::unlinkList(IdleConnList *list) { + theCount -= list->count(); + assert(theCount >= 0); hash_remove_link(table, &list->hash); } void -PconnPool::count(int uses) +PconnPool::noteUses(int uses) { if (uses >= PCONN_HIST_SZ) uses = PCONN_HIST_SZ - 1; hist[uses]++; } /* ========== PconnModule ============================================ */ /* * This simple class exists only for the cache manager */ PconnModule::PconnModule() : pools(NULL), poolCount(0) { pools = (PconnPool **) xcalloc(MAX_NUM_PCONN_POOLS, sizeof(*pools)); pconn_fds_pool = memPoolCreate("pconn_fds", PCONN_FDS_SZ * sizeof(int)); debugs(48, 0, "persistent connection module initialized"); registerWithCacheManager(); } === modified file 'src/pconn.h' --- src/pconn.h 2010-10-07 06:34:34 +0000 +++ src/pconn.h 2011-04-26 14:38:06 +0000 @@ -15,48 +15,49 @@ /* for CBDATA_CLASS2() macros */ #include "cbdata.h" /* for hash_link */ #include "hash.h" /* for IOCB */ #include "comm.h" /// \ingroup PConnAPI #define MAX_NUM_PCONN_POOLS 10 /// \ingroup PConnAPI #define PCONN_HIST_SZ (1<<16) /// \ingroup PConnAPI class IdleConnList { public: IdleConnList(const char *key, PconnPool *parent); ~IdleConnList(); - int numIdle() { return nfds; } int findFDIndex(int fd); ///< search from the end of array void removeFD(int fd); void push(int fd); int findUseableFD(); ///< find first from the end not pending read fd. void clearHandlers(int fd); + int count() const { return nfds; } + private: static IOCB read; static PF timeout; public: hash_link hash; /** must be first */ private: int *fds; int nfds_alloc; int nfds; PconnPool *parent; char fakeReadBuf[4096]; CBDATA_CLASS2(IdleConnList); }; #include "ip/forward.h" class StoreEntry; @@ -65,53 +66,57 @@ /* for hash_table */ #include "hash.h" /** \ingroup PConnAPI * Manages idle persistent connections to a caller-defined set of * servers (e.g., all HTTP servers). 
Uses a collection of IdleConnLists * internally to list the individual open connections to each server. * Controls lists existence and limits the total number of * idle connections across the collection. */ class PconnPool { public: PconnPool(const char *); ~PconnPool(); void moduleInit(); void push(int fd, const char *host, u_short port, const char *domain, Ip::Address &client_address); int pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool retriable); - void count(int uses); + void noteUses(int uses); void dumpHist(StoreEntry *e); void dumpHash(StoreEntry *e); - void unlinkList(IdleConnList *list) const; + void unlinkList(IdleConnList *list); + void closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address); + int count() const { return theCount; } + void noteConnectionAdded() { ++theCount; } + void noteConnectionRemoved() { assert(theCount > 0); --theCount; } private: static const char *key(const char *host, u_short port, const char *domain, Ip::Address &client_address); int hist[PCONN_HIST_SZ]; hash_table *table; const char *descr; - + int theCount; ///< the number of pooled connections }; class StoreEntry; class PconnPool; /// \ingroup PConnAPI class PconnModule { public: /** the module is a singleton until we have instance based cachemanager * management */ static PconnModule * GetInstance(); /** A thunk to the still C like CacheManager callback api. */ static void DumpWrapper(StoreEntry *e); PconnModule(); void registerWithCacheManager(void);