1.2b23: My request body processing code

From: Henrik Nordstrom <hno@dont-contact.us>
Date: Sat, 25 Jul 1998 14:12:40 +0200

Here is an updated 1.2b23 version of my request body processing code.

* Request-bodies are available to the protocols by using
  clientReadBody(request, buffer, len, callback, data), which is used
  in a way similar to file_read(). clientReadBody uses a new
  request-structure variable named body_connection, and this can be
  used as a boolean to check for presence of a request-body. Should
  probably add a flag as well to make it clearer.

  The callback is called like
    callback(buffer, read_len, data)
      read_len == 0 on end of request body
      read_len == -1 on errors/abort.

  There is also a new request structure flag: REQ_BODY_SENT, which
  indicates that we have used some/all of the request-body. This is
  used to disable restarts of such requests.

* All request reading is done in clientReadRequest, which properly
  understands aborted, half-closed, persistent and pipelined
  client connections.

* No more magic in clientReadRequest to determine if there is a
  request-body or not.

* It should be trivial to enhance this with chunked transfer encoding
  apart from the actual code to decode it, which is a must if we are
  to support HTTP 1.1.

* Replaces the pump module in both HTTP and FTP. (pump.c now unused)

ToDo:

* Hierarchy stoplists to stop POST/PUT requests from traversing a
hierarchy. (simple squid.conf configuration)

* Add cbdata checks. This involves client state, buffer, client request
structure and protocol data (void * protocol state). Not sure which
structures are currently registered with cbdata, or how to add the
ones that aren't without breaking things.

/Henrik

    [ Part 2: "Attached Text" ]

Index: squid/ChangeLog
diff -u squid/ChangeLog:1.1.1.23 squid/ChangeLog:1.1.1.23.2.1
--- squid/ChangeLog:1.1.1.23 Thu Jul 23 22:37:06 1998
+++ squid/ChangeLog Sat Jul 25 13:23:00 1998
@@ -1,3 +1,11 @@
+ - Reworked how request bodies are passed down to the protocols.
+ Now all client side processing is inside client_side.c, and
+ the pass and pump modules are no longer used.
+ - Abort properly if the client aborts in the middle of sending a
+ request.
+ - Handle pipelined requests together with PUT/POST.
+ - FTP PUT datachannel opened correctly (PASV/PORT/default)
+
 Changes to squid-1.2.beta23 (June 22, 1998):
 
         - Added Turkish error pages by Tural KAPTAN.
