Re: Max simultaneous connections limit on per-destination basis

From: Radu Rendec <radu.rendec@dont-contact.us>
Date: Fri, 04 Nov 2005 17:22:23 +0200

OK, you're right. I don't know exactly what the list policies are, and I
didn't want to spam the list with large attachments or pieces of code.

I included the 2 functions that I had modified at the end of this
message. Hope it's ok. My hook is destdbUpdateActiveRequests(). The
second argument is the offset that is added to the counter.

Thanks,

Radu

On Thu, 2005-11-03 at 23:19 +0100, Henrik Nordstrom wrote:
> On Thu, 3 Nov 2005, Radu Rendec wrote:
>
> > Is there any chance I could miss a new request or the end of a request?
>
> Nothing obvious, but it's always easier to comment on the code than
> descriptions of the same..
>
> Regards
> Henrik

static void
httpRequestFree(void *data)
{
    clientHttpRequest *http = data;
    clientHttpRequest **H;
    ConnStateData *conn = http->conn;
    StoreEntry *e;
    request_t *request = http->request;
    MemObject *mem = NULL;
    debug(33, 3) ("httpRequestFree: %s\n", storeUrl(http->entry));
    if (!clientCheckTransferDone(http)) {
#if MYSTERIOUS_CODE
    /*
     * DW: this seems odd here, is it really needed? It causes
     * incomplete transfers to get logged with "000" status
     * code because http->entry becomes NULL.
     */
    if ((e = http->entry)) {
        http->entry = NULL;
        storeUnregister(http->sc, e, http);
        storeUnlockObject(e);
    }
#endif
    if (http->entry && http->entry->ping_status == PING_WAITING)
        storeReleaseRequest(http->entry);
    }
    assert(http->log_type < LOG_TYPE_MAX);
    if (http->entry)
    mem = http->entry->mem_obj;
    if (http->out.size || http->log_type) {
    http->al.icp.opcode = ICP_INVALID;
    http->al.url = http->log_uri;
    debug(33, 9) ("httpRequestFree: al.url='%s'\n", http->al.url);
    if (mem) {
        http->al.http.code = mem->reply->sline.status;
        http->al.http.content_type = strBuf(mem->reply->content_type);
    }
    http->al.cache.caddr = conn->log_addr;
    http->al.cache.size = http->out.size;
    http->al.cache.code = http->log_type;
    http->al.cache.msec = tvSubMsec(http->start, current_time);
    if (request) {
        Packer p;
        MemBuf mb;
        memBufDefInit(&mb);
        packerToMemInit(&p, &mb);
        httpHeaderPackInto(&request->header, &p);
        http->al.http.method = request->method;
        http->al.http.version = request->http_ver;
        http->al.headers.request = xstrdup(mb.buf);
        http->al.hier = request->hier;
        if (request->user_ident[0])
        http->al.cache.ident = request->user_ident;
        else
        http->al.cache.ident = conn->ident;
        packerClean(&p);
        memBufClean(&mb);
    }
    accessLogLog(&http->al);
    clientUpdateCounters(http);
    clientdbUpdate(conn->peer.sin_addr, http->log_type, PROTO_HTTP, http->out.size);
    destdbUpdateActiveRequests(http->request->original_host, -1);
    }
    if (http->acl_checklist)
    aclChecklistFree(http->acl_checklist);
    if (request)
    checkFailureRatio(request->err_type, http->al.hier.code);
    safe_free(http->uri);
    safe_free(http->log_uri);
    safe_free(http->al.headers.request);
    safe_free(http->al.headers.reply);
    safe_free(http->redirect.location);
    stringClean(&http->range_iter.boundary);
    if ((e = http->entry)) {
    http->entry = NULL;
    storeUnregister(http->sc, e, http);
    http->sc = NULL;
    storeUnlockObject(e);
    }
    /* old_entry might still be set if we didn't yet get the reply
     * code in clientHandleIMSReply() */
    if ((e = http->old_entry)) {
    http->old_entry = NULL;
    storeUnregister(http->old_sc, e, http);
    http->old_sc = NULL;
    storeUnlockObject(e);
    }
    requestUnlink(http->request);
    assert(http != http->next);
    assert(http->conn->chr != NULL);
    H = &http->conn->chr;
    while (*H) {
    if (*H == http)
        break;
    H = &(*H)->next;
    }
    assert(*H != NULL);
    *H = http->next;
    http->next = NULL;
    dlinkDelete(&http->active, &ClientActiveRequests);
    cbdataFree(http);
}

