=== modified file 'src/CommCalls.h' --- src/CommCalls.h 2014-03-15 11:42:55 +0000 +++ src/CommCalls.h 2014-06-03 17:32:07 +0000 @@ -89,55 +89,52 @@ class CommAcceptCbParams: public CommCommonCbParams { public: CommAcceptCbParams(void *aData); void print(std::ostream &os) const; /// Transaction which this call is part of. MasterXaction::Pointer xaction; }; // connect parameters class CommConnectCbParams: public CommCommonCbParams { public: CommConnectCbParams(void *aData); bool syncWithComm(); // see CommCommonCbParams::syncWithComm }; -class SBuf; - // read/write (I/O) parameters class CommIoCbParams: public CommCommonCbParams { public: CommIoCbParams(void *aData); void print(std::ostream &os) const; bool syncWithComm(); // see CommCommonCbParams::syncWithComm public: char *buf; size_t size; - SBuf *buf2; // alternative buffer for use when buf is unset }; // close parameters class CommCloseCbParams: public CommCommonCbParams { public: CommCloseCbParams(void *aData); }; class CommTimeoutCbParams: public CommCommonCbParams { public: CommTimeoutCbParams(void *aData); }; /// Special Calls parameter, for direct use of an FD without a controlling Comm::Connection /// This is used for pipe() FD with helpers, and internally by Comm when handling some special FD actions. class FdeCbParams: public CommCommonCbParams { public: === modified file 'src/client_side.cc' --- src/client_side.cc 2014-05-21 06:29:38 +0000 +++ src/client_side.cc 2014-06-03 22:15:31 +0000 @@ -77,40 +77,41 @@ * the next ClientHttpRequest, and will send it, restablishing the * data flow. */ #include "squid.h" #include "acl/FilledChecklist.h" #include "anyp/PortCfg.h" #include "base/Subscription.h" #include "base/TextException.h" #include "CachePeer.h" #include "ChunkedCodingParser.h" #include "client_db.h" #include "client_side.h" #include "client_side_reply.h" #include "client_side_request.h" #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" #include "comm/Connection.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommCalls.h" #include "errorpage.h" #include "fd.h" #include "fde.h" #include "fqdncache.h" #include "FwdState.h" #include "globals.h" #include "http.h" #include "HttpHdrContRange.h" #include "HttpHeaderTools.h" #include "HttpReply.h" #include "HttpRequest.h" #include "ident/Config.h" #include "ident/Ident.h" #include "internal.h" #include "ipc/FdNotes.h" #include "ipc/StartListening.h" #include "log/access_log.h" @@ -220,57 +221,57 @@ char *skipLeadingSpace(char *aString); static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount); clientStreamNode * ClientSocketContext::getTail() const { if (http->client_stream.tail) return (clientStreamNode *)http->client_stream.tail->data; return NULL; } clientStreamNode * ClientSocketContext::getClientReplyContext() const { return (clientStreamNode *)http->client_stream.tail->prev->data; } /** - * This routine should be called to grow the inbuf and then - * call comm_read(). + * This routine should be called to grow the in.buf and then + * call Comm::Read(). */ void ConnStateData::readSomeData() { if (reading()) return; debugs(33, 4, HERE << clientConnection << ": reading request..."); if (!in.maybeMakeSpaceAvailable()) return; typedef CommCbMemFunT Dialer; reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest); - comm_read(clientConnection, in.buf, reader); + Comm::Read(clientConnection, reader); } void ClientSocketContext::removeFromConnectionList(ConnStateData * conn) { ClientSocketContext::Pointer *tempContextPointer; assert(conn != NULL && cbdataReferenceValid(conn)); assert(conn->getCurrentContext() != NULL); /* Unlink us from the connection request list */ tempContextPointer = & conn->currentobject; while (tempContextPointer->getRaw()) { if (*tempContextPointer == this) break; tempContextPointer = &(*tempContextPointer)->next; } assert(tempContextPointer->getRaw() != NULL); *tempContextPointer = next; @@ -2400,60 +2401,40 @@ for (S = (ClientSocketContext::Pointer *) & currentobject; S->getRaw(); S = &(*S)->next); *S = context; ++nrequests; } int ConnStateData::getConcurrentRequestCount() const { int result = 0; ClientSocketContext::Pointer *T; for (T = (ClientSocketContext::Pointer *) ¤tobject; T->getRaw(); T = &(*T)->next, ++result); return result; } int -ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno) -{ - if (flag != COMM_OK) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": got flag " << flag); - return 1; - } - - if (size < 0) { - if (!ignoreErrno(xerrno)) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno)); - return 1; - } else if (in.buf.isEmpty()) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")"); - } - } - - return 0; -} - -int ConnStateData::connFinishedWithConn(int size) { if (size == 0) { if (getConcurrentRequestCount() == 0 && in.buf.isEmpty()) { /* no current or pending requests */ debugs(33, 4, HERE << clientConnection << " closed"); return 1; } else if (!Config.onoff.half_closed_clients) { /* admin doesn't want to support half-closed client sockets */ debugs(33, 3, HERE << clientConnection << " aborted (half_closed_clients disabled)"); notifyAllContexts(0); // no specific error implies abort return 1; } } return 0; } void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount) @@ -2967,137 +2948,138 @@ commSetConnTimeout(clientConnection, Config.Timeout.lifetime, timeoutCall); clientProcessRequest(this, &parser_, context, method, http_ver); parsed_req = true; // XXX: do we really need to parse everything right NOW ? if (context->mayUseConnection()) { debugs(33, 3, HERE << "Not parsing new requests, as this request may need the connection"); break; } } } /* XXX where to 'finish' the parsing pass? */ return parsed_req; } void ConnStateData::clientReadRequest(const CommIoCbParams &io) { - debugs(33,5,HERE << io.conn << " size " << io.size); + debugs(33,5, io.conn); Must(reading()); reader = NULL; /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ - if (io.flag == COMM_ERR_CLOSING) { - debugs(33,5, HERE << io.conn << " closing Bailout."); + debugs(33,5, io.conn << " closing Bailout."); return; } assert(Comm::IsConnOpen(clientConnection)); assert(io.conn->fd == clientConnection->fd); - /* - * Don't reset the timeout value here. The timeout value will be - * set to Config.Timeout.request by httpAccept() and - * clientWriteComplete(), and should apply to the request as a - * whole, not individual read() calls. Plus, it breaks our - * lame half-close detection - */ - if (connReadWasError(io.flag, io.size, io.xerrno)) { - notifyAllContexts(io.xerrno); - io.conn->close(); + CommIoCbParams rd(this); // will be expanded with ReadNow results + rd.conn = io.conn; + switch (Comm::ReadNow(rd, in.buf)) + { + case COMM_INPROGRESS: + if (in.buf.isEmpty()) + debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno)); + readSomeData(); return; - } - if (io.flag == COMM_OK) { - if (io.size > 0) { - kb_incr(&(statCounter.client_http.kbytes_in), io.size); + case COMM_OK: + kb_incr(&(statCounter.client_http.kbytes_in), rd.size); + // may comm_close or setReplyToError + if (!handleReadData()) + return; - // may comm_close or setReplyToError - if (!handleReadData(io.buf2)) - return; + /* Continue to process previously read data */ + break; - } else if (io.size == 0) { - debugs(33, 5, HERE << io.conn << " closed?"); + case COMM_ERR_CLOSING: // close detected by 0-byte read + debugs(33, 5, io.conn << " closed?"); - if (connFinishedWithConn(io.size)) { - clientConnection->close(); - return; - } - - /* It might be half-closed, we can't tell */ - fd_table[io.conn->fd].flags.socket_eof = true; + if (connFinishedWithConn(rd.size)) { + clientConnection->close(); + return; + } - commMarkHalfClosed(io.conn->fd); + /* It might be half-closed, we can't tell */ + fd_table[io.conn->fd].flags.socket_eof = true; + commMarkHalfClosed(io.conn->fd); + fd_note(io.conn->fd, "half-closed"); + + /* There is one more close check at the end, to detect aborted + * (partial) requests. At this point we can't tell if the request + * is partial. + */ - fd_note(io.conn->fd, "half-closed"); + /* Continue to process previously read data */ + break; - /* There is one more close check at the end, to detect aborted - * (partial) requests. At this point we can't tell if the request - * is partial. - */ - /* Continue to process previously read data */ - } + // case COMM_ERROR: + default: // no other flags should ever occur + debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno)); + notifyAllContexts(rd.xerrno); + io.conn->close(); + return; } /* Process next request */ if (getConcurrentRequestCount() == 0) fd_note(io.fd, "Reading next request"); if (!clientParseRequests()) { if (!isOpen()) return; /* * If the client here is half closed and we failed * to parse a request, close the connection. * The above check with connFinishedWithConn() only * succeeds _if_ the buffer is empty which it won't * be if we have an incomplete request. * XXX: This duplicates ClientSocketContext::keepaliveNextRequest */ if (getConcurrentRequestCount() == 0 && commIsHalfClosed(io.fd)) { debugs(33, 5, HERE << io.conn << ": half-closed connection, no completed request parsed, connection closing."); clientConnection->close(); return; } } if (!isOpen()) return; clientAfterReadingRequests(); } /** * called when new request data has been read from the socket * * \retval false called comm_close or setReplyToError (the caller should bail) * \retval true we did not call comm_close or setReplyToError */ bool -ConnStateData::handleReadData(SBuf *buf) +ConnStateData::handleReadData() { - assert(buf == &in.buf); // XXX: make this abort the transaction if this fails - // if we are reading a body, stuff data into the body pipe if (bodyPipe != NULL) return handleRequestBodyData(); return true; } /** * called when new request body data has been buffered in in.buf * may close the connection if we were closing and piped everything out * * \retval false called comm_close or setReplyToError (the caller should bail) * \retval true we did not call comm_close or setReplyToError */ bool ConnStateData::handleRequestBodyData() { assert(bodyPipe != NULL); size_t putSize = 0; @@ -3614,42 +3596,43 @@ { ConnStateData *connState = (ConnStateData *) data; // if the connection is closed or closing, just return. if (!connState->isOpen()) return; // Require both a match and a positive bump mode to work around exceptional // cases where ACL code may return ACCESS_ALLOWED with zero answer.kind. if (answer == ACCESS_ALLOWED && answer.kind != Ssl::bumpNone) { debugs(33, 2, HERE << "sslBump needed for " << connState->clientConnection); connState->sslBumpMode = static_cast(answer.kind); httpsEstablish(connState, NULL, (Ssl::BumpMode)answer.kind); } else { debugs(33, 2, HERE << "sslBump not needed for " << connState->clientConnection); connState->sslBumpMode = Ssl::bumpNone; // fake a CONNECT request to force connState to tunnel static char ip[MAX_IPSTRLEN]; connState->clientConnection->local.toUrl(ip, sizeof(ip)); + // XXX need to *pre-pend* this fake request to the TLS bits already in the buffer connState->in.buf.append("CONNECT ").append(ip).append(" HTTP/1.1\r\nHost: ").append(ip).append("\r\n\r\n"); - bool ret = connState->handleReadData(&connState->in.buf); + bool ret = connState->handleReadData(); if (ret) ret = connState->clientParseRequests(); if (!ret) { debugs(33, 2, HERE << "Failed to start fake CONNECT request for ssl bumped connection: " << connState->clientConnection); connState->clientConnection->close(); } } } /** handle a new HTTPS connection */ static void httpsAccept(const CommAcceptCbParams ¶ms) { MasterXaction::Pointer xact = params.xaction; const AnyP::PortCfgPointer s = xact->squidPort; if (!s.valid()) { // it is possible the call or accept() was still queued when the port was reconfigured debugs(33, 2, "HTTPS accept failure: port reconfigured."); @@ -4281,41 +4264,41 @@ } CBDATA_CLASS_INIT(ConnStateData); bool ConnStateData::transparent() const { return clientConnection != NULL && (clientConnection->flags & (COMM_TRANSPARENT|COMM_INTERCEPTION)); } bool ConnStateData::reading() const { return reader != NULL; } void ConnStateData::stopReading() { if (reading()) { - comm_read_cancel(clientConnection->fd, reader); + Comm::ReadCancel(clientConnection->fd, reader); reader = NULL; } } BodyPipe::Pointer ConnStateData::expectRequestBody(int64_t size) { bodyPipe = new BodyPipe(this); if (size >= 0) bodyPipe->setBodySize(size); else startDechunkingRequest(); return bodyPipe; } int64_t ConnStateData::mayNeedToReadMoreBody() const { if (!bodyPipe) return 0; // request without a body or read/produced all body bytes @@ -4481,49 +4464,48 @@ Dialer, this, ConnStateData::clientPinnedConnectionClosed); // remember the pinned connection so that cb does not unpin a fresher one typedef CommCloseCbParams Params; Params ¶ms = GetCommParams(pinning.closeHandler); params.conn = pinning.serverConnection; comm_add_close_handler(pinning.serverConnection->fd, pinning.closeHandler); startPinnedConnectionMonitoring(); } /// Assign a read handler to an idle pinned connection so that we can detect connection closures. void ConnStateData::startPinnedConnectionMonitoring() { if (pinning.readHandler != NULL) return; // already monitoring typedef CommCbMemFunT Dialer; pinning.readHandler = JobCallback(33, 3, Dialer, this, ConnStateData::clientPinnedConnectionRead); - static char unusedBuf[8]; - comm_read(pinning.serverConnection, unusedBuf, sizeof(unusedBuf), pinning.readHandler); + Comm::Read(pinning.serverConnection, pinning.readHandler); } void ConnStateData::stopPinnedConnectionMonitoring() { if (pinning.readHandler != NULL) { - comm_read_cancel(pinning.serverConnection->fd, pinning.readHandler); + Comm::ReadCancel(pinning.serverConnection->fd, pinning.readHandler); pinning.readHandler = NULL; } } /// Our read handler called by Comm when the server either closes an idle pinned connection or /// perhaps unexpectedly sends something on that idle (from Squid p.o.v.) connection. void ConnStateData::clientPinnedConnectionRead(const CommIoCbParams &io) { pinning.readHandler = NULL; // Comm unregisters handlers before calling if (io.flag == COMM_ERR_CLOSING) return; // close handler will clean up // We could use getConcurrentRequestCount(), but this may be faster. const bool clientIsIdle = !getCurrentContext(); debugs(33, 3, "idle pinned " << pinning.serverConnection << " read " << io.size << (clientIsIdle ? " with idle client" : "")); === modified file 'src/client_side.h' --- src/client_side.h 2014-03-30 12:00:34 +0000 +++ src/client_side.h 2014-06-03 17:16:09 +0000 @@ -273,41 +273,41 @@ bool transparent() const; bool reading() const; void stopReading(); ///< cancels comm_read if it is scheduled /// true if we stopped receiving the request const char *stoppedReceiving() const { return stoppedReceiving_; } /// true if we stopped sending the response const char *stoppedSending() const { return stoppedSending_; } /// note request receiving error and close as soon as we write the response void stopReceiving(const char *error); /// note response sending error and close as soon as we read the request void stopSending(const char *error); void expectNoForwarding(); ///< cleans up virgin request [body] forwarding state BodyPipe::Pointer expectRequestBody(int64_t size); virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); virtual void noteBodyConsumerAborted(BodyPipe::Pointer); - bool handleReadData(SBuf *buf); + bool handleReadData(); bool handleRequestBodyData(); /** * Correlate the current ConnStateData object with the pinning_fd socket descriptor. */ void pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth); /** * Decorrelate the ConnStateData object from its pinned CachePeer */ void unpinConnection(); /** * Checks if there is pinning info if it is valid. It can close the server side connection * if pinned info is not valid. \param request if it is not NULL also checks if the pinning info refers to the request client side HttpRequest \param CachePeer if it is not NULL also check if the CachePeer is the pinning CachePeer \return The details of the server side connection (may be closed if failures were present). */ const Comm::ConnectionPointer validatePinnedConnection(HttpRequest *request, const CachePeer *peer); /** * returts the pinned CachePeer if exists, NULL otherwise @@ -368,41 +368,40 @@ /// Otherwise, writes the error to the client and returns true. Also checks /// for SQUID_X509_V_ERR_DOMAIN_MISMATCH on bumped requests. bool serveDelayedError(ClientSocketContext *context); Ssl::BumpMode sslBumpMode; ///< ssl_bump decision (Ssl::bumpEnd if n/a). #else bool switchedToHttps() const { return false; } #endif protected: void startDechunkingRequest(); void finishDechunkingRequest(bool withSuccess); void abortChunkedRequestBody(const err_type error); err_type handleChunkedRequestBody(size_t &putSize); void startPinnedConnectionMonitoring(); void clientPinnedConnectionRead(const CommIoCbParams &io); private: - int connReadWasError(comm_err_t flag, int size, int xerrno); int connFinishedWithConn(int size); void clientAfterReadingRequests(); bool concurrentRequestQueueFilled() const; #if USE_AUTH /// some user details that can be used to perform authentication on this connection Auth::UserRequest::Pointer auth_; #endif HttpParser parser_; // XXX: CBDATA plays with public/private and leaves the following 'private' fields all public... :( #if USE_OPENSSL bool switchedToHttps_; /// The SSL server host name appears in CONNECT request or the server ip address for the intercepted requests String sslConnectHostOrIp; ///< The SSL server host name as passed in the CONNECT request String sslCommonName; ///< CN name for SSL certificate generation String sslBumpCertKey; ///< Key to use to store/retrieve generated certificate === modified file 'src/comm.cc' --- src/comm.cc 2014-04-30 10:50:09 +0000 +++ src/comm.cc 2014-06-03 20:56:18 +0000 @@ -22,40 +22,41 @@ * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * * * Copyright (c) 2003, Robert Collins */ #include "squid.h" #include "ClientInfo.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" #include "comm/Connection.h" #include "comm/IoCallback.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommRead.h" #include "compat/cmsg.h" #include "DescriptorSet.h" #include "event.h" #include "fd.h" #include "fde.h" #include "globals.h" #include "icmp/net_db.h" #include "ip/Intercept.h" #include "ip/QosConfig.h" #include "ip/tools.h" #include "pconn.h" #include "profiler/Profiler.h" #include "SBuf.h" #include "SquidConfig.h" #include "StatCounters.h" #include "StoreIOBuffer.h" #include "tools.h" @@ -63,315 +64,95 @@ #if USE_OPENSSL #include "ssl/support.h" #endif #include #include #if _SQUID_CYGWIN_ #include #endif #ifdef HAVE_NETINET_TCP_H #include #endif #if HAVE_SYS_UN_H #include #endif /* * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything. */ -static void commStopHalfClosedMonitor(int fd); static IOCB commHalfClosedReader; static void comm_init_opened(const Comm::ConnectionPointer &conn, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI); static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI); #if USE_DELAY_POOLS CBDATA_CLASS_INIT(CommQuotaQueue); static void commHandleWriteHelper(void * data); #endif /* STATIC */ static DescriptorSet *TheHalfClosed = NULL; /// the set of half-closed FDs static bool WillCheckHalfClosed = false; /// true if check is scheduled static EVH commHalfClosedCheck; static void commPlanHalfClosedCheck(); static comm_err_t commBind(int s, struct addrinfo &); static void commSetReuseAddr(int); static void commSetNoLinger(int); #ifdef TCP_NODELAY static void commSetTcpNoDelay(int); #endif static void commSetTcpRcvbuf(int, int); fd_debug_t *fdd_table = NULL; bool isOpen(const int fd) { return fd >= 0 && fd_table && fd_table[fd].flags.open != 0; } /** - * Attempt a read - * - * If the read attempt succeeds or fails, call the callback. - * Else, wait for another IO notification. - */ -void -commHandleRead(int fd, void *data) -{ - Comm::IoCallback *ccb = (Comm::IoCallback *) data; - - assert(data == COMMIO_FD_READCB(fd)); - assert(ccb->active()); - /* Attempt a read */ - ++ statCounter.syscalls.sock.reads; - errno = 0; - int retval; - if (ccb->buf) { - retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); - debugs(5, 3, "char FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno); - } else { - assert(ccb->buf2 != NULL); - SBuf::size_type sz = ccb->buf2->spaceSize(); - char *buf = ccb->buf2->rawSpace(sz); - retval = FD_READ_METHOD(fd, buf, sz-1); // blocking synchronous read(2) - if (retval > 0) { - ccb->buf2->append(buf, retval); - } - debugs(5, 3, "SBuf FD " << fd << ", size " << sz << ", retval " << retval << ", errno " << errno); - } - - if (retval < 0 && !ignoreErrno(errno)) { - debugs(5, 3, "comm_read_try: scheduling COMM_ERROR"); - ccb->offset = 0; - ccb->finish(COMM_ERROR, errno); - return; - }; - - /* See if we read anything */ - /* Note - read 0 == socket EOF, which is a valid read */ - if (retval >= 0) { - fd_bytes(fd, retval, FD_READ); - ccb->offset = retval; - ccb->finish(COMM_OK, errno); - return; - } - - /* Nope, register for some more IO */ - Comm::SetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0); -} - -/** - * Queue a read. handler/handler_data are called when the read - * completes, on error, or on file descriptor close. - */ -void -comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback) -{ - debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); - - /* Make sure we are open and not closing */ - assert(Comm::IsConnOpen(conn)); - assert(!fd_table[conn->fd].closing()); - Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); - - // Make sure we are either not reading or just passively monitoring. - // Active/passive conflicts are OK and simply cancel passive monitoring. - if (ccb->active()) { - // if the assertion below fails, we have an active comm_read conflict - assert(fd_table[conn->fd].halfClosedReader != NULL); - commStopHalfClosedMonitor(conn->fd); - assert(!ccb->active()); - } - ccb->conn = conn; - - /* Queue the read */ - ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size); - Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); -} - -/** - * Queue a read. handler/handler_data are called when the read - * completes, on error, or on file descriptor close. - */ -void -comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback) -{ - debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); - - /* Make sure we are open and not closing */ - assert(Comm::IsConnOpen(conn)); - assert(!fd_table[conn->fd].closing()); - Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); - - // Make sure we are either not reading or just passively monitoring. - // Active/passive conflicts are OK and simply cancel passive monitoring. - if (ccb->active()) { - // if the assertion below fails, we have an active comm_read conflict - assert(fd_table[conn->fd].halfClosedReader != NULL); - commStopHalfClosedMonitor(conn->fd); - assert(!ccb->active()); - } - ccb->conn = conn; - ccb->buf2 = &buf; - - /* Queue the read */ - ccb->setCallback(Comm::IOCB_READ, callback, NULL, NULL, buf.spaceSize()); - Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); -} - -/** * Empty the read buffers * * This is a magical routine that empties the read buffers. * Under some platforms (Linux) if a buffer has data in it before * you call close(), the socket will hang and take quite a while * to timeout. */ static void comm_empty_os_read_buffers(int fd) { #if _SQUID_LINUX_ /* prevent those nasty RST packets */ char buf[SQUID_TCP_SO_RCVBUF]; if (fd_table[fd].flags.nonblocking) { while (FD_READ_METHOD(fd, buf, SQUID_TCP_SO_RCVBUF) > 0) {}; } #endif } /** - * Return whether the FD has a pending completed callback. - * NP: does not work. - */ -int -comm_has_pending_read_callback(int fd) -{ - assert(isOpen(fd)); - // XXX: We do not know whether there is a read callback scheduled. - // This is used for pconn management that should probably be more - // tightly integrated into comm to minimize the chance that a - // closing pconn socket will be used for a new transaction. - return false; -} - -// Does comm check this fd for read readiness? -// Note that when comm is not monitoring, there can be a pending callback -// call, which may resume comm monitoring once fired. -bool -comm_monitors_read(int fd) -{ - assert(isOpen(fd) && COMMIO_FD_READCB(fd)); - // Being active is usually the same as monitoring because we always - // start monitoring the FD when we configure Comm::IoCallback for I/O - // and we usually configure Comm::IoCallback for I/O when we starting - // monitoring a FD for reading. - return COMMIO_FD_READCB(fd)->active(); -} - -/** - * Cancel a pending read. Assert that we have the right parameters, - * and that there are no pending read events! - * - * XXX: We do not assert that there are no pending read events and - * with async calls it becomes even more difficult. - * The whole interface should be reworked to do callback->cancel() - * instead of searching for places where the callback may be stored and - * updating the state of those places. - * - * AHC Don't call the comm handlers? - */ -void -comm_read_cancel(int fd, IOCB *callback, void *data) -{ - if (!isOpen(fd)) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed"); - return; - } - - Comm::IoCallback *cb = COMMIO_FD_READCB(fd); - // TODO: is "active" == "monitors FD"? - if (!cb->active()) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive"); - return; - } - - typedef CommCbFunPtrCallT Call; - Call *call = dynamic_cast(cb->callback.getRaw()); - if (!call) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " lacks callback"); - return; - } - - call->cancel("old comm_read_cancel"); - - typedef CommIoCbParams Params; - const Params ¶ms = GetCommParams(cb->callback); - - /* Ok, we can be reasonably sure we won't lose any data here! */ - assert(call->dialer.handler == callback); - assert(params.data == data); - - /* Delete the callback */ - cb->cancel("old comm_read_cancel"); - - /* And the IO event */ - Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); -} - -void -comm_read_cancel(int fd, AsyncCall::Pointer &callback) -{ - callback->cancel("comm_read_cancel"); - - if (!isOpen(fd)) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed"); - return; - } - - Comm::IoCallback *cb = COMMIO_FD_READCB(fd); - - if (!cb->active()) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive"); - return; - } - - AsyncCall::Pointer call = cb->callback; - assert(call != NULL); // XXX: should never fail (active() checks for callback==NULL) - - /* Ok, we can be reasonably sure we won't lose any data here! */ - assert(call == callback); - - /* Delete the callback */ - cb->cancel("comm_read_cancel"); - - /* And the IO event */ - Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); -} - -/** * synchronous wrapper around udp socket functions */ int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from) { ++ statCounter.syscalls.sock.recvfroms; debugs(5,8, "comm_udp_recvfrom: FD " << fd << " from " << from); struct addrinfo *AI = NULL; Ip::Address::InitAddrInfo(AI); int x = recvfrom(fd, buf, len, flags, AI->ai_addr, &AI->ai_addrlen); from = *AI; Ip::Address::FreeAddrInfo(AI); return x; } int comm_udp_recv(int fd, void *buf, size_t len, int flags) { Ip::Address nul; return comm_udp_recvfrom(fd, buf, len, flags, nul); @@ -1888,41 +1669,41 @@ CommIoCbPtrFun(&commHalfClosedReader, NULL)); comm_read(c, NULL, 0, call); fd_table[c->fd].halfClosedReader = call; } else c->fd = -1; // XXX: temporary. prevent c replacement erase closing listed FD } WillCheckHalfClosed = false; // as far as we know commPlanHalfClosedCheck(); // may need to check again } /// checks whether we are waiting for possibly half-closed connection to close // We are monitoring if the read handler for the fd is the monitoring handler. bool commHasHalfClosedMonitor(int fd) { return TheHalfClosed->has(fd); } /// stop waiting for possibly half-closed connection to close -static void +void commStopHalfClosedMonitor(int const fd) { debugs(5, 5, HERE << "removing FD " << fd << " from " << *TheHalfClosed); // cancel the read if one was scheduled AsyncCall::Pointer reader = fd_table[fd].halfClosedReader; if (reader != NULL) comm_read_cancel(fd, reader); fd_table[fd].halfClosedReader = NULL; TheHalfClosed->del(fd); } /// I/O handler for the possibly half-closed connection monitoring code static void commHalfClosedReader(const Comm::ConnectionPointer &conn, char *, size_t size, comm_err_t flag, int, void *) { // there cannot be more data coming in on half-closed connections assert(size == 0); assert(conn != NULL); === modified file 'src/comm.h' --- src/comm.h 2014-03-15 02:30:08 +0000 +++ src/comm.h 2014-06-03 20:21:47 +0000 @@ -60,46 +60,40 @@ /// clear a timeout handler by FD number void commUnsetFdTimeout(int fd); /** * Set or clear the timeout for some action on an active connection. * API to replace commSetTimeout() when a Comm::ConnectionPointer is available. */ int commSetConnTimeout(const Comm::ConnectionPointer &conn, int seconds, AsyncCall::Pointer &callback); int commUnsetConnTimeout(const Comm::ConnectionPointer &conn); int ignoreErrno(int); void commCloseAllSockets(void); void checkTimeouts(void); //typedef void IOACB(int fd, int nfd, Comm::ConnectionPointer details, comm_err_t flag, int xerrno, void *data); void comm_add_close_handler(int fd, CLCB *, void *); void comm_add_close_handler(int fd, AsyncCall::Pointer &); void comm_remove_close_handler(int fd, CLCB *, void *); void comm_remove_close_handler(int fd, AsyncCall::Pointer &); -int comm_has_pending_read_callback(int fd); -bool comm_monitors_read(int fd); -void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback); -void comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback); -void comm_read_cancel(int fd, IOCB *callback, void *data); -void comm_read_cancel(int fd, AsyncCall::Pointer &callback); int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from); int comm_udp_recv(int fd, void *buf, size_t len, int flags); ssize_t comm_udp_send(int s, const void *buf, size_t len, int flags); bool comm_has_incomplete_write(int); /** The read channel has closed and the caller does not expect more data * but needs to detect connection aborts. The current detection method uses * 0-length reads: We read until the error occurs or the writer closes * the connection. If there is a read error, we close the connection. */ void commStartHalfClosedMonitor(int fd); bool commHasHalfClosedMonitor(int fd); // XXX: remove these wrappers which minimize client_side.cc changes in a commit inline void commMarkHalfClosed(int fd) { commStartHalfClosedMonitor(fd); } inline bool commIsHalfClosed(int fd) { return commHasHalfClosedMonitor(fd); } /* A comm engine that calls comm_select */ class CommSelectEngine : public AsyncEngine { === modified file 'src/comm/IoCallback.cc' --- src/comm/IoCallback.cc 2014-03-15 02:30:08 +0000 +++ src/comm/IoCallback.cc 2014-06-03 14:45:34 +0000 @@ -72,64 +72,62 @@ #endif SetSelect(conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0); } void Comm::IoCallback::cancel(const char *reason) { if (!active()) return; callback->cancel(reason); callback = NULL; reset(); } void Comm::IoCallback::reset() { conn = NULL; - buf2 = NULL; // we do not own this buffer. if (freefunc) { freefunc(buf); buf = NULL; freefunc = NULL; } xerrno = 0; #if USE_DELAY_POOLS quotaQueueReserv = 0; #endif } // Schedule the callback call and clear the callback void Comm::IoCallback::finish(comm_err_t code, int xerrn) { debugs(5, 3, HERE << "called for " << conn << " (" << code << ", " << xerrno << ")"); assert(active()); /* free data */ if (freefunc && buf) { freefunc(buf); buf = NULL; freefunc = NULL; } if (callback != NULL) { typedef CommIoCbParams Params; Params ¶ms = GetCommParams(callback); if (conn != NULL) params.fd = conn->fd; // for legacy write handlers... params.conn = conn; - params.buf2 = buf2; params.buf = buf; params.size = offset; params.flag = code; params.xerrno = xerrn; ScheduleCallHere(callback); callback = NULL; } /* Reset for next round. */ reset(); } === modified file 'src/comm/IoCallback.h' --- src/comm/IoCallback.h 2014-03-15 11:42:55 +0000 +++ src/comm/IoCallback.h 2014-06-03 14:44:45 +0000 @@ -8,48 +8,40 @@ class SBuf; namespace Comm { /// Type of IO callbacks the Comm layer deals with. typedef enum { IOCB_NONE, IOCB_READ, IOCB_WRITE } iocb_type; /// Details about a particular Comm IO callback event. class IoCallback { public: iocb_type type; Comm::ConnectionPointer conn; AsyncCall::Pointer callback; - - /// Buffer to store read(2) into when set. - // This is a pointer to the Jobs buffer rather than an SBuf using - // the same store since we cannot know when or how the Job will - // alter its SBuf while we are reading. - SBuf *buf2; - - // Legacy c-string buffers used when buf2 is unset. char *buf; FREE *freefunc; int size; int offset; comm_err_t errcode; int xerrno; #if USE_DELAY_POOLS unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue #endif bool active() const { return callback != NULL; } void setCallback(iocb_type type, AsyncCall::Pointer &cb, char *buf, FREE *func, int sz); /// called when fd needs to write but may need to wait in line for its quota void selectOrQueueWrite(); /// Actively cancel the given callback void cancel(const char *reason); /// finish the IO operation imediately and schedule the callback with the current state. === modified file 'src/comm/Makefile.am' --- src/comm/Makefile.am 2012-03-29 09:22:41 +0000 +++ src/comm/Makefile.am 2014-06-03 19:57:54 +0000 @@ -4,27 +4,29 @@ noinst_LTLIBRARIES = libcomm.la ## Library holding comm socket handlers libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ ConnOpener.cc \ ConnOpener.h \ Connection.cc \ Connection.h \ forward.h \ IoCallback.cc \ IoCallback.h \ Loops.h \ ModDevPoll.cc \ ModEpoll.cc \ ModKqueue.cc \ ModPoll.cc \ ModSelect.cc \ ModSelectWin32.cc \ + Read.cc \ + Read.h \ TcpAcceptor.cc \ TcpAcceptor.h \ UdpOpenDialer.h \ Write.cc \ Write.h \ \ comm_internal.h === added file 'src/comm/Read.cc' --- src/comm/Read.cc 1970-01-01 00:00:00 +0000 +++ src/comm/Read.cc 2014-06-04 14:28:28 +0000 @@ -0,0 +1,233 @@ +/* + * DEBUG: section 05 Socket Functions + */ +#include "squid.h" +#include "comm.h" +#include "comm_internal.h" +#include "CommCalls.h" +#include "comm/IoCallback.h" +#include "comm/Loops.h" +#include "comm/Read.h" +#include "Debug.h" +#include "fd.h" +#include "fde.h" +#include "SBuf.h" +#include "StatCounters.h" +//#include "tools.h" + +// Does comm check this fd for read readiness? +// Note that when comm is not monitoring, there can be a pending callback +// call, which may resume comm monitoring once fired. +bool +Comm::MonitorsRead(int fd) +{ + assert(isOpen(fd) && COMMIO_FD_READCB(fd)); + // Being active is usually the same as monitoring because we always + // start monitoring the FD when we configure Comm::IoCallback for I/O + // and we usually configure Comm::IoCallback for I/O when we starting + // monitoring a FD for reading. + return COMMIO_FD_READCB(fd)->active(); +} + +void +Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback) +{ + // TODO: move comm_read_base() internals into here + // when comm_read() char* API is no longer needed + comm_read_base(conn, NULL, 0, callback); +} + +/** + * Queue a read. + * If a buffer is given the callback is scheduled when the read + * completes, on error, or on file descriptor close. + * + * If no buffer (NULL) is given the callback is scheduled when + * the socket FD is ready for a read(2)/recv(2). + */ +void +comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback) +{ + debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); + + /* Make sure we are open and not closing */ + assert(Comm::IsConnOpen(conn)); + assert(!fd_table[conn->fd].closing()); + Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); + + // Make sure we are either not reading or just passively monitoring. + // Active/passive conflicts are OK and simply cancel passive monitoring. + if (ccb->active()) { + // if the assertion below fails, we have an active comm_read conflict + assert(fd_table[conn->fd].halfClosedReader != NULL); + commStopHalfClosedMonitor(conn->fd); + assert(!ccb->active()); + } + ccb->conn = conn; + + /* Queue the read */ + ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size); + Comm::SetSelect(conn->fd, COMM_SELECT_READ, Comm::HandleRead, ccb, 0); +} + +comm_err_t +Comm::ReadNow(CommIoCbParams ¶ms, SBuf &buf) +{ + /* Attempt a read */ + ++ statCounter.syscalls.sock.reads; + SBuf::size_type sz = buf.spaceSize(); + char *b = buf.rawSpace(sz); + errno = 0; + int retval = FD_READ_METHOD(params.conn->fd, b, sz); + params.xerrno = errno; + + debugs(5, 3, "SBuf " << params.conn << ", size " << sz << ", retval " << retval << ", errno " << params.xerrno); + + if (retval > 0) { // data read most common case + buf.append(b, retval); + fd_bytes(params.conn->fd, retval, FD_READ); + params.flag = COMM_OK; + params.size = retval; + + } else if (retval == 0) { // remote closure (somewhat less) common + // Note - read 0 == socket EOF, which is a valid read. + params.flag = COMM_ERR_CLOSING; + + } else if (retval < 0) { // connection errors are worst-case + debugs(5, 3, params.conn << " COMM_ERROR: " << xstrerror()); + if (ignoreErrno(params.xerrno)) + params.flag = COMM_INPROGRESS; + else + params.flag = COMM_ERROR; + } + + return params.flag; +} + +/** + * Handle an FD which is ready for read(2). + * + * If there is no provided buffer to fill call the callback. + * + * Otherwise attempt a read into the provided buffer. + * If the read attempt succeeds or fails, call the callback. + * Else, wait for another IO notification. + */ +void +Comm::HandleRead(int fd, void *data) +{ + Comm::IoCallback *ccb = (Comm::IoCallback *) data; + + assert(data == COMMIO_FD_READCB(fd)); + assert(ccb->active()); + + // if the caller did not supply a buffer, just schedule callback + if (!ccb->buf) { + ccb->finish(COMM_OK, 0); + return; + } + + /* For legacy callers : Attempt a read */ + ++ statCounter.syscalls.sock.reads; + errno = 0; + int retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); + debugs(5, 3, "char FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno); + + if (retval < 0 && !ignoreErrno(errno)) { + debugs(5, 3, "comm_read_try: scheduling COMM_ERROR"); + ccb->offset = 0; + ccb->finish(COMM_ERROR, errno); + return; + }; + + /* See if we read anything */ + /* Note - read 0 == socket EOF, which is a valid read */ + if (retval >= 0) { + fd_bytes(fd, retval, FD_READ); + ccb->offset = retval; + ccb->finish(COMM_OK, errno); + return; + } + + /* Nope, register for some more IO */ + Comm::SetSelect(fd, COMM_SELECT_READ, Comm::HandleRead, data, 0); +} + +/** + * Cancel a pending read. Assert that we have the right parameters, + * and that there are no pending read events! + * + * XXX: We do not assert that there are no pending read events and + * with async calls it becomes even more difficult. + * The whole interface should be reworked to do callback->cancel() + * instead of searching for places where the callback may be stored and + * updating the state of those places. + * + * AHC Don't call the comm handlers? + */ +void +comm_read_cancel(int fd, IOCB *callback, void *data) +{ + if (!isOpen(fd)) { + debugs(5, 4, "fails: FD " << fd << " closed"); + return; + } + + Comm::IoCallback *cb = COMMIO_FD_READCB(fd); + // TODO: is "active" == "monitors FD"? + if (!cb->active()) { + debugs(5, 4, "fails: FD " << fd << " inactive"); + return; + } + + typedef CommCbFunPtrCallT Call; + Call *call = dynamic_cast(cb->callback.getRaw()); + if (!call) { + debugs(5, 4, "fails: FD " << fd << " lacks callback"); + return; + } + + call->cancel("old comm_read_cancel"); + + typedef CommIoCbParams Params; + const Params ¶ms = GetCommParams(cb->callback); + + /* Ok, we can be reasonably sure we won't lose any data here! */ + assert(call->dialer.handler == callback); + assert(params.data == data); + + /* Delete the callback */ + cb->cancel("old comm_read_cancel"); + + /* And the IO event */ + Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); +} + +void +Comm::ReadCancel(int fd, AsyncCall::Pointer &callback) +{ + callback->cancel("comm_read_cancel"); + + if (!isOpen(fd)) { + debugs(5, 4, "fails: FD " << fd << " closed"); + return; + } + + Comm::IoCallback *cb = COMMIO_FD_READCB(fd); + + if (!cb->active()) { + debugs(5, 4, "fails: FD " << fd << " inactive"); + return; + } + + AsyncCall::Pointer call = cb->callback; + + /* Ok, we can be reasonably sure we won't lose any data here! */ + assert(call == callback); + + /* Delete the callback */ + cb->cancel("comm_read_cancel"); + + /* And the IO event */ + Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); +} === added file 'src/comm/Read.h' --- src/comm/Read.h 1970-01-01 00:00:00 +0000 +++ src/comm/Read.h 2014-06-03 22:16:17 +0000 @@ -0,0 +1,55 @@ +#ifndef _SQUID_COMM_READ_H +#define _SQUID_COMM_READ_H + +#include "base/AsyncCall.h" +#include "CommCalls.h" +#include "comm/forward.h" + +class SBuf; + +namespace Comm +{ + +/** + * Start monitoring for read. + * + * callback is scheduled when the read is possible, + * or on file descriptor close. + */ +void Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback); + +/// whether the FD socket is being monitored for read +bool MonitorsRead(int fd); + +/** + * Perform a read(2) on a connection immediately. + * + * The returned flag is also placed in params.flag. + * + * \retval COMM_OK data has been read and placed in buf, amount in params.size + * \retval COMM_ERROR an error occured, the code is placed in params.xerrno + * \retval COMM_INPROGRESS unable to read at this time, or a minor error occured + * \retval COMM_ERR_CLOSING 0-byte read has occured. + * Usually indicates the remote end has disconnected. + */ +comm_err_t ReadNow(CommIoCbParams ¶ms, SBuf &buf); + +/// Cancel the read pending on FD. No action if none pending. +void ReadCancel(int fd, AsyncCall::Pointer &callback); + +/// callback handler to process an FD which is available for reading +extern PF HandleRead; + +} // namespace Comm + +// Legacy API to be removed +void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback); +inline void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback) +{ + assert(buf != NULL); + comm_read_base(conn, buf, len, callback); +} +void comm_read_cancel(int fd, IOCB *callback, void *data); +inline void comm_read_cancel(int fd, AsyncCall::Pointer &callback) {Comm::ReadCancel(fd,callback);} + +#endif /* _SQUID_COMM_READ_H */ === modified file 'src/comm/comm_internal.h' --- src/comm/comm_internal.h 2012-09-21 14:57:30 +0000 +++ src/comm/comm_internal.h 2014-06-03 20:41:03 +0000 @@ -1,16 +1,17 @@ #ifndef SQUID_COMM_COMM_INTERNAL_H #define SQUID_COMM_COMM_INTERNAL_H /* misc collection of bits shared by Comm code, but not needed by the rest of Squid. */ struct _fd_debug_t { char const *close_file; int close_line; }; typedef struct _fd_debug_t fd_debug_t; extern fd_debug_t *fdd_table; bool isOpen(const int fd); +void commStopHalfClosedMonitor(int fd); #endif === modified file 'src/dns_internal.cc' --- src/dns_internal.cc 2014-06-02 07:19:35 +0000 +++ src/dns_internal.cc 2014-06-03 21:13:48 +0000 @@ -19,40 +19,41 @@ * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "base/InstanceId.h" #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/Write.h" #include "dlink.h" #include "event.h" #include "fd.h" #include "fde.h" #include "ip/tools.h" #include "Mem.h" #include "MemBuf.h" #include "mgr/Registration.h" #include "rfc3596.h" #include "SquidConfig.h" #include "SquidTime.h" #include "Store.h" #include "tools.h" #include "util.h" #include "wordlist.h" #if SQUID_SNMP #include "snmp_core.h" #endif === modified file 'src/fde.cc' --- src/fde.cc 2014-04-30 10:50:09 +0000 +++ src/fde.cc 2014-06-03 20:22:16 +0000 @@ -15,53 +15,53 @@ * incorporates software developed and/or copyrighted by other * sources; see the CREDITS file for full details. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" -#include "comm.h" +#include "comm/Read.h" #include "fde.h" #include "globals.h" #include "SquidTime.h" #include "Store.h" fde *fde::Table = NULL; bool fde::readPending(int fdNumber) { if (type == FD_SOCKET) - return comm_monitors_read(fdNumber); + return Comm::MonitorsRead(fdNumber); return read_handler ? true : false ; } void fde::dumpStats (StoreEntry &dumpEntry, int fdNumber) { if (!flags.open) return; #if _SQUID_WINDOWS_ storeAppendPrintf(&dumpEntry, "%4d 0x%-8lX %-6.6s %4d %7" PRId64 "%c %7" PRId64 "%c %-21s %s\n", fdNumber, win32.handle, #else storeAppendPrintf(&dumpEntry, "%4d %-6.6s %4d %7" PRId64 "%c %7" PRId64 "%c %-21s %s\n", fdNumber, #endif fdTypeStr[type], === modified file 'src/ftp.cc' --- src/ftp.cc 2014-06-02 07:19:35 +0000 +++ src/ftp.cc 2014-06-03 21:14:42 +0000 @@ -17,40 +17,41 @@ * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "acl/FilledChecklist.h" #include "comm.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommCalls.h" #include "compat/strtoll.h" #include "errorpage.h" #include "fd.h" #include "fde.h" #include "FwdState.h" #include "html_quote.h" #include "HttpHdrContRange.h" #include "HttpHeader.h" #include "HttpHeaderRange.h" #include "HttpReply.h" #include "HttpRequest.h" #include "ip/tools.h" #include "Mem.h" #include "MemBuf.h" #include "mime.h" #include "rfc1738.h" #include "Server.h" === modified file 'src/gopher.cc' --- src/gopher.cc 2014-03-31 06:57:27 +0000 +++ src/gopher.cc 2014-06-03 21:15:30 +0000 @@ -14,40 +14,41 @@ * incorporates software developed and/or copyrighted by other * sources; see the CREDITS file for full details. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. */ #include "squid.h" #include "comm.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "fd.h" #include "FwdState.h" #include "globals.h" #include "html_quote.h" #include "HttpReply.h" #include "HttpRequest.h" #include "Mem.h" #include "MemBuf.h" #include "mime.h" #include "rfc1738.h" #include "SquidConfig.h" #include "SquidTime.h" #include "StatCounters.h" #include "Store.h" #include "tools.h" #if USE_DELAY_POOLS #include "DelayPools.h" === modified file 'src/helper.cc' --- src/helper.cc 2014-03-04 10:33:08 +0000 +++ src/helper.cc 2014-06-03 21:17:27 +0000 @@ -17,40 +17,41 @@ * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "base/AsyncCbdataCalls.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "comm/Write.h" #include "fd.h" #include "fde.h" #include "format/Quoting.h" #include "helper.h" #include "Mem.h" #include "MemBuf.h" #include "SquidIpc.h" #include "SquidMath.h" #include "SquidTime.h" #include "Store.h" #include "wordlist.h" #define HELPER_MAX_ARGS 64 /** Initial Squid input buffer size. Helper responses may exceed this, and * Squid will grow the input buffer as needed, up to ReadBufMaxSize. */ const size_t ReadBufMinSize(4*1024); === modified file 'src/ident/Ident.cc' --- src/ident/Ident.cc 2013-10-25 00:13:46 +0000 +++ src/ident/Ident.cc 2014-06-03 20:43:34 +0000 @@ -18,40 +18,41 @@ * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #if USE_IDENT #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/Write.h" #include "CommCalls.h" #include "globals.h" #include "ident/Config.h" #include "ident/Ident.h" #include "MemBuf.h" namespace Ident { #define IDENT_PORT 113 #define IDENT_KEY_SZ 50 #define IDENT_BUFSIZE 4096 typedef struct _IdentClient { IDCB *callback; void *callback_data; struct _IdentClient *next; } IdentClient; === modified file 'src/ipc/Port.cc' --- src/ipc/Port.cc 2014-01-27 05:27:41 +0000 +++ src/ipc/Port.cc 2014-06-03 20:44:44 +0000 @@ -1,28 +1,29 @@ /* * DEBUG: section 54 Interprocess Communication * */ #include "squid.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "CommCalls.h" #include "globals.h" #include "ipc/Port.h" static const char channelPathPfx[] = DEFAULT_STATEDIR "/"; static const char coordinatorAddrLabel[] = "-coordinator"; const char Ipc::strandAddrLabel[] = "-kid"; Ipc::Port::Port(const String& aListenAddr): UdsOp(aListenAddr) { setOptions(COMM_NONBLOCKING | COMM_DOBIND); } void Ipc::Port::start() { UdsOp::start(); doListen(); } === modified file 'src/pconn.cc' --- src/pconn.cc 2014-04-30 10:50:09 +0000 +++ src/pconn.cc 2014-06-03 21:17:51 +0000 @@ -17,40 +17,41 @@ * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "CachePeer.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "fd.h" #include "fde.h" #include "globals.h" #include "mgr/Registration.h" #include "neighbors.h" #include "pconn.h" #include "PeerPoolMgr.h" #include "SquidConfig.h" #include "Store.h" #define PCONN_FDS_SZ 8 /* pconn set size, increase for better memcache hit rate */ //TODO: re-attach to MemPools. WAS: static MemAllocator *pconn_fds_pool = NULL; PconnModule * PconnModule::instance = NULL; CBDATA_CLASS_INIT(IdleConnList); /* ========== IdleConnList ============================================ */ IdleConnList::IdleConnList(const char *key, PconnPool *thePool) : capacity_(PCONN_FDS_SZ), === modified file 'src/store.cc' --- src/store.cc 2014-02-21 10:46:19 +0000 +++ src/store.cc 2014-06-03 21:17:38 +0000 @@ -18,40 +18,41 @@ * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "CacheDigest.h" #include "CacheManager.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "ETag.h" #include "event.h" #include "fde.h" #include "globals.h" #include "http.h" #include "HttpReply.h" #include "HttpRequest.h" #include "mem_node.h" #include "MemObject.h" #include "mgr/Registration.h" #include "mgr/StoreIoAction.h" #include "profiler/Profiler.h" #include "repl_modules.h" #include "RequestFlags.h" #include "SquidConfig.h" #include "SquidTime.h" #include "StatCounters.h" #include "stmem.h" #include "Store.h" #include "store_digest.h" === modified file 'src/tests/stub_client_side.cc' --- src/tests/stub_client_side.cc 2014-03-30 12:00:34 +0000 +++ src/tests/stub_client_side.cc 2014-06-03 18:22:27 +0000 @@ -34,41 +34,41 @@ void ConnStateData::notifyAllContexts(const int xerrno) STUB bool ConnStateData::clientParseRequests() STUB_RETVAL(false) void ConnStateData::readNextRequest() STUB void ConnStateData::addContextToQueue(ClientSocketContext * context) STUB int ConnStateData::getConcurrentRequestCount() const STUB_RETVAL(0) bool ConnStateData::isOpen() const STUB_RETVAL(false) void ConnStateData::checkHeaderLimits() STUB void ConnStateData::sendControlMsg(HttpControlMsg msg) STUB int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0) #if USE_AUTH void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *cause) STUB #endif bool ConnStateData::transparent() const STUB_RETVAL(false) bool ConnStateData::reading() const STUB_RETVAL(false) void ConnStateData::stopReading() STUB void ConnStateData::stopReceiving(const char *error) STUB void ConnStateData::stopSending(const char *error) STUB void ConnStateData::expectNoForwarding() STUB void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB -bool ConnStateData::handleReadData(SBuf *buf) STUB_RETVAL(false) +bool ConnStateData::handleReadData() STUB_RETVAL(false) bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false) void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth) STUB void ConnStateData::unpinConnection() STUB const Comm::ConnectionPointer ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *peer) STUB_RETVAL(NULL) void ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io) STUB void ConnStateData::clientReadRequest(const CommIoCbParams &io) STUB void ConnStateData::connStateClosed(const CommCloseCbParams &io) STUB void ConnStateData::requestTimeout(const CommTimeoutCbParams ¶ms) STUB void ConnStateData::swanSong() STUB void ConnStateData::quitAfterError(HttpRequest *request) STUB #if USE_OPENSSL void ConnStateData::httpsPeeked(Comm::ConnectionPointer serverConnection) STUB void ConnStateData::getSslContextStart() STUB void ConnStateData::getSslContextDone(SSL_CTX * sslContext, bool isNew) STUB void ConnStateData::sslCrtdHandleReplyWrapper(void *data, const HelperReply &reply) STUB void ConnStateData::sslCrtdHandleReply(const HelperReply &reply) STUB void ConnStateData::switchToHttps(HttpRequest *request, Ssl::BumpMode bumpServerMode) STUB void ConnStateData::buildSslCertGenerationParams(Ssl::CertificateProperties &certProperties) STUB bool ConnStateData::serveDelayedError(ClientSocketContext *context) STUB_RETVAL(false) #endif === modified file 'src/tests/stub_libcomm.cc' --- src/tests/stub_libcomm.cc 2013-03-26 10:38:20 +0000 +++ src/tests/stub_libcomm.cc 2014-06-04 10:54:39 +0000 @@ -31,32 +31,42 @@ #include "comm/forward.h" bool Comm::IsConnOpen(const Comm::ConnectionPointer &) STUB_RETVAL(false) #include "comm/IoCallback.h" void Comm::IoCallback::setCallback(iocb_type, AsyncCall::Pointer &, char *, FREE *, int) STUB void Comm::IoCallback::selectOrQueueWrite() STUB void Comm::IoCallback::cancel(const char *reason) STUB void Comm::IoCallback::finish(comm_err_t code, int xerrn) STUB Comm::CbEntry *Comm::iocb_table = NULL; void Comm::CallbackTableInit() STUB void Comm::CallbackTableDestruct() STUB #include "comm/Loops.h" void Comm::SelectLoopInit(void) STUB void Comm::SetSelect(int, unsigned int, PF *, void *, time_t) STUB void Comm::ResetSelect(int) STUB comm_err_t Comm::DoSelect(int) STUB_RETVAL(COMM_ERROR) void Comm::QuickPollRequired(void) STUB +#include "comm/Read.h" +void Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback) STUB +bool Comm::MonitorsRead(int fd) STUB_RETVAL(false) +comm_err_t Comm::ReadNow(CommIoCbParams ¶ms, SBuf &buf) STUB_RETVAL(COMM_ERROR) +void Comm::ReadCancel(int fd, AsyncCall::Pointer &callback) STUB +//void Comm::HandleRead(int, void*) STUB + +void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback) STUB +void comm_read_cancel(int fd, IOCB *callback, void *data) STUB + #include "comm/TcpAcceptor.h" //Comm::TcpAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub) STUB void Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) STUB void Comm::TcpAcceptor::unsubscribe(const char *) STUB void Comm::TcpAcceptor::acceptNext() STUB void Comm::TcpAcceptor::notify(const comm_err_t flag, const Comm::ConnectionPointer &) const STUB #include "comm/Write.h" void Comm::Write(const Comm::ConnectionPointer &, const char *, int, AsyncCall::Pointer &, FREE *) STUB void Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback) STUB void Comm::WriteCancel(const Comm::ConnectionPointer &conn, const char *reason) STUB /*PF*/ void Comm::HandleWrite(int, void*) STUB === modified file 'src/tunnel.cc' --- src/tunnel.cc 2014-06-03 08:52:29 +0000 +++ src/tunnel.cc 2014-06-03 21:17:33 +0000 @@ -23,40 +23,41 @@ * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "acl/FilledChecklist.h" #include "base/CbcPointer.h" #include "CachePeer.h" #include "client_side.h" #include "client_side_request.h" #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "fde.h" #include "FwdState.h" #include "http.h" #include "HttpRequest.h" #include "HttpStateFlags.h" #include "ip/QosConfig.h" #include "LogTags.h" #include "MemBuf.h" #include "PeerSelectState.h" #include "SquidConfig.h" #include "StatCounters.h" #if USE_OPENSSL #include "ssl/PeerConnector.h" #endif #include "tools.h" #if USE_DELAY_POOLS #include "DelayId.h" #endif === modified file 'src/whois.cc' --- src/whois.cc 2014-06-02 07:19:35 +0000 +++ src/whois.cc 2014-06-03 21:17:21 +0000 @@ -16,40 +16,41 @@ * sources; see the CREDITS file for full details. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "comm.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "FwdState.h" #include "HttpReply.h" #include "HttpRequest.h" #include "SquidConfig.h" #include "StatCounters.h" #include "Store.h" #include "tools.h" #include #define WHOIS_PORT 43 class WhoisState { public: void readReply(const Comm::ConnectionPointer &, char *aBuffer, size_t aBufferLength, comm_err_t flag, int xerrno); void setReplyToOK(StoreEntry *sentry); StoreEntry *entry;