Index: squid/src/HttpRequest.c
diff -u squid/src/HttpRequest.c:1.1.1.3 squid/src/HttpRequest.c:1.1.1.3.2.1
--- squid/src/HttpRequest.c:1.1.1.3 Thu Jul 23 22:37:43 1998
+++ squid/src/HttpRequest.c Sat Jul 25 13:24:02 1998
@@ -53,7 +53,8 @@
 requestDestroy(request_t * req)
 {
     assert(req);
- safe_free(req->body);
+ if (req->body_connection)
+ clientAbortBody(req);
     safe_free(req->canonical);
     stringClean(&req->urlpath);
     httpHeaderClean(&req->header);
Index: squid/src/client_side.c
diff -u squid/src/client_side.c:1.1.1.23.2.1 squid/src/client_side.c:1.1.1.23.2.2
--- squid/src/client_side.c:1.1.1.23.2.1 Fri Jul 24 00:12:56 1998
+++ squid/src/client_side.c Sat Jul 25 13:24:03 1998
@@ -74,9 +74,10 @@
 static HttpReply *clientConstructProxyAuthReply(clientHttpRequest * http);
 static int clientCachable(clientHttpRequest * http);
 static int clientHierarchical(clientHttpRequest * http);
-static int clientCheckContentLength(request_t * r);
 static int httpAcceptDefer(void);
 static log_type clientProcessRequest2(clientHttpRequest * http);
+static int clientCheckContentLength(request_t * r, int content_length);
+static void clientProcessBody(ConnStateData *conn);
 
 static int
 checkAccelOnly(clientHttpRequest * http)
@@ -218,10 +219,9 @@
         httpHeaderAppend(&new_request->header, &old_request->header);
         new_request->client_addr = old_request->client_addr;
         EBIT_SET(new_request->flags, REQ_REDIRECTED);
- if (old_request->body) {
- new_request->body = xmalloc(old_request->body_sz);
- xmemcpy(new_request->body, old_request->body, old_request->body_sz);
- new_request->body_sz = old_request->body_sz;
+ if (old_request->body_connection) {
+ new_request->body_connection = old_request->body_connection;
+ old_request->body_connection = NULL;
         }
         requestUnlink(old_request);
         http->request = requestLink(new_request);
@@ -555,6 +555,8 @@
     MemObject *mem = NULL;
     debug(33, 3) ("httpRequestFree: %s\n", storeUrl(entry));
     if (!clientCheckTransferDone(http)) {
+ if (request && request->body_connection)
+ clientAbortBody(request); /* abort body transter */
         if (entry)
             storeUnregister(entry, http); /* unregister BEFORE abort */
         CheckQuickAbort(http);
@@ -623,6 +625,7 @@
     requestUnlink(http->request);
     assert(http != http->next);
     assert(http->conn->chr != NULL);
+ /* Unlink us from the clients request list */
     H = &http->conn->chr;
     while (*H) {
         if (*H == http)
@@ -786,14 +789,19 @@
 }
 
 static int
-clientCheckContentLength(request_t * r)
+clientCheckContentLength(request_t * r, int content_length)
 {
- /* We only require a content-length for "upload" methods */
- if (!pumpMethod(r->method))
+ /* We only require a content-length for POST/PUT methods */
+ switch(r->method) {
+ case METHOD_PUT:
+ case METHOD_POST:
+ break;
+ default:
+ /* No content-length required */
         return 1;
- if (httpHeaderGetInt(&r->header, HDR_CONTENT_LENGTH) < 0)
- return 0;
- return 1;
+ }
+ /* Valid content length required */
+ return content_length >= 0;
 }
 
 static int
@@ -824,14 +832,18 @@
     if (req->protocol == PROTO_HTTP)
         return httpCachable(method);
     /* FTP is always cachable */
- if (req->protocol == PROTO_GOPHER)
- return gopherCachable(url);
     if (req->protocol == PROTO_WAIS)
         return 0;
     if (method == METHOD_CONNECT)
         return 0;
     if (method == METHOD_TRACE)
         return 0;
+ if (method == METHOD_PUT)
+ return 0;
+ if (method == METHOD_POST)
+ return 0; /* XXX POST may be cached sometimes.. ignored for now */
+ if (req->protocol == PROTO_GOPHER)
+ return gopherCachable(url);
     if (req->protocol == PROTO_CACHEOBJ)
         return 0;
     return 1;
@@ -1363,7 +1375,7 @@
         fd, storeUrl(entry), (int) http->out.offset);
     if (conn->chr != http) {
         /* there is another object in progress, defer this one */
- debug(0, 0) ("clientSendMoreData: Deferring %s\n", storeUrl(entry));
+ debug(33, 2) ("clientSendMoreData: Deferring %s\n", storeUrl(entry));
         memFree4K(buf);
         return;
     } else if (entry && entry->store_status == STORE_ABORTED) {
@@ -1471,7 +1483,7 @@
     conn->defer.until = 0; /* Kick it to read a new request */
     httpRequestFree(http);
     if ((http = conn->chr) != NULL) {
- debug(33, 1) ("clientKeepaliveNextRequest: FD %d Sending next\n",
+ debug(33, 3) ("clientKeepaliveNextRequest: FD %d Sending next\n",
             conn->fd);
         entry = http->entry;
         if (0 == storeClientCopyPending(entry, http)) {
@@ -1488,7 +1500,7 @@
     } else {
         debug(33, 5) ("clientKeepaliveNextRequest: FD %d reading next request\n",
             conn->fd);
- fd_note(conn->fd, "Reading next request");
+ fd_note(conn->fd, "Waiting for next request");
         /*
          * Set the timeout BEFORE calling clientReadRequest().
          */
@@ -1664,10 +1676,6 @@
         }
         /* yes, continue */
         http->log_type = LOG_TCP_MISS;
- } else if (pumpMethod(r->method)) {
- http->log_type = LOG_TCP_MISS;
- /* XXX oof, POST can be cached! */
- pumpInit(fd, r, http->uri);
     } else {
         http->log_type = clientProcessRequest2(http);
     }
@@ -1943,7 +1951,10 @@
 clientReadDefer(int fdnotused, void *data)
 {
     ConnStateData *conn = data;
- return conn->defer.until > squid_curtime;
+ if (conn->body.size_left)
+ return conn->in.offset >= conn->in.size;
+ else
+ return conn->defer.until > squid_curtime;
 }
 
 static void
@@ -1961,6 +1972,7 @@
     ErrorState *err = NULL;
     fde *F = &fd_table[fd];
     int len = conn->in.size - conn->in.offset - 1;
+ int content_length;
     debug(33, 4) ("clientReadRequest: FD %d: reading request...\n", fd);
     size = read(fd, conn->in.buf + conn->in.offset, len);
     if (size > 0) {
@@ -1974,14 +1986,19 @@
      * whole, not individual read() calls. Plus, it breaks our
      * lame half-close detection
      */
- commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0);
- if (size == 0) {
- if (conn->chr == NULL) {
+ if (size > 0) {
+ conn->in.offset += size;
+ conn->in.buf[conn->in.offset] = '\0'; /* Terminate the string */
+ } else if (size == 0 && len > 0) {
+ if (conn->chr == NULL && conn->in.offset == 0) {
             /* no current or pending requests */
+ debug(33, 4) ("clientReadRequest: FD %d closed\n", fd);
             comm_close(fd);
             return;
- } else if (!Config.onoff.half_closed_clients) {
+ }
+ if (!Config.onoff.half_closed_clients) {
             /* admin doesn't want to support half-closed client sockets */
+ debug(33, 3) ("clientReadRequest: FD %d aborted (half_closed_clients disabled)\n", fd);
             comm_close(fd);
             return;
         }
@@ -1991,7 +2008,11 @@
         conn->defer.until = squid_curtime + 1;
         conn->defer.n++;
         fd_note(fd, "half-closed");
- return;
+ /* There is one more close check at the end, to detect aborted
+ * (partial) requests. At this point we can't tell if the request
+ * is partial.
+ */
+ /* Continue to process previously read data */
     } else if (size < 0) {
         if (!ignoreErrno(errno)) {
             debug(50, 2) ("clientReadRequest: FD %d: %s\n", fd, xstrerror());
@@ -2002,18 +2023,21 @@
             return;
         }
         /* Continue to process previously read data */
- size = 0;
- }
- conn->in.offset += size;
- /* Skip leading (and trailing) whitespace */
- while (conn->in.offset > 0 && isspace(conn->in.buf[0])) {
- xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.offset - 1);
- conn->in.offset--;
     }
- conn->in.buf[conn->in.offset] = '\0'; /* Terminate the string */
- while (conn->in.offset > 0) {
+ commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0);
+ /* Process request body */
+ if (conn->in.offset > 0 && conn->body.callback != NULL)
+ clientProcessBody(conn);
+ /* Process new requests */
+ while (conn->in.offset > 0 && conn->body.size_left == 0) {
         int nrequests;
         size_t req_line_sz;
+ /* Skip leading (and trailing) whitespace */
+ if (isspace(conn->in.buf[0])) {
+ xmemmove(conn->in.buf, conn->in.buf+1, conn->in.offset-1);
+ conn->in.offset--;
+ continue;
+ }
         /* Limit the number of concurrent requests to 2 */
         for (H = &conn->chr, nrequests = 0; *H; H = &(*H)->next, nrequests++);
         if (nrequests >= 2) {
@@ -2022,6 +2046,9 @@
             conn->defer.until = squid_curtime + 100; /* Reset when a request is complete */
             break;
         }
+ conn->in.buf[conn->in.offset] = '\0'; /* Terminate the string */
+ if (nrequests == 0)
+ fd_note(conn->fd,"Reading next request");
         /* Process request */
         http = parseHttpRequest(conn,
             &method,
@@ -2090,6 +2117,7 @@
             http->log_uri = xstrdup(urlCanonicalClean(request));
             request->client_addr = conn->peer.sin_addr;
             request->http_ver = http->http_ver;
+ content_length=httpHeaderGetInt(&request->header, HDR_CONTENT_LENGTH);
             if (!urlCheckRequest(request)) {
                 err = errorCon(ERR_UNSUP_REQ, HTTP_NOT_IMPLEMENTED);
                 err->src_addr = conn->peer.sin_addr;
@@ -2099,7 +2127,7 @@
                 errorAppendEntry(http->entry, err);
                 break;
             }
- if (0 == clientCheckContentLength(request)) {
+ if (!clientCheckContentLength(request, content_length)) {
                 err = errorCon(ERR_INVALID_REQ, HTTP_LENGTH_REQUIRED);
                 err->src_addr = conn->peer.sin_addr;
                 err->request = requestLink(request);
@@ -2109,32 +2137,13 @@
                 break;
             }
             http->request = requestLink(request);
- clientAccessCheck(http);
- /*
- * break here for NON-GET because most likely there is a
- * reqeust body following and we don't want to parse it
- * as though it was new request
- */
- if (request->method != METHOD_GET) {
- int cont_len = httpHeaderGetInt(&request->header, HDR_CONTENT_LENGTH);
- int copy_len = XMIN(cont_len, conn->in.offset);
- if (copy_len > 0) {
- assert(conn->in.offset >= copy_len);
- request->body_sz = copy_len;
- request->body = xmalloc(request->body_sz);
- xmemcpy(request->body, conn->in.buf, request->body_sz);
- conn->in.offset -= copy_len;
- if (conn->in.offset)
- xmemmove(conn->in.buf, conn->in.buf + copy_len, conn->in.offset);
- }
- /*
- * ick; cancel the read handler for NON-GET requests
- * until this request is forwarded/resolved
- */
- commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
- break;
+ /* Do we expect a request-body? */
+ if (content_length > 0) {
+ conn->body.size_left = content_length;
+ request->body_connection = conn;
             }
- continue; /* while offset > 0 */
+ clientAccessCheck(http);
+ continue; /* while offset > 0 && body.size_left == 0 */
         } else if (parser_return_code == 0) {
             /*
              * Partial request received; reschedule until parseHttpRequest()
@@ -2164,7 +2173,130 @@
             }
             break;
         }
+ } /* while offset > 0 && conn->body.size_left == 0 */
+ /* Check if a half-closed connection was aborted in the middle */
+ if (F->flags.socket_eof) {
+ if (conn->in.offset != conn->body.size_left) { /* != 0 when no request body */
+ /* Partial request received. Abort client connection! */
+ debug(33, 3) ("clientReadRequest: FD %d aborted\n", fd);
+ comm_close(fd);
+ return;
+ }
     }
+}
+
+/* file_read like function, for reading body content */
+void
+clientReadBody( request_t *request, char *buf, size_t size, CBCB *callback, void *cbdata)
+{
+ ConnStateData *conn = request->body_connection;
+ if (!conn) {
+ debug(33, 1) ("clientReadBody: no body to read, request=%p\n",request);
+ callback(buf,0,cbdata); /* Signal end of body */
+ return;
+ }
+ debug(33, 2) ("clientReadBody: start fd=%d body_size=%d in.offset=%d cb=%p req=%p\n", conn->fd, conn->body.size_left, conn->in.offset, callback, request);
+ conn->body.callback = callback;
+ conn->body.cbdata = cbdata;
+ conn->body.buf = buf;
+ conn->body.bufsize = size;
+ conn->body.request = requestLink(request);
+ if (conn->in.offset) {
+ /* Data available */
+ clientProcessBody(conn);
+ } else {
+ debug(33, 2) ("clientReadBody: fd %d wait for clientReadRequest\n", conn->fd);
+ }
+}
+
+/* Called by clientReadRequest to process body content */
+static void
+clientProcessBody(ConnStateData *conn)
+{
+ int size;
+ char *buf = conn->body.buf;
+ void *cbdata = conn->body.cbdata;
+ CBCB *callback = conn->body.callback;
+ request_t *request = conn->body.request;
+ debug(33, 2) ("clientProcessBody: start fd=%d body_size=%d in.offset=%d cb=%p req=%p\n", conn->fd, conn->body.size_left, conn->in.offset, callback, request);
+ /* Some sanity checks... */
+ assert(conn->body.size_left > 0);
+ assert(conn->in.offset > 0);
+ assert(callback != NULL);
+ assert(buf != NULL);
+ /* How much do we have to process? */
+ size = conn->in.offset;
+ if (size > conn->body.size_left) /* only process the body part */
+ size = conn->body.size_left;
+ if (size > conn->body.bufsize) /* don't copy more than requested */
+ size = conn->body.bufsize;
+ xmemcpy(buf, conn->in.buf, size);
+ conn->body.size_left -= size;
+ /* Move any remaining data */
+ conn->in.offset -= size;
+ if (conn->in.offset > 0)
+ xmemmove(conn->in.buf, conn->in.buf + size, conn->in.offset);
+ /* Remove request link if this is the last part of the body, as
+ * clientReadRequest automatically continues to process next request */
+ if (conn->body.size_left <= 0 && request != NULL)
+ request->body_connection = NULL;
+ /* Remove clientReadBody arguments (the call is completed)*/
+ conn->body.request = NULL;
+ conn->body.callback = NULL;
+ conn->body.buf = NULL;
+ conn->body.bufsize = 0;
+ /* Remember that we have touched the body, not restartable */
+ EBIT_SET(request->flags, REQ_BODY_SENT);
+ /* Invoke callback function */
+ callback(buf, size, cbdata);
+ if (request != NULL)
+ requestUnlink(request); /* Linked in clientReadBody */
+ debug(33, 2) ("clientProcessBody: end fd=%d size=%d body_size=%d in.offset=%d cb=%p req=%p\n", conn->fd, size, conn->body.size_left, conn->in.offset, callback, request);
+ return;
+}
+
+/* A dummy handler that throws away a request-body */
+static char bodyAbortBuf[SQUID_TCP_SO_RCVBUF];
+void
+clientReadBodyAbortHandler(char *buf, size_t size, void *data)
+{
+ ConnStateData *conn = (ConnStateData *)data;
+ debug(33, 2) ("clientReadBodyAbortHandler: fd=%d body_size=%d in.offset=%d\n", conn->fd, conn->body.size_left, conn->in.offset);
+ if (size != 0) {
+ debug(33, 3) ("clientReadBodyAbortHandler: fd=%d shedule next read\n", conn->fd);
+ conn->body.callback = clientReadBodyAbortHandler;
+ conn->body.buf = bodyAbortBuf;
+ conn->body.bufsize = sizeof(bodyAbortBuf);
+ conn->body.cbdata = data;
+ }
+}
+
+/* Abort a body request */
+int
+clientAbortBody(request_t *request)
+{
+ ConnStateData *conn = request->body_connection;
+ char *buf;
+ CBCB *callback;
+ void *cbdata;
+ request->body_connection = NULL;
+ if (!conn || conn->body.size_left <= 0)
+ return 0; /* No body to abort */
+ if (conn->body.callback != NULL) {
+ buf = conn->body.buf;
+ callback = conn->body.callback;
+ cbdata = conn->body.cbdata;
+ assert(request == conn->body.request);
+ conn->body.buf = NULL;
+ conn->body.callback = NULL;
+ conn->body.cbdata = NULL;
+ conn->body.request = NULL;
+ callback(buf, -1, cbdata); /* Signal abort to clientReadBody caller */
+ requestUnlink(request);
+ }
+ clientReadBodyAbortHandler(NULL, -1, conn); /* Install abort handler */
+ /* clientProcessBody() */
+ return 1; /* Aborted */
 }
 
 /* general lifetime handler for HTTP requests */
Index: squid/src/enums.h
diff -u squid/src/enums.h:1.1.1.22 squid/src/enums.h:1.1.1.22.2.1
--- squid/src/enums.h:1.1.1.22 Thu Jul 23 22:37:50 1998
+++ squid/src/enums.h Sat Jul 25 13:24:04 1998
@@ -511,7 +511,8 @@
     REQ_REFRESH,
     REQ_USED_PROXY_AUTH,
     REQ_REDIRECTED,
- REQ_NOCACHE_HACK /* for changing no-cache requests into IMS */
+ REQ_NOCACHE_HACK, /* for changing no-cache requests into IMS */
+ REQ_BODY_SENT /* We have processed some of the request body, not restartable */
 };
 
 enum {
Index: squid/src/forward.c
diff -u squid/src/forward.c:1.1.1.1 squid/src/forward.c:1.1.1.1.2.1
--- squid/src/forward.c:1.1.1.1 Thu Jul 23 22:37:51 1998
+++ squid/src/forward.c Sat Jul 25 13:24:04 1998
@@ -101,9 +101,8 @@
         return 0;
     if (squid_curtime - fwdState->start > 120)
         return 0;
- if (pumpMethod(fwdState->request->method))
- if (0 == pumpRestart(fwdState->request))
- return 0;
+ if (EBIT_TEST(fwdState->request->flags, REQ_BODY_SENT))
+ return 0;
     return 1;
 }
 
Index: squid/src/ftp.c
diff -u squid/src/ftp.c:1.1.1.21.2.1 squid/src/ftp.c:1.1.1.21.2.2
--- squid/src/ftp.c:1.1.1.21.2.1 Fri Jul 24 00:11:59 1998
+++ squid/src/ftp.c Sat Jul 25 13:24:05 1998
@@ -141,6 +141,8 @@
 /* Local functions */
 static CNCB ftpPasvCallback;
 static PF ftpDataRead;
+static PF ftpDataWrite;
+static CWCB ftpDataWriteCallback;
 static PF ftpStateFree;
 static PF ftpTimeout;
 static PF ftpReadControlReply;
@@ -150,8 +152,6 @@
 static void ftpAppendSuccessHeader(FtpStateData * ftpState);
 static void ftpAuthRequired(HttpReply * reply, request_t * request, const char *realm);
 static void ftpHackShortcut(FtpStateData * ftpState, FTPSM * nextState);
-static void ftpPutStart(FtpStateData *);
-static CWCB ftpPutTransferDone;
 static void ftpUnhack(FtpStateData * ftpState);
 static void ftpScheduleReadControlReply(FtpStateData *, int);
 static void ftpHandleControlReply(FtpStateData *);
@@ -182,6 +182,7 @@
 static FTPSM ftpGetFile;
 static FTPSM ftpSendCwd;
 static FTPSM ftpReadCwd;
+static FTPSM ftpRestOrList;
 static FTPSM ftpSendList;
 static FTPSM ftpSendNlst;
 static FTPSM ftpReadList;
@@ -190,16 +191,15 @@
 static FTPSM ftpSendRetr;
 static FTPSM ftpReadRetr;
 static FTPSM ftpReadTransferDone;
-static FTPSM ftpSendQuit;
-static FTPSM ftpReadQuit;
-static FTPSM ftpFail;
-static FTPSM ftpDataTransferDone;
-static FTPSM ftpRestOrList;
 static FTPSM ftpSendStor;
 static FTPSM ftpReadStor;
+static FTPSM ftpWriteTransferDone;
 static FTPSM ftpSendReply;
-static FTPSM ftpTryMkdir;
+static FTPSM ftpSendMkdir;
 static FTPSM ftpReadMkdir;
+static FTPSM ftpFail;
+static FTPSM ftpSendQuit;
+static FTPSM ftpReadQuit;
 /************************************************
 ** State Machine Description (excluding hacks) **
 *************************************************
@@ -210,17 +210,21 @@
 Pass Type
 Type TraverseDirectory / GetFile
 TraverseDirectory Cwd / GetFile / ListDir
-Cwd TraverseDirectory
+Cwd TraverseDirectory / Mkdir
 GetFile Mdtm
 Mdtm Size
 Size Pasv
 ListDir Pasv
-Pasv RestOrList
-RestOrList Rest / Retr / Nlst / List
+Pasv FileOrList
+FileOrList Rest / Retr / Nlst / List / Mkdir (PUT /xxx;type=d)
 Rest Retr
-Retr / Nlst / List (ftpDataRead on datachannel)
-(ftpDataRead) ReadTransferDone
+Retr / Nlst / List DataRead* (on datachannel)
+DataRead* ReadTransferDone
 ReadTransferDone DataTransferDone
+Stor DataWrite* (on datachannel)
+DataWrite* RequestPutBody** (from client)
+RequestPutBody** DataWrite* / WriteTransferDone
+WriteTransferDone DataTransferDone
 DataTransferDone Quit
 Quit -
 ************************************************/
@@ -242,8 +246,8 @@
     ftpReadRetr,
     ftpReadStor,
     ftpReadQuit,
- ftpReadTransferDone,
- ftpSendReply,
+ ftpReadTransferDone, /* READING_DATA */
+ ftpWriteTransferDone, /* WRITING_DATA */
     ftpReadMkdir
 };
 
@@ -769,21 +773,35 @@
     xfree(sbuf);
 }
 
+/* Datachannel complete */
 static void
-ftpReadComplete(FtpStateData * ftpState)
+ftpDataComplete(FtpStateData * ftpState)
 {
- debug(9, 3) ("ftpReadComplete\n");
- /* Connection closed; retrieval done. */
+ debug(9, 3) ("ftpDataComplete\n");
+ /* Connection closed; transfer done. */
+ if (ftpState->data.fd >= 0) {
+ debug(9, 3) ("ftpDataComplete: closing data channel");
+ comm_close(ftpState->data.fd);
+ ftpState->data.fd = -1;
+ }
     if (ftpState->flags.html_header_sent)
         ftpListingFinish(ftpState);
- if (!ftpState->flags.put) {
- storeTimestampsSet(ftpState->entry);
- storeComplete(ftpState->entry);
- }
     /* expect the "transfer complete" message on the control socket */
     ftpScheduleReadControlReply(ftpState, 1);
 }
 
+static void ftpDataFail(FtpStateData *ftpState)
+{
+ debug(9, 0) ("ftpDataFail not implemented\n");
+ ftpDataComplete(ftpState);
+}
+
+static void ftpDataAbort(FtpStateData *ftpState)
+{
+ debug(9, 0) ("ftpDataAbort not implemented\n");
+ ftpDataComplete(ftpState);
+}
+
 static void
 ftpDataRead(int fd, void *data)
 {
@@ -792,11 +810,9 @@
     int j;
     int bin;
     StoreEntry *entry = ftpState->entry;
- MemObject *mem = entry->mem_obj;
     assert(fd == ftpState->data.fd);
     if (fwdAbortFetch(entry)) {
- storeAbort(entry, 0);
- ftpDataTransferDone(ftpState);
+ ftpDataAbort(ftpState);
         return;
     }
     errno = 0;
@@ -822,7 +838,7 @@
         ftpListingStart(ftpState);
     }
     if (len < 0) {
- debug(50, 1) ("ftpDataRead: read error: %s\n", xstrerror());
+ debug(9, 1) ("ftpDataRead: read error: %s\n", xstrerror());
         if (ignoreErrno(errno)) {
             commSetSelect(fd,
                 COMM_SELECT_READ,
@@ -830,12 +846,10 @@
                 data,
                 Config.Timeout.read);
         } else {
- assert(mem->inmem_hi > 0);
- storeAbort(entry, 0);
- ftpDataTransferDone(ftpState);
+ ftpDataFail(ftpState);
         }
     } else if (len == 0) {
- ftpReadComplete(ftpState);
+ ftpDataComplete(ftpState);
     } else {
         if (ftpState->flags.isdir) {
             ftpParseListing(ftpState);
@@ -843,14 +857,11 @@
             storeAppend(entry, ftpState->data.buf, len);
             ftpState->data.offset = 0;
         }
- if (ftpState->size > 0 && mem->inmem_hi >= ftpState->size + mem->reply->hdr_sz)
- ftpReadComplete(ftpState);
- else
- commSetSelect(fd,
- COMM_SELECT_READ,
- ftpDataRead,
- data,
- Config.Timeout.read);
+ commSetSelect(fd,
+ COMM_SELECT_READ,
+ ftpDataRead,
+ data,
+ Config.Timeout.read);
     }
 }
 
@@ -1032,7 +1043,7 @@
     if (errflag == COMM_ERR_CLOSING)
         return;
     if (errflag) {
- debug(50, 1) ("ftpWriteCommandCallback: FD %d: %s\n", fd, xstrerror());
+ debug(9, 1) ("ftpWriteCommandCallback: FD %d: %s\n", fd, xstrerror());
         if (entry->mem_obj->inmem_hi == 0) {
             err = errorCon(ERR_WRITE_ERROR, HTTP_SERVICE_UNAVAILABLE);
             err->xerrno = errno;
@@ -1145,7 +1156,7 @@
     }
     debug(9, 5) ("ftpReadControlReply: FD %d, Read %d bytes\n", fd, len);
     if (len < 0) {
- debug(50, 1) ("ftpReadControlReply: read error: %s\n", xstrerror());
+ debug(9, 1) ("ftpReadControlReply: read error: %s\n", xstrerror());
         if (ignoreErrno(errno)) {
             ftpScheduleReadControlReply(ftpState, 0);
         } else {
@@ -1417,15 +1428,15 @@
         if (!ftpState->flags.put)
             ftpFail(ftpState);
         else
- ftpTryMkdir(ftpState);
+ ftpSendMkdir(ftpState);
     }
 }
 
 static void
-ftpTryMkdir(FtpStateData * ftpState)
+ftpSendMkdir(FtpStateData * ftpState)
 {
     char *path = ftpState->filepath;
- debug(9, 3) ("ftpTryMkdir: with path=%s\n", path);
+ debug(9, 3) ("ftpSendMkdir: with path=%s\n", path);
     snprintf(cbuf, 1024, "MKD %s\r\n", path);
     ftpWriteCommand(cbuf, ftpState);
     ftpState->state = SENT_MKDIR;
@@ -1763,23 +1774,25 @@
      * host NULL -> not connected, port == local port
      * host set -> connected, port == remote port
      */
- /* Restart state (SENT_NLST/LIST/RETR) */
+ /* Restart state (SENT_NLST/LIST/RETR/STOR) */
     FTP_SM_FUNCS[ftpState->state] (ftpState);
 }
 
 static void
 ftpRestOrList(FtpStateData * ftpState)
 {
-
     debug(9, 3) ("This is ftpRestOrList\n");
- if (ftpState->flags.put) {
- debug(9, 3) ("ftpRestOrList: Sending STOR request...\n");
- ftpSendStor(ftpState);
- } else if (ftpState->typecode == 'D') {
- /* XXX This should NOT be here */
- ftpSendNlst(ftpState); /* sec 3.2.2 of RFC 1738 */
+ if (ftpState->typecode == 'D') {
         ftpState->flags.isdir = 1;
         ftpState->flags.use_base = 1;
+ if (ftpState->flags.put) {
+ ftpSendMkdir(ftpState); /* PUT name;type=d */
+ } else {
+ ftpSendNlst(ftpState); /* GET name;type=d sec 3.2.2 of RFC 1738 */
+ }
+ } else if (ftpState->flags.put) {
+ debug(9, 3) ("ftpRestOrList: Sending STOR request...\n");
+ ftpSendStor(ftpState);
     } else if (ftpState->flags.isdir)
         ftpSendList(ftpState);
     else if (ftpState->restart_offset > 0)
@@ -1802,24 +1815,39 @@
 {
     int code = ftpState->ctrl.replycode;
     debug(9, 3) ("This is ftpReadStor\n");
- if (code >= 100 && code < 200) {
+ if (code == 125 || (code == 150 && ftpState->data.host)) {
+ /* Begin data transfer */
+ debug(9, 3) ("ftpReadStor: starting data channel\n");
+ commSetSelect(ftpState->data.fd,
+ COMM_SELECT_WRITE,
+ ftpDataWrite,
+ ftpState,
+ Config.Timeout.read);
+ commSetDefer(ftpState->data.fd, NULL, NULL);
+ ftpState->state = WRITING_DATA;
         /*
- * Cancel the timeout on the Control socket, pumpStart will
- * establish one on the data socket.
+ * Cancel the timeout on the Control socket and establish one
+ * on the data socket
          */
         commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL);
- ftpPutStart(ftpState);
- debug(9, 3) ("ftpReadStor: writing data channel\n");
- ftpState->state = WRITING_DATA;
- } else if (code == 553) {
- /* directory does not exist, have to create, sigh */
-#if WORK_IN_PROGRESS
- ftpTraverseDirectory(ftpState);
-#endif
- ftpSendReply(ftpState);
+ commSetTimeout(ftpState->data.fd, Config.Timeout.read, ftpTimeout,
+ ftpState);
+ } else if (code == 150) {
+ /* Accept data channel */
+ commSetSelect(ftpState->data.fd,
+ COMM_SELECT_WRITE,
+ ftpAcceptDataConnection,
+ ftpState,
+ 0);
+ /*
+ * Cancel the timeout on the Control socket and establish one
+ * on the data socket
+ */
+ commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL);
+ commSetTimeout(ftpState->data.fd, Config.Timeout.read, ftpTimeout,
+ ftpState);
     } else {
- debug(9, 3) ("ftpReadStor: that's all folks\n");
- ftpSendReply(ftpState);
+ ftpFail(ftpState);
     }
 }
 
@@ -1987,18 +2015,67 @@
         debug(9, 1) ("--> releasing '%s'\n", storeUrl(ftpState->entry));
         storeReleaseRequest(ftpState->entry);
     }
- ftpDataTransferDone(ftpState);
+ storeTimestampsSet(ftpState->entry);
+ storeComplete(ftpState->entry);
+ ftpSendQuit(ftpState);
 }
 
+/* This will be called when there is data available to put */
 static void
-ftpDataTransferDone(FtpStateData * ftpState)
+ftpRequestBody(char *buf, size_t size, void *data)
 {
- debug(9, 3) ("This is ftpDataTransferDone\n");
- if (ftpState->data.fd > -1) {
- comm_close(ftpState->data.fd);
- ftpState->data.fd = -1;
+ FtpStateData *ftpState = (FtpStateData *)data;
+ debug(9, 3) ("ftpRequestBody: buf=%p size=%d ftpState=%p\n",buf,size,data);
+ ftpState->data.offset = size;
+ if (size > 0) {
+ /* DataWrite */
+ comm_write(ftpState->data.fd, buf, size, ftpDataWriteCallback, data, NULL);
+ } else if (size < 0) {
+ /* Error */
+ debug(9, 1) ("ftpRequestBody: request aborted");
+ ftpDataAbort(ftpState);
+ } else if (size == 0) {
+ /* End of transfer */
+ ftpDataComplete(ftpState);
+ }
+}
+
+/* This will be called when the put write is completed */
+static void
+ftpDataWriteCallback(int fd, char *buf, size_t size, int err, void *data)
+{
+ FtpStateData *ftpState = (FtpStateData *)data;
+ if (!err) {
+ /* Shedule the rest of the request */
+ clientReadBody(ftpState->request, ftpState->data.buf, ftpState->data.size, ftpRequestBody, ftpState);
+ } else {
+ debug(9, 1) ("ftpDataWriteCallback: write error: %s\n", xstrerror());
+ ftpDataFail(ftpState);
     }
- ftpSendQuit(ftpState);
+}
+
+static void
+ftpDataWrite(int ftp, void *data)
+{
+ FtpStateData * ftpState = (FtpStateData *)data;
+ debug(9, 3) ("ftpDataWrite\n");
+ /* This starts the body transfer */
+ clientReadBody(ftpState->request, ftpState->data.buf, ftpState->data.size, ftpRequestBody, ftpState);
+}
+
+static void
+ftpWriteTransferDone(FtpStateData * ftpState)
+{
+ int code = ftpState->ctrl.replycode;
+ debug(9, 3) ("This is ftpWriteTransferDone\n");
+ if (code != 226) {
+ debug(9, 1) ("ftpReadTransferDone: Got code %d after sending data\n",
+ code);
+ debug(9, 1) ("--> releasing '%s'\n", storeUrl(ftpState->entry));
+ storeReleaseRequest(ftpState->entry);
+ }
+ storeTimestampsSet(ftpState->entry); /* XXX Is this needed? */
+ ftpSendReply(ftpState);
 }
 
 static void
@@ -2131,25 +2208,6 @@
 }
 
 static void
-ftpPutStart(FtpStateData * ftpState)
-{
- debug(9, 3) ("ftpPutStart\n");
- pumpStart(ftpState->data.fd, ftpState->entry,
- ftpState->request, ftpPutTransferDone, ftpState);
-}
-
-static void
-ftpPutTransferDone(int fd, char *bufnotused, size_t size, int errflag, void *data)
-{
- FtpStateData *ftpState = data;
- if (ftpState->data.fd >= 0) {
- comm_close(ftpState->data.fd);
- ftpState->data.fd = -1;
- }
- ftpReadComplete(ftpState);
-}
-
-static void
 ftpSendReply(FtpStateData * ftpState)
 {
     ErrorState *err;
@@ -2179,7 +2237,7 @@
         err->ftp.reply = ftpState->ctrl.last_reply;
     errorAppendEntry(ftpState->entry, err);
     storeBufferFlush(ftpState->entry);
- comm_close(ftpState->ctrl.fd);
+ ftpSendQuit(ftpState);
 }
 
 static void
Index: squid/src/http.c
diff -u squid/src/http.c:1.1.1.20.2.1 squid/src/http.c:1.1.1.20.2.2
--- squid/src/http.c:1.1.1.20.2.1 Thu Jul 23 22:51:10 1998
+++ squid/src/http.c Sat Jul 25 13:24:06 1998
@@ -400,6 +400,16 @@
             return;
         }
     }
+ if (!httpState->reply_hdr && len>0) {
+ /* Skip whitespace */
+ while(len > 0 && isspace(*buf))
+ xmemmove(buf, buf+1, len--);
+ if (len == 0) {
+ /* Continue to read... */
+ commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0);
+ return;
+ }
+ }
     if (len < 0) {
         debug(50, 2) ("httpReadReply: FD %d: read failure: %s.\n",
             fd, xstrerror());
@@ -666,7 +676,8 @@
 
     debug(11, 5) ("httpSendRequest: FD %d: httpState %p.\n", fd, httpState);
 
- if (pumpMethod(req->method))
+ /* Do we have a body to process? */
+ if (httpState->orig_request->body_connection)
         sendHeaderDone = httpSendRequestEntry;
     else
         sendHeaderDone = httpSendComplete;
@@ -777,6 +788,23 @@
 }
 
 static void
+httpRequestBodyHandler(char *buf, size_t size, void *data)
+{
+ HttpStateData *httpState = (HttpStateData *)data;
+ if ( size > 0 ) {
+ comm_write(httpState->fd, buf, size, httpSendRequestEntry, data, memFree8K);
+ } else if (size == 0) {
+ /* End of body */
+ memFree8K(buf);
+ httpSendComplete(httpState->fd, NULL, 0, 0, data);
+ } else {
+ /* Failed to get whole body, probably aborted */
+ memFree8K(buf);
+ httpSendComplete(httpState->fd, NULL, 0, COMM_ERR_CLOSING, data);
+ }
+}
+
+static void
 httpSendRequestEntry(int fd, char *bufnotused, size_t size, int errflag, void *data)
 {
     HttpStateData *httpState = data;
@@ -799,5 +827,5 @@
         comm_close(fd);
         return;
     }
- pumpStart(fd, entry, httpState->orig_request, httpSendComplete, httpState);
+ clientReadBody(httpState->orig_request, memAllocate(MEM_8K_BUF), 8192, httpRequestBodyHandler, httpState);
 }