static void
clientReadRequest(int fd, void *data)
{
    ConnStateData *conn = data;
    int parser_return_code = 0;
    int k;
    request_t *request = NULL;
    int size;
    void *p;
    method_t method;
    clientHttpRequest *http = NULL;
    clientHttpRequest **H = NULL;
    char *prefix = NULL;
    ErrorState *err = NULL;
    fde *F = &fd_table[fd];
    int len = conn->in.size - conn->in.offset - 1;
    debug(33, 4) ("clientReadRequest: FD %d: reading request...\n", fd);
    statCounter.syscalls.sock.reads++;
    size = read(fd, conn->in.buf + conn->in.offset, len);
    if (size > 0) {
        fd_bytes(fd, size, FD_READ);
        kb_incr(&statCounter.client_http.kbytes_in, size);
    }
    /*
     * Don't reset the timeout value here. The timeout value will be
     * set to Config.Timeout.request by httpAccept() and
     * clientWriteComplete(), and should apply to the request as a
     * whole, not individual read() calls. Plus, it breaks our
     * lame half-close detection
     */
    commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0);
    if (size == 0) {
        if (conn->chr == NULL) {
            /* no current or pending requests */
            comm_close(fd);
            return;
        } else if (!Config.onoff.half_closed_clients) {
            /* admin doesn't want to support half-closed client sockets */
            comm_close(fd);
            return;
        }
        /* It might be half-closed, we can't tell */
        debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd);
        F->flags.socket_eof = 1;
        conn->defer.until = squid_curtime + 1;
        conn->defer.n++;
        fd_note(fd, "half-closed");
        return;
    } else if (size < 0) {
        if (!ignoreErrno(errno)) {
            debug(50, 2) ("clientReadRequest: FD %d: %s\n", fd, xstrerror());
            comm_close(fd);
            return;
        } else if (conn->in.offset == 0) {
            debug(50, 2) ("clientReadRequest: FD %d: no data to process (%s)\n", fd, xstrerror());
            return;
        }
        /* Continue to process previously read data */
        size = 0;
    }
    conn->in.offset += size;
    /* Skip leading (and trailing) whitespace */
    while (conn->in.offset > 0) {
        int nrequests;
        size_t req_line_sz;
        while (conn->in.offset > 0 && xisspace(conn->in.buf[0])) {
            xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.offset - 1);
            conn->in.offset--;
        }
        conn->in.buf[conn->in.offset] = '\0'; /* Terminate the string */
        if (conn->in.offset == 0)
            break;
        /* Limit the number of concurrent requests to 2 */
        for (H = &conn->chr, nrequests = 0; *H; H = &(*H)->next, nrequests++);
        if (nrequests >= (Config.onoff.pipeline_prefetch ? 2 : 1)) {
            debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n", fd);
            debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n", fd);
            conn->defer.until = squid_curtime + 100; /* Reset when a request is complete */
            break;
        }
        /* Process request */
        http = parseHttpRequest(conn,
            &method,
            &parser_return_code,
            &prefix,
            &req_line_sz);
        if (!http)
            safe_free(prefix);
        if (http) {
            assert(http->req_sz > 0);
            conn->in.offset -= http->req_sz;
            assert(conn->in.offset >= 0);
            debug(33, 5) ("conn->in.offset = %d\n", (int) conn->in.offset);
            /*
             * If we read past the end of this request, move the remaining
             * data to the beginning
             */
            if (conn->in.offset > 0)
                xmemmove(conn->in.buf, conn->in.buf + http->req_sz, conn->in.offset);
            /* add to the client request queue */
            for (H = &conn->chr; *H; H = &(*H)->next);
            *H = http;
            conn->nrequests++;
            /*
             * I wanted to lock 'http' here since its callback data for
             * clientLifetimeTimeout(), but there's no logical place to
             * cbdataUnlock if the timeout never happens. Maybe its safe
             * enough to assume that if the FD is open, and the timeout
             * triggers, that 'http' is valid.
             */
            commSetTimeout(fd, Config.Timeout.lifetime, clientLifetimeTimeout, http);
            if (parser_return_code < 0) {
                debug(33, 1) ("clientReadRequest: FD %d Invalid Request\n", fd);
                err = errorCon(ERR_INVALID_REQ, HTTP_BAD_REQUEST);
                err->request_hdrs = xstrdup(conn->in.buf);
                http->entry = clientCreateStoreEntry(http, method, null_request_flags);
                errorAppendEntry(http->entry, err);
                safe_free(prefix);
                break;
            }
            if ((request = urlParse(method, http->uri)) == NULL) {
                debug(33, 5) ("Invalid URL: %s\n", http->uri);
                err = errorCon(ERR_INVALID_URL, HTTP_BAD_REQUEST);
                err->src_addr = conn->peer.sin_addr;
                err->url = xstrdup(http->uri);
                http->al.http.code = err->http_status;
                http->entry = clientCreateStoreEntry(http, method, null_request_flags);
                errorAppendEntry(http->entry, err);
                safe_free(prefix);
                break;
            } else {
                /* compile headers */
                /* we should skip request line! */
                if (!httpRequestParseHeader(request, prefix + req_line_sz))
                    debug(33, 1) ("Failed to parse request headers: %s\n%s\n",
                        http->uri, prefix);
                /* continue anyway? */
            }
            request->flags.accelerated = http->flags.accel;
            if (!http->flags.internal) {
                if (internalCheck(strBuf(request->urlpath))) {
                    if (internalHostnameIs(request->host) &&
                        request->port == ntohs(Config.Sockaddr.http->s.sin_port)) {
                        http->flags.internal = 1;
                    } else if (internalStaticCheck(strBuf(request->urlpath))) {
                        xstrncpy(request->host, internalHostname(), SQUIDHOSTNAMELEN);
                        request->port = ntohs(Config.Sockaddr.http->s.sin_port);
                        http->flags.internal = 1;
                    }
                }
            }
            /*
             * cache the Content-length value in request_t.
             */
            request->content_length = httpHeaderGetInt(&request->header,
                HDR_CONTENT_LENGTH);
            request->flags.internal = http->flags.internal;
            safe_free(prefix);
            safe_free(http->log_uri);
            http->log_uri = xstrdup(urlCanonicalClean(request));
            request->client_addr = conn->peer.sin_addr;
            request->my_addr = conn->me.sin_addr;
            request->my_port = ntohs(conn->me.sin_port);
            request->http_ver = http->http_ver;
            if (!urlCheckRequest(request)) {
                err = errorCon(ERR_UNSUP_REQ, HTTP_NOT_IMPLEMENTED);
                err->src_addr = conn->peer.sin_addr;
                err->request = requestLink(request);
                request->flags.proxy_keepalive = 0;
                http->al.http.code = err->http_status;
                http->entry = clientCreateStoreEntry(http, request->method, null_request_flags);
                errorAppendEntry(http->entry, err);
                break;
            }
            if (0 == clientCheckContentLength(request)) {
                err = errorCon(ERR_INVALID_REQ, HTTP_LENGTH_REQUIRED);
                err->src_addr = conn->peer.sin_addr;
                err->request = requestLink(request);
                http->al.http.code = err->http_status;
                http->entry = clientCreateStoreEntry(http, request->method, null_request_flags);
                errorAppendEntry(http->entry, err);
                break;
            }
            http->request = requestLink(request);
            /*
             * We need to set the keepalive flag before doing some
             * hacks for POST/PUT requests below. Maybe we could
             * set keepalive flag even earlier.
             */
            clientSetKeepaliveFlag(http);
            /*
             * break here if the request has a content-length
             * because there is a reqeust body following and we
             * don't want to parse it as though it was new request.
             */
            if (request->content_length >= 0) {
                int copy_len = XMIN(conn->in.offset, request->content_length);
                if (copy_len > 0) {
                    assert(conn->in.offset >= copy_len);
                    request->body_sz = copy_len;
                    request->body = xmalloc(request->body_sz);
                    xmemcpy(request->body, conn->in.buf, request->body_sz);
                    conn->in.offset -= copy_len;
                    if (conn->in.offset)
                        xmemmove(conn->in.buf, conn->in.buf + copy_len, conn->in.offset);
                }
                /*
                 * if we didn't get the full body now, then more will
                 * be arriving on the client socket. Lets cancel
                 * the read handler until this request gets forwarded.
                 */
                if (request->body_sz < request->content_length)
                    commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
                if (request->content_length < 0)
                    (void) 0;
                else if (clientRequestBodyTooLarge(request->content_length)) {
                    err = errorCon(ERR_TOO_BIG, HTTP_REQUEST_ENTITY_TOO_LARGE);
                    err->request = requestLink(request);
                    http->entry = clientCreateStoreEntry(http,
                        METHOD_NONE, null_request_flags);
                    errorAppendEntry(http->entry, err);
                    break;
                }
            }
                destdbUpdateActiveRequests(http->request->host, 1);
            clientAccessCheck(http);
            continue; /* while offset > 0 */
        } else if (parser_return_code == 0) {
            /*
             * Partial request received; reschedule until parseHttpRequest()
             * is happy with the input
             */
            k = conn->in.size - 1 - conn->in.offset;
            if (k == 0) {
                if (conn->in.offset >= Config.maxRequestHeaderSize) {
                    /* The request is too large to handle */
                    debug(33, 1) ("Request header is too large (%d bytes)\n",
                        (int) conn->in.offset);
                    debug(33, 1) ("Config 'request_header_max_size'= %d bytes.\n",
                        Config.maxRequestHeaderSize);
                    err = errorCon(ERR_TOO_BIG, HTTP_REQUEST_ENTITY_TOO_LARGE);
                    http = parseHttpRequestAbort(conn, "error:request-too-large");
                    /* add to the client request queue */
                    for (H = &conn->chr; *H; H = &(*H)->next);
                    *H = http;
                    http->entry = clientCreateStoreEntry(http, METHOD_NONE, null_request_flags);
                    errorAppendEntry(http->entry, err);
                    return;
                }
                /* Grow the request memory area to accomodate for a large request */
                conn->in.size += CLIENT_REQ_BUF_SZ;
                if (conn->in.size == 2 * CLIENT_REQ_BUF_SZ) {
                    p = conn->in.buf; /* get rid of fixed size Pooled buffer */
                    conn->in.buf = xcalloc(2, CLIENT_REQ_BUF_SZ);
                    xmemcpy(conn->in.buf, p, CLIENT_REQ_BUF_SZ);
                    memFree(p, MEM_CLIENT_REQ_BUF);
                } else
                    conn->in.buf = xrealloc(conn->in.buf, conn->in.size);
                /* XXX account conn->in.buf */
                debug(33, 3) ("Handling a large request, offset=%d inbufsize=%d\n",
                    (int) conn->in.offset, conn->in.size);
                k = conn->in.size - 1 - conn->in.offset;
            }
            break;
        }
    }
}
Received on Fri Nov 04 2005 - 09:44:14 MST

This archive was generated by hypermail pre-2.1.9 : Thu Dec 01 2005 - 12:00:15 MST