=== modified file 'src/CommCalls.cc' --- src/CommCalls.cc 2012-08-28 13:00:30 +0000 +++ src/CommCalls.cc 2013-05-10 14:34:10 +0000 @@ -7,12 +7,12 @@ /* CommCommonCbParams */ CommCommonCbParams::CommCommonCbParams(void *aData): - data(cbdataReference(aData)), conn(), flag(COMM_OK), xerrno(0), fd(-1) + data(cbdataReference(aData)), conn(), flag(COMM_OK), xerrno(0), fd(-1), xaction() { } CommCommonCbParams::CommCommonCbParams(const CommCommonCbParams &p): - data(cbdataReference(p.data)), conn(p.conn), flag(p.flag), xerrno(p.xerrno), fd(p.fd) + data(cbdataReference(p.data)), conn(p.conn), flag(p.flag), xerrno(p.xerrno), fd(p.fd), xaction(p.xaction) { } @@ -35,6 +35,9 @@ os << ", flag=" << flag; if (data) os << ", data=" << data; + + if (xaction != NULL) + os << ", xaction.id=" << xaction->id; } /* CommAcceptCbParams */ === modified file 'src/CommCalls.h' --- src/CommCalls.h 2013-01-02 23:40:49 +0000 +++ src/CommCalls.h 2013-05-05 04:31:05 +0000 @@ -5,6 +5,7 @@ #include "base/AsyncJobCalls.h" #include "comm_err_t.h" #include "comm/forward.h" +#include "MasterXaction.h" /* CommCalls implement AsyncCall interface for comm_* callbacks. * The classes cover two call dialer kinds: @@ -79,6 +80,9 @@ int xerrno; ///< The last errno to occur. non-zero if flag is COMM_ERR. int fd; ///< FD which the call was about. Set by the async call creator. + + /// Transaction which this call is part of. + MasterXaction::Pointer xaction; private: // should not be needed and not yet implemented CommCommonCbParams &operator =(const CommCommonCbParams ¶ms); === modified file 'src/Makefile.am' --- src/Makefile.am 2013-05-08 03:02:01 +0000 +++ src/Makefile.am 2013-05-08 03:14:01 +0000 @@ -422,6 +422,8 @@ LogTags.h \ lookup_t.h \ main.cc \ + MasterXaction.cc \ + MasterXaction.h \ Mem.h \ mem.cc \ mem_node.cc \ @@ -1249,6 +1251,8 @@ HttpRequestMethod.cc \ int.h \ int.cc \ + MasterXaction.cc \ + MasterXaction.h \ SquidList.h \ SquidList.cc \ mem_node.cc \ @@ -1485,6 +1489,8 @@ internal.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ multicast.h \ multicast.cc \ mem_node.cc \ @@ -1664,6 +1670,8 @@ int.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ MemBuf.cc \ MemObject.cc \ mem_node.cc \ @@ -1899,6 +1907,8 @@ internal.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ Mem.h \ mem.cc \ mem_node.cc \ @@ -2145,6 +2155,8 @@ internal.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ MemBuf.cc \ MemObject.cc \ Mem.h \ @@ -2388,6 +2400,8 @@ ipcache.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ MemBuf.cc \ MemObject.cc \ Mem.h \ @@ -2676,6 +2690,8 @@ internal.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ multicast.h \ multicast.cc \ mem_node.cc \ @@ -2848,6 +2864,8 @@ int.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ Mem.h \ mem.cc \ mem_node.cc \ @@ -3073,6 +3091,8 @@ RequestFlags.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ MemObject.cc \ StoreSwapLogData.cc \ StoreIOState.cc \ @@ -3259,6 +3279,8 @@ int.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ Mem.h \ mem.cc \ MemBuf.cc \ @@ -3642,6 +3664,8 @@ internal.cc \ SquidList.h \ SquidList.cc \ + MasterXaction.cc \ + MasterXaction.h \ multicast.h \ multicast.cc \ Mem.h \ === added file 'src/MasterXaction.cc' --- src/MasterXaction.cc 1970-01-01 00:00:00 +0000 +++ src/MasterXaction.cc 2013-05-05 14:14:17 +0000 @@ -0,0 +1,4 @@ +#include "squid.h" +#include "MasterXaction.h" + +InstanceIdDefinitions(MasterXaction, "MXID_"); === added file 'src/MasterXaction.h' --- src/MasterXaction.h 1970-01-01 00:00:00 +0000 +++ src/MasterXaction.h 2013-05-10 14:50:03 +0000 @@ -0,0 +1,49 @@ +#ifndef SQUID_SRC_MASTERXACTION_H +#define SQUID_SRC_MASTERXACTION_H + +#include "anyp/PortCfg.h" +#include "base/CbcPointer.h" +#include "base/InstanceId.h" +#include "base/Lock.h" +#include "comm/forward.h" + +/** Maximum number of pipelined requests to pre-parse. + * + * Squid can pre-parse requests on a persistent connection and queue them for + * service while waiting for a leading transaction to complete. + * + * It is generally better to leave most pipelined requests in TCP buffers, + * which provides some push-back on the client sending rate. + * + * NP: currently limited to 2 in case there is any hidden code relying on that + * old hard-coded value still hanging around. + * May be set using -D in CXXFLAGS to test other values. + */ +#if !defined(SQUID_PIPELINE_MAX) +#define SQUID_PIPELINE_MAX 2 +#endif + +/** Master transaction state object. + * + * This class holds pointers to all the state + * generated during the processing of a client + * transaction. + */ +class MasterXaction : public RefCountable +{ +public: + typedef RefCount Pointer; + + /// transaction ID. + InstanceId id; + + /// the listening port which originated this transaction + CbcPointer squidPort; + + /// the client TCP connection which originated this transaction + Comm::ConnectionPointer tcpClient; + + // TODO: add state from other Jobs in the transaction +}; + +#endif /* SQUID_SRC_MASTERXACTION_H */ === modified file 'src/client_side.cc' --- src/client_side.cc 2013-05-08 03:02:01 +0000 +++ src/client_side.cc 2013-05-10 14:56:12 +0000 @@ -245,8 +245,6 @@ char *skipLeadingSpace(char *aString); static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount); -static ConnStateData *connStateCreate(const Comm::ConnectionPointer &client, AnyP::PortCfg *port); - clientStreamNode * ClientSocketContext::getTail() const { @@ -2935,14 +2933,21 @@ } } -static int +// XXX: make this a private method of ConnStateData +static bool connOkToAddRequest(ConnStateData * conn) { - int result = conn->getConcurrentRequestCount() < (Config.onoff.pipeline_prefetch ? 2 : 1); + const int maxReq = (Config.onoff.pipeline_prefetch ? SQUID_PIPELINE_MAX : 1); + const bool result = conn->getConcurrentRequestCount() < maxReq; + + // XXX: make this more dynamic? + // ... false / no pipelines for HTTP/1.0 clients? + // ... false if NTLM/Negotiate handshake is being done by the last parsed request + // ie, pipeline prefetch non-auth requests, halt during handshake, and resume pipeline prefetch after authed. if (!result) { - debugs(33, 3, HERE << conn->clientConnection << " max concurrent requests reached"); - debugs(33, 5, HERE << conn->clientConnection << " defering new request until one is done"); + debugs(33, 3, conn->clientConnection << " max concurrent requests reached (" << maxReq << ")"); + debugs(33, 5, conn->clientConnection << " defering new request until one is done"); } return result; @@ -3364,23 +3369,43 @@ io.conn->close(); } -ConnStateData * -connStateCreate(const Comm::ConnectionPointer &client, AnyP::PortCfg *port) +ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) : + AsyncJob("ConnStateData"), +#if USE_SSL + sslBumpMode(Ssl::bumpEnd), + switchedToHttps_(false), + sslServerBump(NULL), +#endif + stoppedSending_(NULL), + stoppedReceiving_(NULL) { - ConnStateData *result = new ConnStateData; - - result->clientConnection = client; - result->log_addr = client->remote; - result->log_addr.ApplyMask(Config.Addrs.client_netmask); - result->in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &result->in.allocatedSize); - result->port = cbdataReference(port); - + pinning.host = NULL; + pinning.port = -1; + pinning.pinned = false; + pinning.auth = false; + pinning.zeroReply = false; + pinning.peer = NULL; + + // on a new connection there is one transaction (about to start) + xaction[0] = xact; + + // store the details required for creating more MasterXaction objects + // later when the xaction[] has been completed and removed + clientConnection = xact->tcpClient; + port = cbdataReference(xact->squidPort.get()); + log_addr = xact->tcpClient->remote; + log_addr.ApplyMask(Config.Addrs.client_netmask); + + // TODO: make this a MemBuf (with higher than 64KB limits?) + in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &in.allocatedSize); + + // XXX: do this in TcpAcceptor? this is a socket layer operation. if (port->disable_pmtu_discovery != DISABLE_PMTU_OFF && - (result->transparent() || port->disable_pmtu_discovery == DISABLE_PMTU_ALWAYS)) { + (transparent() || port->disable_pmtu_discovery == DISABLE_PMTU_ALWAYS)) { #if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DONT) int i = IP_PMTUDISC_DONT; - if (setsockopt(client->fd, SOL_IP, IP_MTU_DISCOVER, &i, sizeof(i)) < 0) - debugs(33, 2, "WARNING: Path MTU discovery disabling failed on " << client << " : " << xstrerror()); + if (setsockopt(clientConnection->fd, SOL_IP, IP_MTU_DISCOVER, &i, sizeof(i)) < 0) + debugs(33, 2, "WARNING: Path MTU discovery disabling failed on " << clientConnection << " : " << xstrerror()); #else static bool reported = false; @@ -3392,37 +3417,39 @@ } typedef CommCbMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(33, 5, Dialer, result, ConnStateData::connStateClosed); - comm_add_close_handler(client->fd, call); + AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, ConnStateData::connStateClosed); + comm_add_close_handler(clientConnection->fd, call); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(client->remote, FQDN_LOOKUP_IF_MISS); + fqdncache_gethostbyaddr(clientConnection->remote, FQDN_LOOKUP_IF_MISS); #if USE_IDENT if (Ident::TheConfig.identLookup) { ACLFilledChecklist identChecklist(Ident::TheConfig.identLookup, NULL, NULL); - identChecklist.src_addr = client->remote; - identChecklist.my_addr = client->local; +// XXX: pass xact to the checklist constructor instead of the below + identChecklist.src_addr = xact->tcpClient->remote; + identChecklist.my_addr = xact->tcpClient->local; if (identChecklist.fastCheck() == ACCESS_ALLOWED) - Ident::Start(client, clientIdentDone, result); + Ident::Start(xact->tcpClient, clientIdentDone, this); } #endif - clientdbEstablished(client->remote, 1); + clientdbEstablished(clientConnection->remote, 1); - result->flags.readMore = true; - return result; + flags.readMore = true; } /** Handle a new connection on HTTP socket. */ void httpAccept(const CommAcceptCbParams ¶ms) { - AnyP::PortCfg *s = static_cast(params.data); + MasterXaction::Pointer xact = params.xaction; + AnyP::PortCfg *s = xact->squidPort.get(); // XXX: avoid get() if (params.flag != COMM_OK) { // Its possible the call was still queued when the client disconnected debugs(33, 2, "httpAccept: " << s->listenConn << ": accept failure: " << xstrerr(params.xerrno)); +// XXX: log the failed connection? return; } @@ -3436,7 +3463,7 @@ ++ incoming_sockets_accepted; // Socket is ready, setup the connection manager to start using it - ConnStateData *connState = connStateCreate(params.conn, s); + ConnStateData *connState = new ConnStateData(xact); typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(33, 5, @@ -3453,6 +3480,7 @@ ClientDelayPools& pools(Config.ClientDelay.pools); ACLFilledChecklist ch(NULL, NULL, NULL); +// XXX: pass MasterXaction xact to ACLFilledChecklist constructor instead of the below // TODO: we check early to limit error response bandwith but we // should recheck when we can honor delay_pool_uses_indirect @@ -3471,6 +3499,7 @@ /* request client information from db after we did all checks this will save hash lookup if client failed checks */ ClientInfo * cli = clientdbGetInfo(params.conn->remote); +// XXX: save this ClientInfo data to MasterXaction xact ? assert(cli); /* put client info in FDE */ @@ -3480,6 +3509,7 @@ const double burst = floor(0.5 + (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0); cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark); +// XXX: save this delay pool data to MasterXaction xact ? break; } else { debugs(83, 4, HERE << "Delay pool " << pool << " skipped because ACL " << answer); @@ -3676,7 +3706,7 @@ /** * A callback function to use with the ACLFilledChecklist callback. - * In the case of ACCES_ALLOWED answer initializes a bumped SSL connection, + * In the case of ACCESS_ALLOWED answer initializes a bumped SSL connection, * else reverts the connection to tunnel mode. */ static void @@ -3718,11 +3748,14 @@ static void httpsAccept(const CommAcceptCbParams ¶ms) { - AnyP::PortCfg *s = static_cast(params.data); + MasterXaction::Pointer xact = params.xaction; + AnyP::PortCfg *s = xact->squidPort.get(); // XXX: avoid get() + assert(s); if (params.flag != COMM_OK) { // Its possible the call was still queued when the client disconnected debugs(33, 2, "httpsAccept: " << s->listenConn << ": accept failure: " << xstrerr(params.xerrno)); +// XXX: log the failed connection? return; } @@ -3736,7 +3769,7 @@ ++incoming_sockets_accepted; // Socket is ready, setup the connection manager to start using it - ConnStateData *connState = connStateCreate(params.conn, s); + ConnStateData *connState = new ConnStateData(xact); if (s->flags.tunnelSslBumping) { debugs(33, 5, "httpsAccept: accept transparent connection: " << params.conn); @@ -3746,6 +3779,8 @@ return; } +// XXX: when ACLs are using MasterXaction xact members the below can die ... + // Create a fake HTTP request for ssl_bump ACL check, // using tproxy/intercept provided destination IP and port. HttpRequest *request = new HttpRequest(); === modified file 'src/client_side.h' --- src/client_side.h 2013-04-04 06:15:00 +0000 +++ src/client_side.h 2013-05-06 03:19:49 +0000 @@ -175,10 +175,11 @@ class ServerBump; } #endif + /** * Manages a connection to a client. * - * Multiple requests (up to 2) can be pipelined. This object is responsible for managing + * Multiple requests (up to SQUID_PIPELINE_MAX) can be pipelined. This object is responsible for managing * which one is currently being fulfilled and what happens to the queue if the current one * causes the client connection to be closed early. * @@ -195,6 +196,7 @@ public: ConnStateData(); + ConnStateData(const MasterXaction::Pointer &xact); ~ConnStateData(); void readSomeData(); @@ -215,7 +217,16 @@ // HttpControlMsgSink API virtual void sendControlMsg(HttpControlMsg msg); + /** + * Transaction state for the current client transaction(s). + * Squid accepts up to SQUID_PIPELINE_MAX pipelined requests + * at any one time on a connection. + */ + MasterXaction::Pointer xaction[SQUID_PIPELINE_MAX]; + // Client TCP connection details from comm layer. + // XXX: make private now, useful only to spawn new MasterXaction objects for pipeline requests + // we need to retain because the client conn may be idle between requests with no MasterXaction to clone Comm::ConnectionPointer clientConnection; struct In { @@ -261,13 +272,15 @@ */ ClientSocketContext::Pointer currentobject; - Ip::Address log_addr; + Ip::Address log_addr; // TODO: remove entirely and produce masked IP on demand. int nrequests; struct { bool readMore; ///< needs comm_read (for this request or new requests) bool swanSang; // XXX: temporary flag to check proper cleanup } flags; + + // XXX: make private now? hidden behind pinning API, and used to fill MasterXaction struct { Comm::ConnectionPointer serverConnection; /* pinned server side connection */ char *host; /* host name of pinned connection */ @@ -279,6 +292,9 @@ AsyncCall::Pointer closeHandler; /*The close handler for pinned server side connection*/ } pinning; + + // XXX: make private now, useful only to spawn new MasterXaction objects for pipeline requests + // we need to retain because the client conn may be idle between requests with no MasterXaction to clone AnyP::PortCfg *port; bool transparent() const; === modified file 'src/comm/TcpAcceptor.cc' --- src/comm/TcpAcceptor.cc 2013-05-08 03:02:01 +0000 +++ src/comm/TcpAcceptor.cc 2013-05-08 03:14:01 +0000 @@ -46,6 +46,7 @@ #include "fde.h" #include "globals.h" #include "ip/Intercept.h" +#include "MasterXaction.h" #include "profiler/Profiler.h" #include "SquidConfig.h" #include "SquidTime.h" @@ -286,8 +287,10 @@ if (theCallSub != NULL) { AsyncCall::Pointer call = theCallSub->callback(); CommAcceptCbParams ¶ms = GetCommParams(call); + params.xaction = new MasterXaction; + params.xaction->squidPort = static_cast(params.data); // do this here or in the caller? params.fd = conn->fd; - params.conn = newConnDetails; + params.conn = params.xaction->tcpClient = newConnDetails; params.flag = flag; params.xerrno = errcode; ScheduleCallHere(call);