Index: squid/src/protos.h
diff -u squid/src/protos.h:1.1.1.23 squid/src/protos.h:1.1.1.23.2.1
--- squid/src/protos.h:1.1.1.23 Thu Jul 23 22:37:57 1998
+++ squid/src/protos.h Sat Jul 25 13:24:06 1998
@@ -121,6 +121,8 @@
 extern void clientHttpConnectionsClose(void);
 extern StoreEntry *clientCreateStoreEntry(clientHttpRequest *, method_t, int);
 extern int isTcpHit(log_type);
+extern void clientReadBody(request_t *req, char *buf, size_t size, CBCB *callback, void *data);
+extern int clientAbortBody(request_t *req);
 
 extern int commSetNonBlocking(int fd);
 extern void commSetCloseOnExec(int fd);
@@ -655,7 +657,6 @@
 extern void start_announce(void *unused);
 extern void sslStart(int fd, const char *, request_t *, size_t * sz);
 extern void waisStart(request_t *, StoreEntry *, int fd);
-extern void passStart(int, const char *, request_t *, size_t *);
 extern void identStart(int, ConnStateData *, IDCB * callback, void *);
 
 extern void statInit(void);
@@ -938,10 +939,12 @@
 extern void PrintRusage(void);
 extern void dumpMallocStats(void);
 
