This patch implements the phase 1 of the ICAP Max-Connections feature as it is described in squid wiki: http://wiki.squid-cache.org/Features/ServiceOverload The behaviour of the patch can be configured using on_overload and max_conn options of the icap_service configuration parameter. Squid can be configured to do one of the following: - Block: send and HTTP error response to the subscriber - Bypass: ignore the "over-connected" ICAP service - Wait: wait (in a FIFO queue) for an ICAP connection slot - Force: proceed, ignoring the Max-Connections limit Squid warns the first time the service become overloaded For more information please visit the feature wiki page given above. Technical informations: The patch starts count a connections to the ICAP server as active when the ModXact class receives an FD even if the fd is not really connected to the server yet, and decrease the active connections to the server when the ModXact object releases its fd connection. If the Max-Connection limit is reached squid puts the request to a waiters list. When one or more connections released squid schedules one or more waiters for execution and remove them from waiters list. To handle cases where a waiter gone/canceled before its execution the custom dialer ConnWaiterDialer used. The Options connections counted as active connections but are not limited by the Max-Connections limit. An Option request will be executed even if the maximum connections number is reached. === modified file 'src/adaptation/Elements.h' --- src/adaptation/Elements.h 2008-03-24 04:40:39 +0000 +++ src/adaptation/Elements.h 2011-04-07 14:40:53 +0000 @@ -8,6 +8,7 @@ typedef enum { methodNone, methodReqmod, methodRespmod, methodOptions } Method; typedef enum { pointNone, pointPreCache, pointPostCache } VectPoint; +typedef enum { srvBlock, srvBypass, srvWait, srvForce} SrvBehaviour; extern const char *crlf; extern const char *methodStr(Method); // TODO: make into a stream operator? === modified file 'src/adaptation/ServiceConfig.cc' --- src/adaptation/ServiceConfig.cc 2011-03-08 23:56:22 +0000 +++ src/adaptation/ServiceConfig.cc 2011-05-09 08:34:21 +0000 @@ -9,7 +9,8 @@ Adaptation::ServiceConfig::ServiceConfig(): port(-1), method(methodNone), point(pointNone), - bypass(false), routing(false), ipv6(false) + bypass(false), maxConn(-1), onOverload(srvWait), + routing(false), ipv6(false) {} const char * @@ -70,6 +71,7 @@ // handle optional service name=value parameters const char *lastOption = NULL; bool grokkedUri = false; + bool onOverloadSet = false; while (char *option = strtok(NULL, w_space)) { if (strcmp(option, "0") == 0) { // backward compatibility bypass = false; @@ -91,9 +93,9 @@ // TODO: warn if option is set twice? bool grokked = false; - if (strcmp(name, "bypass") == 0) + if (strcmp(name, "bypass") == 0) { grokked = grokBool(bypass, name, value); - else if (strcmp(name, "routing") == 0) + } else if (strcmp(name, "routing") == 0) grokked = grokBool(routing, name, value); else if (strcmp(name, "uri") == 0) grokked = grokkedUri = grokUri(value); @@ -101,6 +103,11 @@ grokked = grokBool(ipv6, name, value); if (grokked && ipv6 && !Ip::EnableIpv6) debugs(3, DBG_IMPORTANT, "WARNING: IPv6 is disabled. ICAP service option ignored."); + } else if (strcmp(name, "max-conn") == 0) + grokked = grokLong(maxConn, name, value); + else if (strcmp(name, "on-overload") == 0) { + grokked = grokOnOverload(onOverload, value); + onOverloadSet = true; } else grokked = grokExtension(name, value); @@ -108,6 +115,10 @@ return false; } + // set default on-overload value if needed + if (!onOverloadSet) + onOverload = bypass ? srvBypass : srvWait; + // what is left must be the service URI if (!grokkedUri && !grokUri(lastOption)) return false; @@ -247,6 +258,41 @@ } bool +Adaptation::ServiceConfig::grokLong(long &var, const char *name, const char *value) +{ + char *bad = NULL; + const long p = strtol(value, &bad, 0); + if (p < 0 || bad == value) { + debugs(3, DBG_CRITICAL, "ERROR: " << cfg_filename << ':' << + config_lineno << ": " << "wrong value for " << name << "; " << + "a non-negative integer expected but got: " << value); + return false; + } + var = p; + return true; +} + +bool +Adaptation::ServiceConfig::grokOnOverload(SrvBehaviour &var, const char *value) +{ + if (strcmp(value, "block") == 0) + var = srvBlock; + else if (strcmp(value, "bypass") == 0) + var = srvBypass; + else if (strcmp(value, "wait") == 0) + var = srvWait; + else if (strcmp(value, "force") == 0) + var = srvForce; + else { + debugs(3, DBG_CRITICAL, "ERROR: " << cfg_filename << ':' << + config_lineno << ": " << "wrong value for on-overload; " << + "'block', 'bypass', 'wait' or 'force' expected but got: " << value); + return false; + } + return true; +} + +bool Adaptation::ServiceConfig::grokExtension(const char *name, const char *value) { // we do not accept extensions by default === modified file 'src/adaptation/ServiceConfig.h' --- src/adaptation/ServiceConfig.h 2010-12-18 00:31:53 +0000 +++ src/adaptation/ServiceConfig.h 2011-04-27 10:24:19 +0000 @@ -32,6 +32,10 @@ Method method; // what is being adapted (REQMOD vs RESPMOD) VectPoint point; // where the adaptation happens (pre- or post-cache) bool bypass; + + // options + long maxConn; ///< maximum number of concurrent service transactions + SrvBehaviour onOverload; ///< how to handle Max-Connections feature bool routing; ///< whether this service may determine the next service(s) bool ipv6; ///< whether this service uses IPv6 transport (default IPv4) @@ -42,6 +46,9 @@ /// interpret parsed values bool grokBool(bool &var, const char *name, const char *value); bool grokUri(const char *value); + bool grokLong(long &var, const char *name, const char *value); + /// handle on-overload configuration option + bool grokOnOverload(SrvBehaviour &var, const char *value); /// handle name=value configuration option with name unknown to Squid virtual bool grokExtension(const char *name, const char *value); }; === modified file 'src/adaptation/icap/ModXact.cc' --- src/adaptation/icap/ModXact.cc 2011-05-02 01:14:30 +0000 +++ src/adaptation/icap/ModXact.cc 2011-05-09 08:54:55 +0000 @@ -86,8 +86,8 @@ canStartBypass = service().cfg().bypass; // it is an ICAP violation to send request to a service w/o known OPTIONS - - if (service().up()) + // and the service may is too busy for us: honor Max-Connections and such + if (service().up() && service().availableForNew()) startWriting(); else waitForService(); @@ -95,13 +95,44 @@ void Adaptation::Icap::ModXact::waitForService() { + const char *comment; Must(!state.serviceWaiting); - debugs(93, 7, HERE << "will wait for the ICAP service" << status()); - typedef NullaryMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(93,5, - Dialer, this, Adaptation::Icap::ModXact::noteServiceReady); - service().callWhenReady(call); + + if (!service().up()) { + AsyncCall::Pointer call = JobCallback(93,5, + ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceReady); + + service().callWhenReady(call); + comment = "to be up"; + } else { + //The service is unavailable because of max-connection or other reason + + if (service().cfg().onOverload != srvWait) { + // The service is overloaded, but waiting to be available prohibited by + // user configuration (onOverload is set to "block" or "bypass") + if (service().cfg().onOverload == srvBlock) + disableBypass("not available", true); + else //if (service().cfg().onOverload == srvBypass) + canStartBypass = true; + + disableRetries(); + disableRepeats("ICAP service is not available"); + + debugs(93, 7, HERE << "will not wait for the service to be available" << + status()); + + throw TexcHere("ICAP service is not available"); + } + + AsyncCall::Pointer call = JobCallback(93,5, + ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceAvailable); + service().callWhenAvailable(call, state.waitedForService); + comment = "to be available"; + } + + debugs(93, 7, HERE << "will wait for the service " << comment << status()); state.serviceWaiting = true; // after callWhenReady() which may throw + state.waitedForService = true; } void Adaptation::Icap::ModXact::noteServiceReady() @@ -109,13 +140,27 @@ Must(state.serviceWaiting); state.serviceWaiting = false; - if (service().up()) { - startWriting(); - } else { + if (!service().up()) { disableRetries(); disableRepeats("ICAP service is unusable"); throw TexcHere("ICAP service is unusable"); } + + if (service().availableForOld()) + startWriting(); + else + waitForService(); +} + +void Adaptation::Icap::ModXact::noteServiceAvailable() +{ + Must(state.serviceWaiting); + state.serviceWaiting = false; + + if (service().up() && service().availableForOld()) + startWriting(); + else + waitForService(); } void Adaptation::Icap::ModXact::startWriting() === modified file 'src/adaptation/icap/ModXact.h' --- src/adaptation/icap/ModXact.h 2010-10-21 08:13:41 +0000 +++ src/adaptation/icap/ModXact.h 2011-04-26 14:38:06 +0000 @@ -157,6 +157,7 @@ // service waiting void noteServiceReady(); + void noteServiceAvailable(); public: InOut virgin; @@ -303,6 +304,7 @@ bool allowedPostview206; // must handle 206 Partial Content outside preview bool allowedPreview206; // must handle 206 Partial Content inside preview bool readyForUob; ///< got a 206 response and expect a use-origin-body + bool waitedForService; ///< true if was queued at least once // will not write anything [else] to the ICAP server connection bool doneWriting() const { return writing == writingReallyDone; } === modified file 'src/adaptation/icap/Options.cc' --- src/adaptation/icap/Options.cc 2010-10-13 00:14:42 +0000 +++ src/adaptation/icap/Options.cc 2011-04-26 14:31:14 +0000 @@ -98,6 +98,8 @@ } cfgIntHeader(h, "Max-Connections", max_connections); + if (max_connections == 0) + debugs(93, DBG_IMPORTANT, "WARNING: Max-Connections is set to zero! "); cfgIntHeader(h, "Options-TTL", theTTL); === modified file 'src/adaptation/icap/ServiceRep.cc' --- src/adaptation/icap/ServiceRep.cc 2011-03-11 23:02:23 +0000 +++ src/adaptation/icap/ServiceRep.cc 2011-05-09 09:33:35 +0000 @@ -11,19 +11,27 @@ #include "adaptation/icap/ServiceRep.h" #include "base/TextException.h" #include "ConfigParser.h" +#include "ip/tools.h" #include "HttpReply.h" #include "SquidTime.h" +#include "fde.h" CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, ServiceRep); Adaptation::Icap::ServiceRep::ServiceRep(const ServiceConfigPointer &svcCfg): AsyncJob("Adaptation::Icap::ServiceRep"), Adaptation::Service(svcCfg), theOptions(NULL), theOptionsFetcher(0), theLastUpdate(0), + theBusyConns(0), + theAllWaiters(0), + connOverloadReported(false), + theIdleConns("ICAP Service"), isSuspended(0), notifying(false), updateScheduled(false), wasAnnouncedUp(true), // do not announce an "up" service at startup isDetached(false) -{} +{ + setMaxConnections(); +} Adaptation::Icap::ServiceRep::~ServiceRep() { @@ -72,6 +80,157 @@ // should be configurable. } +// returns a persistent or brand new connection; negative int on failures +int Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) +{ + Ip::Address client_addr; + + int connection = theIdleConns.pop(cfg().host.termedBuf(), cfg().port, NULL, client_addr, + retriableXact); + + reused = connection >= 0; // reused a persistent connection + + if (!reused) { // need a new connection + Ip::Address outgoing; // default: IP6_ANY_ADDR + if (!Ip::EnableIpv6) + outgoing.SetIPv4(); + else if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && !cfg().ipv6) { + /* split-stack for now requires default IPv4-only socket */ + outgoing.SetIPv4(); + } + connection = comm_open(SOCK_STREAM, 0, outgoing, COMM_NONBLOCKING, cfg().uri.termedBuf()); + } + + if (connection >= 0) + ++theBusyConns; + + return connection; +} + +// pools connection if it is reusable or closes it +void Adaptation::Icap::ServiceRep::putConnection(int fd, bool isReusable, const char *comment) +{ + Must(fd >= 0); + // do not pool an idle connection if we owe connections + if (isReusable && excessConnections() == 0) { + debugs(93, 3, HERE << "pushing pconn" << comment); + commSetTimeout(fd, -1, NULL, NULL); + Ip::Address anyAddr; + theIdleConns.push(fd, cfg().host.termedBuf(), cfg().port, NULL, anyAddr); + } else { + debugs(93, 3, HERE << "closing pconn" << comment); + // comm_close will clear timeout + comm_close(fd); + } + + Must(theBusyConns > 0); + --theBusyConns; + // a connection slot released. Check if there are waiters.... + busyCheckpoint(); +} + +// a wrapper to avoid exposing theIdleConns +void Adaptation::Icap::ServiceRep::noteConnectionUse(int fd) +{ + Must(fd >= 0); + fd_table[fd].noteUse(&theIdleConns); +} + +void Adaptation::Icap::ServiceRep::setMaxConnections() +{ + if (cfg().maxConn >= 0) + theMaxConnections = cfg().maxConn; + else if (theOptions && theOptions->max_connections >= 0) + theMaxConnections = theOptions->max_connections; + else { + theMaxConnections = -1; + return; + } + + if (::Config.workers > 1 ) + theMaxConnections /= ::Config.workers; +} + +int Adaptation::Icap::ServiceRep::availableConnections() const +{ + if (theMaxConnections < 0) + return -1; + + // we are available if we can open or reuse connections + // in other words, if we will not create debt + int available = max(0, theMaxConnections - theBusyConns); + + if (!available && !connOverloadReported) { + debugs(93, DBG_IMPORTANT, "WARNING: ICAP Max-Connections limit " << + "exceeded for service " << cfg().uri << ". Open connections now: " << + theBusyConns + theIdleConns.count() << ", including " << + theIdleConns.count() << " idle persistent connections."); + connOverloadReported = true; + } + + if (cfg().onOverload == srvForce) + return -1; + + return available; +} + +// The number of connections which excess the Max-Connections limit +int Adaptation::Icap::ServiceRep::excessConnections() const +{ + if (theMaxConnections < 0) + return 0; + + // Waiters affect the number of needed connections but a needed + // connection may still be excessive from Max-Connections p.o.v. + // so we should not account for waiting transaction needs here. + const int debt = theBusyConns + theIdleConns.count() - theMaxConnections; + if (debt > 0) + return debt; + else + return 0; +} + +void Adaptation::Icap::ServiceRep::noteGoneWaiter() +{ + theAllWaiters--; + + // in case the notified transaction did not take the connection slot + busyCheckpoint(); +} + +// called when a connection slot may become available +void Adaptation::Icap::ServiceRep::busyCheckpoint() +{ + if (theNotificationWaiters.empty()) // nobody is waiting for a slot + return; + + int freed = 0; + int available = availableConnections(); + + if (available < 0) { + // It is possible to have waiters when no limit on connections exist in + // case of reconfigure or because new Options received. + // In this case, notify all waiting transactions. + freed = theNotificationWaiters.size(); + } else { + // avoid notifying more waiters than there will be available slots + const int notifiedWaiters = theAllWaiters - theNotificationWaiters.size(); + freed = available - notifiedWaiters; + } + + debugs(93,7, HERE << "Available connections: " << available << + " freed slots: " << freed << + " waiting in queue: " << theNotificationWaiters.size()); + + while (freed > 0 && !theNotificationWaiters.empty()) { + Client i = theNotificationWaiters.front(); + theNotificationWaiters.pop_front(); + ScheduleCallHere(i.callback); + i.callback = NULL; + --freed; + } +} + void Adaptation::Icap::ServiceRep::suspend(const char *reason) { if (isSuspended) { @@ -99,6 +258,25 @@ return !isSuspended && hasOptions(); } +bool Adaptation::Icap::ServiceRep::availableForNew() const +{ + Must(up()); + int available = availableConnections(); + if (available < 0) + return true; + else + return (available - theAllWaiters > 0); +} + +bool Adaptation::Icap::ServiceRep::availableForOld() const +{ + Must(up()); + + int available = availableConnections(); + return (available != 0); // it is -1 (no limit) or has available slots +} + + bool Adaptation::Icap::ServiceRep::wantsUrl(const String &urlPath) const { Must(hasOptions()); @@ -187,6 +365,24 @@ notifying = false; } +void Adaptation::Icap::ServiceRep::callWhenAvailable(AsyncCall::Pointer &cb, bool priority) +{ + debugs(93,8, "ICAPServiceRep::callWhenAvailable"); + Must(cb!=NULL); + Must(up()); + Must(!theIdleConns.count()); // or we should not be waiting + + Client i; + i.service = Pointer(this); + i.callback = cb; + if (priority) + theNotificationWaiters.push_front(i); + else + theNotificationWaiters.push_back(i); + + busyCheckpoint(); +} + void Adaptation::Icap::ServiceRep::callWhenReady(AsyncCall::Pointer &cb) { Must(cb!=NULL); @@ -351,6 +547,17 @@ debugs(93,3, HERE << "got new options and is now " << status()); scheduleUpdate(optionsFetchTime()); + + setMaxConnections(); + const int excess = excessConnections(); + // if we owe connections and have idle pconns, close the latter + if (excess && theIdleConns.count() > 0) { + const int n = min(excess, theIdleConns.count()); + debugs(93,5, HERE << "closing " << n << " pconns to relief debt"); + Ip::Address anyAddr; + theIdleConns.closeN(n, cfg().host.termedBuf(), cfg().port, NULL, anyAddr); + } + scheduleNotification(); } @@ -486,3 +693,22 @@ { return isDetached; } + +Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const CbcPointer &xact, + Adaptation::Icap::ConnWaiterDialer::Parent::Method aHandler): + Parent(xact, aHandler) +{ + theService = &xact->service(); + theService->noteNewWaiter(); +} + +Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter): Parent(aConnWaiter) +{ + theService = aConnWaiter.theService; + theService->noteNewWaiter(); +} + +Adaptation::Icap::ConnWaiterDialer::~ConnWaiterDialer() +{ + theService->noteGoneWaiter(); +} === modified file 'src/adaptation/icap/ServiceRep.h' --- src/adaptation/icap/ServiceRep.h 2011-03-08 23:56:22 +0000 +++ src/adaptation/icap/ServiceRep.h 2011-04-27 11:20:11 +0000 @@ -40,7 +40,10 @@ #include "adaptation/forward.h" #include "adaptation/Initiator.h" #include "adaptation/icap/Elements.h" - +#include "base/AsyncJobCalls.h" +#include "comm.h" +#include "pconn.h" +#include namespace Adaptation { @@ -94,9 +97,12 @@ virtual bool probed() const; // see comments above virtual bool up() const; // see comments above + bool availableForNew() const; ///< a new transaction may start communicating with the service + bool availableForOld() const; ///< a transaction notified about connection slot availability may start communicating with the service virtual Initiate *makeXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause); + void callWhenAvailable(AsyncCall::Pointer &cb, bool priority = false); void callWhenReady(AsyncCall::Pointer &cb); // the methods below can only be called on an up() service @@ -104,9 +110,16 @@ bool wantsPreview(const String &urlPath, size_t &wantedSize) const; bool allows204() const; bool allows206() const; + int getConnection(bool isRetriable, bool &isReused); + void putConnection(int fd, bool isReusable, const char *comment); + void noteConnectionUse(int fd); void noteFailure(); // called by transactions to report service failure + void noteNewWaiter() {theAllWaiters++;} ///< New xaction waiting for service to be up or available + void noteGoneWaiter(); ///< An xaction is not waiting any more for service to be available + bool existWaiters() const {return (theAllWaiters > 0);} ///< if there are xactions waiting for the service to be available + //AsyncJob virtual methods virtual bool doneAll() const { return Adaptation::Initiator::doneAll() && false;} virtual void callException(const std::exception &e); @@ -130,12 +143,25 @@ }; typedef Vector Clients; + // TODO: rename to theUpWaiters Clients theClients; // all clients waiting for a call back Options *theOptions; CbcPointer theOptionsFetcher; // pending ICAP OPTIONS transaction time_t theLastUpdate; // time the options were last updated + /// FIFO queue of xactions waiting for a connection slot and not yet notified + /// about it; xaction is removed when notification is scheduled + std::deque theNotificationWaiters; + int theBusyConns; ///< number of connections given to active transactions + /// number of xactions waiting for a connection slot (notified and not) + /// the number is decreased after the xaction receives notification + int theAllWaiters; + int theMaxConnections; ///< the maximum allowed connections to the service + // TODO: use a better type like the FadingCounter for connOverloadReported + mutable bool connOverloadReported; ///< whether we reported exceeding theMaxConnections + PconnPool theIdleConns; ///< idle persistent connection pool + FadingCounter theSessionFailures; const char *isSuspended; // also stores suspension reason for debugging @@ -162,6 +188,21 @@ void announceStatusChange(const char *downPhrase, bool important) const; + /// Set the maximum allowed connections for the service + void setMaxConnections(); + /// The number of connections which excess the Max-Connections limit + int excessConnections() const; + /** + * The available connections slots to the ICAP server + \return the available slots, or -1 if there is no limit on allowed connections + */ + int availableConnections() const; + /** + * If there are xactions waiting for the service to be available, notify + * as many xactions as the available connections slots. + */ + void busyCheckpoint(); + const char *status() const; mutable bool wasAnnouncedUp; // prevent sequential same-state announcements @@ -169,6 +210,18 @@ CBDATA_CLASS2(ServiceRep); }; +class ModXact; +/// Custom dialer to call Service::noteNewWaiter and noteGoneWaiter +/// to maintain Service idea of waiting and being-notified transactions. +class ConnWaiterDialer: public NullaryMemFunT +{ +public: + typedef NullaryMemFunT Parent; + ServiceRep::Pointer theService; + ConnWaiterDialer(const CbcPointer &xact, Parent::Method aHandler); + ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter); + ~ConnWaiterDialer(); +}; } // namespace Icap } // namespace Adaptation === modified file 'src/adaptation/icap/Xaction.cc' --- src/adaptation/icap/Xaction.cc 2011-04-07 12:42:02 +0000 +++ src/adaptation/icap/Xaction.cc 2011-05-09 08:41:11 +0000 @@ -21,8 +21,6 @@ #include "SquidTime.h" #include "err_detail_type.h" -static PconnPool *icapPconnPool = new PconnPool("ICAP Servers"); - //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction); @@ -90,19 +88,24 @@ // TODO: obey service-specific, OPTIONS-reported connection limit void Adaptation::Icap::Xaction::openConnection() { - Ip::Address client_addr; - Must(connection < 0); - const Adaptation::Service &s = service(); + Adaptation::Icap::ServiceRep &s = service(); if (!TheConfig.reuse_connections) disableRetries(); // this will also safely drain pconn pool - // TODO: check whether NULL domain is appropriate here - connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable); - if (connection >= 0) { - debugs(93,3, HERE << "reused pconn FD " << connection); + bool wasReused = false; + connection = s.getConnection(isRetriable, wasReused); + if (connection < 0) + dieOnConnectionFailure(); // throws + + if (wasReused) { + // Set comm Close handler + typedef CommCbMemFunT CloseDialer; + closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", + CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); + comm_add_close_handler(connection, closer); // fake the connect callback // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead? @@ -119,39 +122,25 @@ disableRetries(); // we only retry pconn failures - Ip::Address outgoing; - if (!Ip::EnableIpv6 && !outgoing.SetIPv4()) { - debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoing << " is not an IPv4 address."); - dieOnConnectionFailure(); // throws - } - /* split-stack for now requires default IPv4-only socket */ - if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoing.IsAnyAddr() && !s.cfg().ipv6) { - outgoing.SetIPv4(); - } - - connection = comm_open(SOCK_STREAM, 0, outgoing, - COMM_NONBLOCKING, s.cfg().uri.termedBuf()); - - if (connection < 0) - dieOnConnectionFailure(); // throws - - debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port); + + debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port); // TODO: service bypass status may differ from that of a transaction typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(93, 5, - TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout); + AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", + TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); + commSetTimeout(connection, TheConfig.connect_timeout( service().cfg().bypass), timeoutCall); typedef CommCbMemFunT CloseDialer; - closer = JobCallback(93, 5, - CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed); + closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", + CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); comm_add_close_handler(connection, closer); typedef CommCbMemFunT ConnectDialer; - connector = JobCallback(93,3, - ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); + connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", + ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected)); commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector); } @@ -186,21 +175,11 @@ reuseConnection = false; } - if (reuseConnection) { - Ip::Address client_addr; - //status() adds leading spaces. - debugs(93,3, HERE << "pushing pconn" << status()); - AsyncCall::Pointer call = NULL; - commSetTimeout(connection, -1, call); - icapPconnPool->push(connection, theService->cfg().host.termedBuf(), - theService->cfg().port, NULL, client_addr); + if (reuseConnection) disableRetries(); - } else { - //status() adds leading spaces. - debugs(93,3, HERE << "closing pconn" << status()); - // comm_close will clear timeout - comm_close(connection); - } + + Adaptation::Icap::ServiceRep &s = service(); + s.putConnection(connection, reuseConnection, status()); writer = NULL; reader = NULL; @@ -218,7 +197,7 @@ if (io.flag != COMM_OK) dieOnConnectionFailure(); // throws - fd_table[connection].noteUse(icapPconnPool); + service().noteConnectionUse(connection); handleCommConnected(); } === modified file 'src/adaptation/icap/Xaction.h' --- src/adaptation/icap/Xaction.h 2010-10-21 08:13:41 +0000 +++ src/adaptation/icap/Xaction.h 2011-04-26 14:31:14 +0000 @@ -138,6 +138,7 @@ void setOutcome(const XactOutcome &xo); virtual void finalizeLogInfo(); +public: ServiceRep &service(); private: === modified file 'src/cf.data.pre' --- src/cf.data.pre 2011-05-03 03:01:59 +0000 +++ src/cf.data.pre 2011-05-09 08:54:55 +0000 @@ -6548,6 +6548,26 @@ is to use IPv4-only connections. When set to 'on' this option will make Squid use IPv6-only connections to contact this ICAP service. + on-overload=block|bypass|wait|force + If the service Max-Connections limit has been reached, do + one of the following for each new ICAP transaction: + * block: send an HTTP error response to the client + * bypass: ignore the "over-connected" ICAP service + * wait: wait (in a FIFO queue) for an ICAP connection slot + * force: proceed, ignoring the Max-Connections limit + + In SMP mode with N workers, each worker assumes the service + connection limit is Max-Connections/N, even though not all + workers may use a given service. + + The default value is "bypass" if service is bypassable, + otherwise it is set to "wait". + + + max-conn=number + Use the given number as the Max-Connections limit, regardless + of the Max-Connections value given by the service, if any. + Older icap_service format without optional named parameters is deprecated but supported for backward compatibility. === modified file 'src/comm.cc' --- src/comm.cc 2011-01-28 07:58:53 +0000 +++ src/comm.cc 2011-03-23 15:55:49 +0000 @@ -1490,7 +1490,7 @@ commCallCloseHandlers(fd); if (F->pconn.uses) - F->pconn.pool->count(F->pconn.uses); + F->pconn.pool->noteUses(F->pconn.uses); comm_empty_os_read_buffers(fd); === modified file 'src/pconn.cc' --- src/pconn.cc 2010-12-14 14:01:14 +0000 +++ src/pconn.cc 2011-04-26 14:38:06 +0000 @@ -94,6 +94,9 @@ for (; index < nfds - 1; index++) fds[index] = fds[index + 1]; + if (parent) + parent->noteConnectionRemoved(); + if (--nfds == 0) { debugs(48, 3, "IdleConnList::removeFD: deleting " << hashKeyStr(&hash)); delete this; @@ -123,6 +126,9 @@ xfree(old); } + if (parent) + parent->noteConnectionAdded(); + fds[nfds++] = fd; comm_read(fd, fakeReadBuf, sizeof(fakeReadBuf), IdleConnList::read, this); commSetTimeout(fd, Config.Timeout.pconn, IdleConnList::timeout, this); @@ -230,7 +236,8 @@ /* ========== PconnPool PUBLIC FUNCTIONS ============================================ */ -PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr) +PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr), + theCount(0) { int i; table = hash_create((HASHCMP *) strcmp, 229, hash_string); @@ -291,6 +298,7 @@ * We close available persistent connection if the caller transaction is not * retriable to avoid having a growing number of open connections when many * transactions create persistent connections but are not retriable. + * PconnPool::closeN() relies on that behavior as well. */ int PconnPool::pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool isRetriable) @@ -321,13 +329,23 @@ } void -PconnPool::unlinkList(IdleConnList *list) const -{ +PconnPool::closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address) +{ + // TODO: optimize: we can probably do hash_lookup just once + for (int i = 0; i < n; ++i) + pop(host, port, domain, client_address, false); // may fail! +} + +void +PconnPool::unlinkList(IdleConnList *list) +{ + theCount -= list->count(); + assert(theCount >= 0); hash_remove_link(table, &list->hash); } void -PconnPool::count(int uses) +PconnPool::noteUses(int uses) { if (uses >= PCONN_HIST_SZ) uses = PCONN_HIST_SZ - 1; === modified file 'src/pconn.h' --- src/pconn.h 2010-10-07 06:34:34 +0000 +++ src/pconn.h 2011-04-26 14:38:06 +0000 @@ -32,7 +32,6 @@ public: IdleConnList(const char *key, PconnPool *parent); ~IdleConnList(); - int numIdle() { return nfds; } int findFDIndex(int fd); ///< search from the end of array void removeFD(int fd); @@ -40,6 +39,8 @@ int findUseableFD(); ///< find first from the end not pending read fd. void clearHandlers(int fd); + int count() const { return nfds; } + private: static IOCB read; static PF timeout; @@ -82,10 +83,14 @@ void moduleInit(); void push(int fd, const char *host, u_short port, const char *domain, Ip::Address &client_address); int pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool retriable); - void count(int uses); + void noteUses(int uses); void dumpHist(StoreEntry *e); void dumpHash(StoreEntry *e); - void unlinkList(IdleConnList *list) const; + void unlinkList(IdleConnList *list); + void closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address); + int count() const { return theCount; } + void noteConnectionAdded() { ++theCount; } + void noteConnectionRemoved() { assert(theCount > 0); --theCount; } private: @@ -94,7 +99,7 @@ int hist[PCONN_HIST_SZ]; hash_table *table; const char *descr; - + int theCount; ///< the number of pooled connections };