+#if OLD_CODE
 extern void pumpInit(int fd, request_t * r, char *uri);
 extern void pumpStart(int, StoreEntry *, request_t *, CWCB * callback, void *);
 extern int pumpMethod(method_t method);
 extern int pumpRestart(request_t *);
+#endif
 
 extern void unlinkdInit(void);
 extern void unlinkdClose(void);
Index: squid/src/pump.c
diff -u squid/src/pump.c:1.1.1.6 squid/src/pump.c:1.1.1.6.2.1
--- squid/src/pump.c:1.1.1.6 Thu Jul 23 22:37:58 1998
+++ squid/src/pump.c Sat Jul 25 13:24:07 1998
@@ -1,3 +1,4 @@
+#if OLD_CODE
 /*
  * $Id$
  *
@@ -458,3 +459,4 @@
     debug(61, 3) ("pumpRestart: YES!\n");
     return 1;
 }
+#endif
Index: squid/src/structs.h
diff -u squid/src/structs.h:1.1.1.24 squid/src/structs.h:1.1.1.24.2.1
--- squid/src/structs.h:1.1.1.24 Thu Jul 23 22:38:03 1998
+++ squid/src/structs.h Sat Jul 25 13:24:08 1998
@@ -777,6 +777,14 @@
         off_t offset;
         size_t size;
     } in;
+ struct {
+ size_t size_left; /* How much body left to process */
+ request_t *request; /* Parameters passed to clientReadBody */
+ char *buf;
+ size_t bufsize;
+ CBCB *callback;
+ void *cbdata;
+ } body;
     clientHttpRequest *chr;
     struct sockaddr_in peer;
     struct sockaddr_in me;
@@ -1174,8 +1182,7 @@
     int max_forwards;
     struct in_addr client_addr;
     HttpHeader header;
- char *body;
- size_t body_sz;
+ ConnStateData *body_connection; /* used by clientReadBody() */
     HierarchyLogEntry hier;
     err_type err_type;
 };
Index: squid/src/typedefs.h
diff -u squid/src/typedefs.h:1.1.1.15 squid/src/typedefs.h:1.1.1.15.2.1
--- squid/src/typedefs.h:1.1.1.15 Thu Jul 23 22:38:04 1998
+++ squid/src/typedefs.h Sat Jul 25 13:24:08 1998
@@ -184,6 +184,7 @@
 typedef void RH(void *data, char *);
 typedef void UH(void *data, wordlist *);
 typedef int DEFER(int fd, void *data);
+typedef void CBCB(char *buf, size_t size, void *data);
 
 typedef void SIH(int fd, void *); /* swap in */
 typedef int QS(const void *, const void *); /* qsort */
Received on Tue Jul 29 2003 - 13:15:51 MDT

This archive was generated by hypermail pre-2.1.9 : Tue Dec 09 2003 - 16:11:50 MST