=== modified file 'src/Makefile.am' --- src/Makefile.am 2010-11-06 14:58:44 +0000 +++ src/Makefile.am 2010-11-09 05:09:38 +0000 @@ -553,7 +553,7 @@ squid_LDADD = \ $(COMMON_LIBS) \ - comm/libcomm-listener.la \ + comm/libcomm.la \ eui/libeui.la \ icmp/libicmp.la icmp/libicmp-core.la \ log/liblog.la \ @@ -1276,10 +1276,10 @@ wordlist.cc nodist_tests_testCacheManager_SOURCES = \ $(BUILT_SOURCES) -# comm.cc only requires comm/libcomm-listener.la until fdc_table is dead. +# comm.cc only requires comm/libcomm.la until fdc_table is dead. tests_testCacheManager_LDADD = \ $(COMMON_LIBS) \ - comm/libcomm-listener.la \ + comm/libcomm.la \ icmp/libicmp.la icmp/libicmp-core.la \ log/liblog.la \ $(REPL_OBJS) \ @@ -1483,7 +1483,7 @@ tests_testEvent_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REPL_OBJS) \ ${ADAPTATION_LIBS} \ @@ -1646,7 +1646,7 @@ tests_testEventLoop_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REPL_OBJS) \ ${ADAPTATION_LIBS} \ @@ -1804,7 +1804,7 @@ tests_test_http_range_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REPL_OBJS) \ ${ADAPTATION_LIBS} \ @@ -1967,7 +1967,7 @@ tests_testHttpRequest_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REPL_OBJS) \ ${ADAPTATION_LIBS} \ @@ -2382,7 +2382,7 @@ tests_testURL_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REGEXLIB) \ $(REPL_OBJS) \ === modified file 'src/MemBuf.cc' --- src/MemBuf.cc 2009-12-26 00:25:57 +0000 +++ src/MemBuf.cc 2010-11-08 11:01:00 +0000 @@ -41,7 +41,7 @@ * Rationale: * ---------- * - * Here is how one would comm_write an object without MemBuffer: + * Here is how one would Comm::Io::Write an object without MemBuffer: * * { * -- allocate: @@ -53,7 +53,7 @@ * ... * * -- write - * comm_write(buf, free, ...); + * Comm::Io::Write(buf, free, ...); * } * * The whole "packing" idea is quite messy: We are given a buffer of fixed @@ -91,7 +91,7 @@ * ... * * -- write - * comm_write_mbuf(fd, buf, handler, data); + * Comm::Io::Write(fd, buf, callback); * * -- *iff* you did not give the buffer away, free it yourself * -- buf.clean(); === modified file 'src/Packer.cc' --- src/Packer.cc 2009-02-08 00:02:47 +0000 +++ src/Packer.cc 2010-11-08 11:01:00 +0000 @@ -45,7 +45,7 @@ * Comm.c lacks commAppend[Printf] because comm does not handle its own * buffers (no mem_obj equivalent for comm.c). * - * Thus, if one wants to be able to store _and_ comm_write an object, s/he + * Thus, if one wants to be able to store _and_ Comm::Io::Write an object, s/he * has to implement two almost identical functions. * * Packer @@ -55,10 +55,10 @@ * Packer has its own append and printf routines that "know" where to send * incoming data. In case of store interface, Packer sends data to * storeAppend. Otherwise, Packer uses a MemBuf that can be flushed later to - * comm_write. + * Comm::Io::Write. * * Thus, one can write just one function that will either "pack" things for - * comm_write or "append" things to store, depending on actual packer + * Comm::Io::Write or "append" things to store, depending on actual packer * supplied. * * It is amazing how much work a tiny object can save. :) === modified file 'src/Server.cc' --- src/Server.cc 2010-10-22 00:12:11 +0000 +++ src/Server.cc 2010-11-08 11:01:00 +0000 @@ -34,6 +34,7 @@ #include "squid.h" #include "base/TextException.h" +#include "comm/IoWrite.h" #include "Server.h" #include "Store.h" #include "fde.h" /* for fd_table[fd].closing */ @@ -425,7 +426,7 @@ typedef CommCbMemFunT Dialer; requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody); - comm_write_mbuf(fd, &buf, requestSender); + Comm::Io::Write(fd, &buf, requestSender); } else { debugs(9,3, HERE << "will wait for more request body bytes or eof"); requestSender = NULL; === modified file 'src/Server.h' --- src/Server.h 2010-09-10 20:06:41 +0000 +++ src/Server.h 2010-11-08 11:01:00 +0000 @@ -185,7 +185,7 @@ protected: BodyPipe::Pointer requestBodySource; /**< to consume request body */ - AsyncCall::Pointer requestSender; /**< set if we are expecting comm_write to call us back */ + AsyncCall::Pointer requestSender; /**< set if we are expecting Comm::Io::Write to call us back */ #if USE_ADAPTATION BodyPipe::Pointer virginBodyDestination; /**< to provide virgin response body */ === modified file 'src/adaptation/icap/Xaction.cc' --- src/adaptation/icap/Xaction.cc 2010-10-21 08:13:41 +0000 +++ src/adaptation/icap/Xaction.cc 2010-11-08 11:01:00 +0000 @@ -4,6 +4,7 @@ #include "squid.h" #include "comm.h" +#include "comm/IoWrite.h" #include "CommCalls.h" #include "HttpMsg.h" #include "adaptation/icap/Xaction.h" @@ -237,7 +238,7 @@ writer = JobCallback(93,3, Dialer, this, Adaptation::Icap::Xaction::noteCommWrote); - comm_write_mbuf(connection, &buf, writer); + Comm::Io::Write(connection, &buf, writer); updateTimeout(); } === modified file 'src/client_side.cc' --- src/client_side.cc 2010-10-26 00:17:17 +0000 +++ src/client_side.cc 2010-11-08 11:01:00 +0000 @@ -92,6 +92,7 @@ #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" +#include "comm/IoWrite.h" #include "comm/ListenStateData.h" #include "base/TextException.h" #include "ConnectionDetail.h" @@ -362,7 +363,7 @@ AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg", CommIoCbPtrFun(&WroteControlMsg, this)); - comm_write_mbuf(fd(), mb, call); + Comm::Io::Write(fd(), mb, call); delete mb; } @@ -948,7 +949,7 @@ noteSentBodyBytes (length); AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete", CommIoCbPtrFun(clientWriteBodyComplete, this)); - comm_write(fd(), bodyData.data, length, call ); + Comm::Io::Write(fd(), bodyData.data, length, call, NULL); return; } @@ -963,7 +964,7 @@ /* write */ AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", CommIoCbPtrFun(clientWriteComplete, this)); - comm_write_mbuf(fd(), &mb, call); + Comm::Io::Write(fd(), &mb, call); } else writeComplete(fd(), NULL, 0, COMM_OK); } @@ -1366,7 +1367,7 @@ debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete"); AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", CommIoCbPtrFun(clientWriteComplete, this)); - comm_write_mbuf(fd(), mb, call); + Comm::Io::Write(fd(), mb, call); delete mb; } === modified file 'src/client_side_request.cc' --- src/client_side_request.cc 2010-10-21 08:13:41 +0000 +++ src/client_side_request.cc 2010-11-08 11:01:00 +0000 @@ -59,6 +59,7 @@ #include "client_side_reply.h" #include "client_side_request.h" #include "ClientRequestContext.h" +#include "comm/IoWrite.h" #include "compat/inet_pton.h" #include "fde.h" #include "HttpReply.h" @@ -1204,8 +1205,9 @@ // TODO: Unify with tunnel.cc and add a Server(?) header static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n"; - comm_write(fd, conn_established, strlen(conn_established), - &SslBumpEstablish, this, NULL); + AsyncCall::Pointer call = commCbCall(85, 5, "ClientSocketContext::sslBumpEstablish", + CommIoCbPtrFun(&SslBumpEstablish, this)); + Comm::Io::Write(fd, conn_established, strlen(conn_established), call, NULL); } #endif === modified file 'src/comm.cc' --- src/comm.cc 2010-11-03 16:28:34 +0000 +++ src/comm.cc 2010-11-08 11:01:00 +0000 @@ -39,6 +39,8 @@ #include "fde.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" +#include "comm/IoCallback.h" +#include "comm/IoWrite.h" #include "comm/ListenStateData.h" #include "CommIO.h" #include "CommRead.h" @@ -67,12 +69,6 @@ * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything. */ -typedef enum { - IOCB_NONE, - IOCB_READ, - IOCB_WRITE -} iocb_type; - static void commStopHalfClosedMonitor(int fd); static IOCB commHalfClosedReader; static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI); @@ -84,142 +80,6 @@ static void commHandleWriteHelper(void * data); #endif -static void commSelectOrQueueWrite(const int fd); - -struct comm_io_callback_t { - iocb_type type; - int fd; - AsyncCall::Pointer callback; - char *buf; - FREE *freefunc; - int size; - int offset; - comm_err_t errcode; - int xerrno; -#if DELAY_POOLS - unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue -#endif - - - bool active() const { return callback != NULL; } -}; - -struct _comm_fd { - int fd; - comm_io_callback_t readcb; - comm_io_callback_t writecb; -}; -typedef struct _comm_fd comm_fd_t; -comm_fd_t *commfd_table; - -// TODO: make this a comm_io_callback_t method? -bool -commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb) -{ - assert(ccb->fd == fd); - assert(ccb->type == type); - return ccb->active(); -} - -/* - * Configure comm_io_callback_t for I/O - * - * @param fd filedescriptor - * @param ccb comm io callback - * @param cb callback - * @param cbdata callback data (must be cbdata'ed) - * @param buf buffer, if applicable - * @param freefunc freefunc, if applicable - * @param size buffer size - */ -static void -commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb, - AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size) -{ - assert(!ccb->active()); - assert(ccb->type == type); - assert(cb != NULL); - ccb->fd = fd; - ccb->callback = cb; - ccb->buf = buf; - ccb->freefunc = freefunc; - ccb->size = size; - ccb->offset = 0; -} - - -// Schedule the callback call and clear the callback -static void -commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno) -{ - debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" << - code << ", " << xerrno << ")"); - assert(ccb->active()); - assert(ccb->fd == fd); - ccb->errcode = code; - ccb->xerrno = xerrno; - -#if DELAY_POOLS - ccb->quotaQueueReserv = 0; -#endif - - comm_io_callback_t cb = *ccb; - - /* We've got a copy; blow away the real one */ - /* XXX duplicate code from commio_cancel_callback! */ - ccb->xerrno = 0; - ccb->callback = NULL; // cb has it - - /* free data */ - if (cb.freefunc) { - cb.freefunc(cb.buf); - cb.buf = NULL; - } - - if (cb.callback != NULL) { - typedef CommIoCbParams Params; - Params ¶ms = GetCommParams(cb.callback); - params.fd = cb.fd; - params.buf = cb.buf; - params.size = cb.offset; - params.flag = cb.errcode; - params.xerrno = cb.xerrno; - ScheduleCallHere(cb.callback); - } -} - - -/* - * Cancel the given callback - * - * Remember that the data is cbdataRef'ed. - */ -// TODO: make this a comm_io_callback_t method -static void -commio_cancel_callback(int fd, comm_io_callback_t *ccb) -{ - debugs(5, 3, "commio_cancel_callback: called for FD " << fd); - assert(ccb->fd == fd); - assert(ccb->active()); - - ccb->xerrno = 0; - ccb->callback = NULL; - -#if DELAY_POOLS - ccb->quotaQueueReserv = 0; -#endif -} - -/* - * Call the given comm callback; assumes the callback is valid. - * - * @param ccb io completion callback - */ -void -commio_call_callback(comm_io_callback_t *ccb) -{ -} - class ConnectStateData { @@ -267,7 +127,6 @@ #endif static void commSetTcpRcvbuf(int, int); static PF commConnectFree; -static PF commHandleWrite; static IPH commConnectDnsHandle; typedef enum { @@ -293,10 +152,10 @@ void commHandleRead(int fd, void *data) { - comm_io_callback_t *ccb = (comm_io_callback_t *) data; + Comm::Io::Callback *ccb = (Comm::Io::Callback *) data; assert(data == COMMIO_FD_READCB(fd)); - assert(commio_has_callback(fd, IOCB_READ, ccb)); + assert(ccb->active()); /* Attempt a read */ statCounter.syscalls.sock.reads++; errno = 0; @@ -307,7 +166,7 @@ if (retval < 0 && !ignoreErrno(errno)) { debugs(5, 3, "comm_read_try: scheduling COMM_ERROR"); ccb->offset = 0; - commio_finish_callback(fd, ccb, COMM_ERROR, errno); + ccb->finish(COMM_ERROR, errno); return; }; @@ -316,7 +175,7 @@ if (retval >= 0) { fd_bytes(fd, retval, FD_READ); ccb->offset = retval; - commio_finish_callback(fd, ccb, COMM_OK, errno); + ccb->finish(COMM_OK, errno); return; } @@ -344,7 +203,7 @@ /* Make sure we are open and not closing */ assert(isOpen(fd)); assert(!fd_table[fd].closing()); - comm_io_callback_t *ccb = COMMIO_FD_READCB(fd); + Comm::Io::Callback *ccb = COMMIO_FD_READCB(fd); // Make sure we are either not reading or just passively monitoring. // Active/passive conflicts are OK and simply cancel passive monitoring. @@ -356,7 +215,7 @@ } /* Queue the read */ - commio_set_callback(fd, IOCB_READ, ccb, callback, (char *)buf, NULL, size); + ccb->setCallback(Comm::Io::IOCB_READ, callback, (char *)buf, NULL, size); commSetSelect(fd, COMM_SELECT_READ, commHandleRead, ccb, 0); } @@ -404,9 +263,9 @@ { assert(isOpen(fd)); // Being active is usually the same as monitoring because we always - // start monitoring the FD when we configure comm_io_callback_t for I/O - // and we usually configure comm_io_callback_t for I/O when we starting - // monitoring a FD for reading. TODO: replace with commio_has_callback + // start monitoring the FD when we configure Comm::Io::Callback for I/O + // and we usually configure Comm::Io::Callback for I/O when we starting + // monitoring a FD for reading. return COMMIO_FD_READCB(fd)->active(); } @@ -430,7 +289,7 @@ return; } - comm_io_callback_t *cb = COMMIO_FD_READCB(fd); + Comm::Io::Callback *cb = COMMIO_FD_READCB(fd); // TODO: is "active" == "monitors FD"? if (!cb->active()) { debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive"); @@ -454,7 +313,7 @@ assert(params.data == data); /* Delete the callback */ - commio_cancel_callback(fd, cb); + cb->cancel("old comm_read_cancel"); /* And the IO event */ commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); @@ -470,7 +329,7 @@ return; } - comm_io_callback_t *cb = COMMIO_FD_READCB(fd); + Comm::Io::Callback *cb = COMMIO_FD_READCB(fd); if (!cb->active()) { debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive"); @@ -484,7 +343,7 @@ assert(call == callback); /* Delete the callback */ - commio_cancel_callback(fd, cb); + cb->cancel("comm_read_cancel"); /* And the IO event */ commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); @@ -1601,13 +1460,13 @@ commSetTimeout(fd, -1, NULL, NULL); // notify read/write handlers after canceling select reservations, if any - if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) { + if (COMMIO_FD_WRITECB(fd)->active()) { commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0); - commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno); + COMMIO_FD_WRITECB(fd)->finish(COMM_ERR_CLOSING, errno); } - if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) { + if (COMMIO_FD_READCB(fd)->active()) { commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); - commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno); + COMMIO_FD_READCB(fd)->finish(COMM_ERR_CLOSING, errno); } #if DELAY_POOLS @@ -1935,14 +1794,8 @@ /* make sure the accept() socket FIFO delay queue exists */ Comm::AcceptLimiter::Instance(); - commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t)); - for (int pos = 0; pos < Squid_MaxFD; pos++) { - commfd_table[pos].fd = pos; - commfd_table[pos].readcb.fd = pos; - commfd_table[pos].readcb.type = IOCB_READ; - commfd_table[pos].writecb.fd = pos; - commfd_table[pos].writecb.type = IOCB_WRITE; - } + // make sure the IO pending callback table exists + Comm::Io::CallbackTableInit(); /* XXX account fd_table */ /* Keep a few file descriptors free so that we don't run out of FD's @@ -1963,12 +1816,12 @@ safe_free(fd_table); safe_free(fdd_table); - safe_free(commfd_table); + Comm::Io::CallbackTableDestruct(); } #if DELAY_POOLS // called when the queue is done waiting for the client bucket to fill -static void +void commHandleWriteHelper(void * data) { CommQuotaQueue *queue = static_cast(data); @@ -1987,14 +1840,14 @@ do { // check that the head descriptor is still relevant const int head = clientInfo->quotaPeekFd(); - comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head); + Callback *ccb = COMMIO_FD_WRITECB(head); if (fd_table[head].clientInfo == clientInfo && clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv && !fd_table[head].closing()) { // wait for the head descriptor to become ready for writing - commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0); + commSetSelect(head, COMM_SELECT_WRITE, Comm::Io::HandleWrite, ccb, 0); clientInfo->selectWaiting = true; return; } @@ -2191,190 +2044,7 @@ fds.pop_front(); ++outs; } - -#endif - -/* Write to FD. */ -static void -commHandleWrite(int fd, void *data) -{ - comm_io_callback_t *state = (comm_io_callback_t *)data; - int len = 0; - int nleft; - - assert(state == COMMIO_FD_WRITECB(fd)); - - PROF_start(commHandleWrite); - debugs(5, 5, "commHandleWrite: FD " << fd << ": off " << - (long int) state->offset << ", sz " << (long int) state->size << "."); - - nleft = state->size - state->offset; - -#if DELAY_POOLS - ClientInfo * clientInfo=fd_table[fd].clientInfo; - - if (clientInfo && !clientInfo->writeLimitingActive) - clientInfo = NULL; // we only care about quota limits here - - if (clientInfo) { - assert(clientInfo->selectWaiting); - clientInfo->selectWaiting = false; - - assert(clientInfo->hasQueue()); - assert(clientInfo->quotaPeekFd() == fd); - clientInfo->quotaDequeue(); // we will write or requeue below - - if (nleft > 0) { - const int quota = clientInfo->quotaForDequed(); - if (!quota) { // if no write quota left, queue this fd - state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); - clientInfo->kickQuotaQueue(); - PROF_stop(commHandleWrite); - return; - } - - const int nleft_corrected = min(nleft, quota); - if (nleft != nleft_corrected) { - debugs(5, 5, HERE << "FD " << fd << " writes only " << - nleft_corrected << " out of " << nleft); - nleft = nleft_corrected; - } - - } - } - -#endif - - /* actually WRITE data */ - len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); - debugs(5, 5, "commHandleWrite: write() returns " << len); - -#if DELAY_POOLS - if (clientInfo) { - if (len > 0) { - /* we wrote data - drain them from bucket */ - clientInfo->bucketSize -= len; - if (clientInfo->bucketSize < 0.0) { - debugs(5,1, HERE << "drained too much"); // should not happen - clientInfo->bucketSize = 0; - } - } - - // even if we wrote nothing, we were served; give others a chance - clientInfo->kickQuotaQueue(); - } -#endif - - fd_bytes(fd, len, FD_WRITE); - statCounter.syscalls.sock.writes++; - // After each successful partial write, - // reset fde::writeStart to the current time. - fd_table[fd].writeStart = squid_curtime; - - if (len == 0) { - /* Note we even call write if nleft == 0 */ - /* We're done */ - - if (nleft != 0) - debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining."); - - commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno); - } else if (len < 0) { - /* An error */ - - if (fd_table[fd].flags.socket_eof) { - debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << "."); - commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno); - } else if (ignoreErrno(errno)) { - debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << "."); - commSelectOrQueueWrite(fd); - } else { - debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << "."); - commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno); - } - } else { - /* A successful write, continue */ - state->offset += len; - - if (state->offset < state->size) { - /* Not done, reinstall the write handler and write some more */ - commSelectOrQueueWrite(fd); - } else { - commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno); - } - } - - PROF_stop(commHandleWrite); -} - -/* - * Queue a write. handler/handler_data are called when the write - * completes, on error, or on file descriptor close. - * - * free_func is used to free the passed buffer when the write has completed. - */ -void -comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func) -{ - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander", - CommIoCbPtrFun(handler, handler_data)); - - comm_write(fd, buf, size, call, free_func); -} - -void -comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) -{ - debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": asynCall " << callback); - - /* Make sure we are open, not closing, and not writing */ - assert(isOpen(fd)); - assert(!fd_table[fd].closing()); - comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd); - assert(!ccb->active()); - - fd_table[fd].writeStart = squid_curtime; - /* Queue the write */ - commio_set_callback(fd, IOCB_WRITE, ccb, callback, - (char *)buf, free_func, size); - - commSelectOrQueueWrite(fd); -} - -// called when fd needs to write but may need to wait in line for its quota -static void -commSelectOrQueueWrite(const int fd) -{ - comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd); - -#if DELAY_POOLS - // stand in line if there is one - if (ClientInfo *clientInfo = fd_table[fd].clientInfo) { - if (clientInfo->writeLimitingActive) { - ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd); - clientInfo->kickQuotaQueue(); - return; - } - } -#endif - - commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0); -} - - -/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */ -void -comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data) -{ - comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc()); -} - -void -comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback) -{ - comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc()); -} - +#endif /* * hm, this might be too general-purpose for all the places we'd @@ -2458,7 +2128,7 @@ static bool writeTimedOut(int fd) { - if (!commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) + if (!COMMIO_FD_WRITECB(fd)->active()) return false; if ((squid_curtime - fd_table[fd].writeStart) < Config.Timeout.write) @@ -2481,7 +2151,7 @@ // We have an active write callback and we are timed out debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout"); commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0); - commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERROR, ETIMEDOUT); + COMMIO_FD_WRITECB(fd)->finish(COMM_ERROR, ETIMEDOUT); } else if (AlreadyTimedOut(F)) continue; === modified file 'src/comm.h' --- src/comm.h 2010-10-06 03:50:45 +0000 +++ src/comm.h 2010-11-08 11:01:00 +0000 @@ -4,28 +4,12 @@ #include "squid.h" #include "AsyncEngine.h" #include "base/AsyncCall.h" +#include "comm/comm_err_t.h" +#include "comm/IoCallback.h" #include "StoreIOBuffer.h" #include "Array.h" #include "ip/Address.h" -#define COMMIO_FD_READCB(fd) (&commfd_table[(fd)].readcb) -#define COMMIO_FD_WRITECB(fd) (&commfd_table[(fd)].writecb) - -typedef enum { - COMM_OK = 0, - COMM_ERROR = -1, - COMM_NOMESSAGE = -3, - COMM_TIMEOUT = -4, - COMM_SHUTDOWN = -5, - COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */ - COMM_INPROGRESS = -7, - COMM_ERR_CONNECT = -8, - COMM_ERR_DNS = -9, - COMM_ERR_CLOSING = -10, - COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */ - COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */ -} comm_err_t; - class DnsLookupDetails; typedef void CNCB(int fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data); @@ -81,10 +65,6 @@ SQUIDCEXTERN void commResetSelect(int); SQUIDCEXTERN int comm_udp_sendto(int sock, const Ip::Address &to, const void *buf, int buflen); -extern void comm_write(int fd, const char *buf, int len, IOCB *callback, void *callback_data, FREE *func); -extern void comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func = NULL); -SQUIDCEXTERN void comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data); -extern void comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback); SQUIDCEXTERN void commCallCloseHandlers(int fd); SQUIDCEXTERN int commSetTimeout(int fd, int, PF *, void *); extern int commSetTimeout(int fd, int, AsyncCall::Pointer &calback); === added file 'src/comm/IoCallback.cc' --- src/comm/IoCallback.cc 1970-01-01 00:00:00 +0000 +++ src/comm/IoCallback.cc 2010-10-26 06:54:14 +0000 @@ -0,0 +1,123 @@ +#include "config.h" +#include "comm/IoCallback.h" +#include "comm/IoWrite.h" +#include "CommCalls.h" + +Comm::Io::CbEntry *Comm::Io::iocb_table; + +void +Comm::Io::CallbackTableInit() +{ + // XXX: convert this to a std::map<> ? + iocb_table = static_cast(xcalloc(Squid_MaxFD, sizeof(CbEntry))); + for (int pos = 0; pos < Squid_MaxFD; pos++) { + iocb_table[pos].fd = pos; + iocb_table[pos].readcb.fd = pos; + iocb_table[pos].readcb.type = IOCB_READ; + iocb_table[pos].writecb.fd = pos; + iocb_table[pos].writecb.type = IOCB_WRITE; + } +} + +void +Comm::Io::CallbackTableDestruct() +{ + safe_free(iocb_table); +} + +/** + * Configure Comm::Io::Callback for I/O + * + * @param fd filedescriptor + * @param t IO callback type (read or write) + * @param cb callback + * @param buf buffer, if applicable + * @param func freefunc, if applicable + * @param sz buffer size + */ +void +Comm::Io::Callback::setCallback(Comm::Io::iocb_type t, AsyncCall::Pointer &cb, char *b, FREE *f, int sz) +{ + assert(!active()); + assert(type == t); + assert(cb != NULL); + + callback = cb; + buf = b; + freefunc = f; + size = sz; + offset = 0; +} + +void +Comm::Io::Callback::selectOrQueueWrite() +{ +#if DELAY_POOLS + // stand in line if there is one + if (ClientInfo *clientInfo = fd_table[fd].clientInfo) { + if (clientInfo->writeLimitingActive) { + ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd); + clientInfo->kickQuotaQueue(); + return; + } + } +#endif + + commSetSelect(fd, COMM_SELECT_WRITE, Comm::Io::HandleWrite, this, 0); +} + +void +Comm::Io::Callback::cancel(const char *reason) +{ + if (!active()) + return; + + callback->cancel(reason); + callback = NULL; + reset(); +} + +void +Comm::Io::Callback::reset() +{ + if (freefunc) { + freefunc(buf); + buf = NULL; + freefunc = NULL; + } + xerrno = 0; + +#if DELAY_POOLS + ccb->quotaQueueReserv = 0; +#endif +} + +// Schedule the callback call and clear the callback +void +Comm::Io::Callback::finish(comm_err_t code, int xerrn) +{ + debugs(5, 3, HERE << "called for FD " << fd << " (" << code << ", " << xerrno << ")"); + assert(active()); + + /* free data */ + if (freefunc) { + freefunc(buf); + buf = NULL; + freefunc = NULL; + } + + if (callback != NULL) { + typedef CommIoCbParams Params; + Params ¶ms = GetCommParams(callback); + params.fd = fd; + params.buf = buf; + params.size = offset; + params.flag = code; + params.xerrno = xerrn; + ScheduleCallHere(callback); + callback = NULL; + } + + /* Reset for next round. */ + reset(); +} === added file 'src/comm/IoCallback.h' --- src/comm/IoCallback.h 1970-01-01 00:00:00 +0000 +++ src/comm/IoCallback.h 2010-10-26 09:27:40 +0000 @@ -0,0 +1,73 @@ +#ifndef _SQUID_COMM_IOCALLBACK_H +#define _SQUID_COMM_IOCALLBACK_H + +#include "config.h" +#include "base/AsyncCall.h" +#include "comm_err_t.h" + +namespace Comm { + +namespace Io { + +/// Type of IO callbacks the Comm layer deals with. +typedef enum { + IOCB_NONE, + IOCB_READ, + IOCB_WRITE +} iocb_type; + +/// Details about a particular Comm IO callback event. +class Callback { +public: + iocb_type type; + int fd; + AsyncCall::Pointer callback; + char *buf; + FREE *freefunc; + int size; + int offset; + comm_err_t errcode; + int xerrno; +#if DELAY_POOLS + unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue +#endif + + bool active() const { return callback != NULL; } + void setCallback(iocb_type type, AsyncCall::Pointer &cb, char *buf, FREE *func, int sz); + + /// called when fd needs to write but may need to wait in line for its quota + void selectOrQueueWrite(); + + /// Actively cancel the given callback + void cancel(const char *reason); + + /// finish the IO operation imediately and schedule the callback with the current state. + void finish(comm_err_t code, int xerrn); + +private: + void reset(); +}; + +/// Entry nodes for the IO callback table: iocb_table +/// Keyed off the FD which the event applies to. +class CbEntry { +public: + int fd; + Callback readcb; + Callback writecb; +}; + +/// Table of scheduled IO events which have yet to be processed ?? +/// Callbacks which might be scheduled in future are stored in fd_table. +extern CbEntry *iocb_table; + +extern void CallbackTableInit(); +extern void CallbackTableDestruct(); + +#define COMMIO_FD_READCB(fd) (&Comm::Io::iocb_table[(fd)].readcb) +#define COMMIO_FD_WRITECB(fd) (&Comm::Io::iocb_table[(fd)].writecb) + +}; // namespace Io +}; // namespace Comm + +#endif /* _SQUID_COMM_IOCALLBACK_H */ === added file 'src/comm/IoWrite.cc' --- src/comm/IoWrite.cc 1970-01-01 00:00:00 +0000 +++ src/comm/IoWrite.cc 2010-10-26 09:28:32 +0000 @@ -0,0 +1,146 @@ +#include "config.h" +#if DELAY_POOLS +#include "ClientInfo.h" +#endif +#include "comm/IoCallback.h" +#include "comm/IoWrite.h" +#include "fde.h" +#include "SquidTime.h" +#include "MemBuf.h" + +void +Comm::Io::Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback) +{ + Comm::Io::Write(fd, mb->buf, mb->size, callback, mb->freeFunc()); +} + +void +Comm::Io::Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) +{ + debugs(5, 5, HERE << "FD " << fd << ": sz " << size << ": asynCall " << callback); + + /* Make sure we are open, not closing, and not writing */ + assert(fd_table[fd].flags.open); + assert(!fd_table[fd].closing()); + Comm::Io::Callback *ccb = COMMIO_FD_WRITECB(fd); + assert(!ccb->active()); + + fd_table[fd].writeStart = squid_curtime; + /* Queue the write */ + ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size); + ccb->selectOrQueueWrite(); +} + +/** Write to FD. + * This function is used by the lowest level of IO loop which only has access to FD numbers. + * We have to use the comm iocb_table to map FD numbers to waiting data. + * Once the write has been concluded we schedule the waiting call with success/fail results. + */ +void +Comm::Io::HandleWrite(int fd, void *data) +{ + Comm::Io::Callback *state = static_cast(data); + int len = 0; + int nleft; + + assert(state->fd == fd); + + PROF_start(commHandleWrite); + debugs(5, 5, HERE << "FD " << state->fd << ": off " << + (long int) state->offset << ", sz " << (long int) state->size << "."); + + nleft = state->size - state->offset; + +#if DELAY_POOLS + ClientInfo * clientInfo=fd_table[fd].clientInfo; + + if (clientInfo && !clientInfo->writeLimitingActive) + clientInfo = NULL; // we only care about quota limits here + + if (clientInfo) { + assert(clientInfo->selectWaiting); + clientInfo->selectWaiting = false; + + assert(clientInfo->hasQueue()); + assert(clientInfo->quotaPeekFd() == fd); + clientInfo->quotaDequeue(); // we will write or requeue below + + if (nleft > 0) { + const int quota = clientInfo->quotaForDequed(); + if (!quota) { // if no write quota left, queue this fd + state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); + clientInfo->kickQuotaQueue(); + PROF_stop(commHandleWrite); + return; + } + + const int nleft_corrected = min(nleft, quota); + if (nleft != nleft_corrected) { + debugs(5, 5, HERE << "FD " << fd << " writes only " << + nleft_corrected << " out of " << nleft); + nleft = nleft_corrected; + } + + } + } +#endif + + /* actually WRITE data */ + len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); + debugs(5, 5, HERE << "write() returns " << len); + +#if DELAY_POOLS + if (clientInfo) { + if (len > 0) { + /* we wrote data - drain them from bucket */ + clientInfo->bucketSize -= len; + if (clientInfo->bucketSize < 0.0) { + debugs(5,1, HERE << "drained too much"); // should not happen + clientInfo->bucketSize = 0; + } + } + + // even if we wrote nothing, we were served; give others a chance + clientInfo->kickQuotaQueue(); + } +#endif + + fd_bytes(fd, len, FD_WRITE); + statCounter.syscalls.sock.writes++; + // After each successful partial write, + // reset fde::writeStart to the current time. + fd_table[fd].writeStart = squid_curtime; + + if (len == 0) { + /* Note we even call write if nleft == 0 */ + /* We're done */ + if (nleft != 0) + debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining."); + + state->finish(nleft ? COMM_ERROR : COMM_OK, errno); + } else if (len < 0) { + /* An error */ + if (fd_table[fd].flags.socket_eof) { + debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); + state->finish(nleft ? COMM_ERROR : COMM_OK, errno); + } else if (ignoreErrno(errno)) { + debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); + state->selectOrQueueWrite(); + } else { + debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); + state->finish(nleft ? COMM_ERROR : COMM_OK, errno); + } + } else { + /* A successful write, continue */ + state->offset += len; + + if (state->offset < state->size) { + /* Not done, reinstall the write handler and write some more */ + state->selectOrQueueWrite(); + } else { + state->finish(nleft ? COMM_OK : COMM_ERROR, errno); + } + } + + PROF_stop(commHandleWrite); +} === added file 'src/comm/IoWrite.h' --- src/comm/IoWrite.h 1970-01-01 00:00:00 +0000 +++ src/comm/IoWrite.h 2010-10-26 09:26:49 +0000 @@ -0,0 +1,32 @@ +#ifndef _SQUID_COMM_IOWRITE_H +#define _SQUID_COMM_IOWRITE_H + +#include "base/AsyncCall.h" + +namespace Comm { +namespace Io { + +/** + * Queue a write. callback is scheduled when the write + * completes, on error, or on file descriptor close. + * + * free_func is used to free the passed buffer when the write has completed. + */ +void Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func); + +/** + * Queue a write. callback is scheduled when the write + * completes, on error, or on file descriptor close. + */ +void Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback); + +/// Cancel the write pending on FD. No action if none pending. +void WriteCancel(int fd, const char *reason); + +// callback handler to process an FD which is available for writing. +extern PF HandleWrite; + +}; // namespace Io +}; // namespace Comm + +#endif /* _SQUID_COMM_IOWRITE_H */ === modified file 'src/comm/Makefile.am' --- src/comm/Makefile.am 2009-12-31 02:35:01 +0000 +++ src/comm/Makefile.am 2010-11-09 05:08:40 +0000 @@ -1,13 +1,19 @@ include $(top_srcdir)/src/Common.am include $(top_srcdir)/src/TestHeaders.am -noinst_LTLIBRARIES = libcomm-listener.la +noinst_LTLIBRARIES = libcomm.la -## Library holding listener comm socket handlers -libcomm_listener_la_SOURCES= \ +## Library holding comm socket handlers +libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ ListenStateData.cc \ ListenStateData.h \ \ + IoCallback.cc \ + IoCallback.h \ + IoWrite.cc \ + IoWrite.h \ + \ + comm_err_t.h \ comm_internal.h === added file 'src/comm/comm_err_t.h' --- src/comm/comm_err_t.h 1970-01-01 00:00:00 +0000 +++ src/comm/comm_err_t.h 2010-10-10 03:55:43 +0000 @@ -0,0 +1,21 @@ +#ifndef _SQUID_COMM_COMM_ERR_T_H +#define _SQUID_COMM_COMM_ERR_T_H + +#include "config.h" + +typedef enum { + COMM_OK = 0, + COMM_ERROR = -1, + COMM_NOMESSAGE = -3, + COMM_TIMEOUT = -4, + COMM_SHUTDOWN = -5, + COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */ + COMM_INPROGRESS = -7, + COMM_ERR_CONNECT = -8, + COMM_ERR_DNS = -9, + COMM_ERR_CLOSING = -10, + COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */ + COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */ +} comm_err_t; + +#endif /* _SQUID_COMM_COMM_ERR_T_H */ === modified file 'src/dns_internal.cc' --- src/dns_internal.cc 2010-11-04 00:12:17 +0000 +++ src/dns_internal.cc 2010-11-08 11:01:00 +0000 @@ -39,6 +39,7 @@ #include "SquidTime.h" #include "Store.h" #include "comm.h" +#include "comm/IoWrite.h" #include "fde.h" #include "ip/tools.h" #include "MemBuf.h" @@ -767,7 +768,10 @@ commSetTimeout(vc->fd, Config.Timeout.idns_query, NULL, NULL); - comm_write_mbuf(vc->fd, mb, idnsSentQueryVC, vc); + AsyncCall::Pointer call = commCbCall(78, 5, "idnsSentQueryVC", + CommIoCbPtrFun(&idnsSentQueryVC, vc)); + + Comm::Io::Write(vc->fd, mb, call); delete mb; } === modified file 'src/errorpage.cc' --- src/errorpage.cc 2010-11-01 05:44:28 +0000 +++ src/errorpage.cc 2010-11-08 11:01:00 +0000 @@ -32,7 +32,7 @@ * */ #include "config.h" - +#include "comm/IoWrite.h" #include "errorpage.h" #include "auth/UserRequest.h" #include "SquidTime.h" @@ -461,7 +461,9 @@ rep = err->BuildHttpReply(); MemBuf *mb = rep->pack(); - comm_write_mbuf(fd, mb, errorSendComplete, err); + AsyncCall::Pointer call = commCbCall(78, 5, "errorSendComplete", + CommIoCbPtrFun(&errorSendComplete, err)); + Comm::Io::Write(fd, mb, call); delete mb; delete rep; === modified file 'src/ftp.cc' --- src/ftp.cc 2010-11-01 05:44:28 +0000 +++ src/ftp.cc 2010-11-08 11:01:00 +0000 @@ -34,6 +34,7 @@ #include "squid.h" #include "comm.h" +#include "comm/IoWrite.h" #include "comm/ListenStateData.h" #include "compat/strtoll.h" #include "ConnectionDetail.h" @@ -1531,10 +1532,7 @@ typedef CommCbMemFunT Dialer; AsyncCall::Pointer call = JobCallback(9, 5, Dialer, this, FtpStateData::ftpWriteCommandCallback); - comm_write(ctrl.fd, - ctrl.last_command, - strlen(ctrl.last_command), - call); + Comm::Io::Write(ctrl.fd, ctrl.last_command, strlen(ctrl.last_command), call, NULL); scheduleReadControlReply(0); } === modified file 'src/gopher.cc' --- src/gopher.cc 2010-11-01 05:44:28 +0000 +++ src/gopher.cc 2010-11-08 11:01:00 +0000 @@ -34,6 +34,7 @@ */ #include "squid.h" +#include "comm/IoWrite.h" #include "errorpage.h" #include "Store.h" #include "html_quote.h" @@ -985,7 +986,9 @@ } debugs(10, 5, "gopherSendRequest: FD " << fd); - comm_write(fd, buf, strlen(buf), gopherSendComplete, gopherState, NULL); + AsyncCall::Pointer call = commCbCall(5,5, "gopherSendComplete", + CommIoCbPtrFun(gopherSendComplete, gopherState)); + Comm::Io::Write(fd, buf, strlen(buf), call, NULL); if (EBIT_TEST(gopherState->entry->flags, ENTRY_CACHABLE)) gopherState->entry->setPublicKey(); /* Make it public */ === modified file 'src/helper.cc' --- src/helper.cc 2010-09-20 19:27:24 +0000 +++ src/helper.cc 2010-11-08 11:01:00 +0000 @@ -33,6 +33,7 @@ */ #include "squid.h" +#include "comm/IoWrite.h" #include "helper.h" #include "SquidMath.h" #include "SquidTime.h" @@ -1179,11 +1180,9 @@ srv->writebuf = srv->wqueue; srv->wqueue = new MemBuf; srv->flags.writing = 1; - comm_write(srv->wfd, - srv->writebuf->content(), - srv->writebuf->contentSize(), - helperDispatchWriteDone, /* Handler */ - srv, NULL); /* Handler-data, freefunc */ + AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone", + CommIoCbPtrFun(helperDispatchWriteDone, srv)); + Comm::Io::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL); } } @@ -1225,11 +1224,9 @@ srv->writebuf = srv->wqueue; srv->wqueue = new MemBuf; srv->flags.writing = 1; - comm_write(srv->wfd, - srv->writebuf->content(), - srv->writebuf->contentSize(), - helperDispatchWriteDone, /* Handler */ - srv, NULL); /* Handler-data, free func */ + AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone", + CommIoCbPtrFun(helperDispatchWriteDone, srv)); + Comm::Io::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL); } debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes"); @@ -1280,11 +1277,9 @@ srv->flags.reserved = 1; srv->request = r; srv->dispatch_time = current_time; - comm_write(srv->wfd, - r->buf, - strlen(r->buf), - helperStatefulDispatchWriteDone, /* Handler */ - hlp, NULL); /* Handler-data, free func */ + AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone", + CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp)); + Comm::Io::Write(srv->wfd, r->buf, strlen(r->buf), call, NULL); debugs(84, 5, "helperStatefulDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << (int) strlen(r->buf) << " bytes"); === modified file 'src/http.cc' --- src/http.cc 2010-11-01 05:44:28 +0000 +++ src/http.cc 2010-11-08 11:01:00 +0000 @@ -45,11 +45,11 @@ #include "base/AsyncJobCalls.h" #include "base/TextException.h" #include "base64.h" +#include "comm/IoWrite.h" #if DELAY_POOLS #include "DelayPools.h" #endif #include "errorpage.h" -#include "fde.h" #include "http.h" #include "HttpControlMsg.h" #include "HttpHdrContRange.h" @@ -2139,7 +2139,7 @@ request->peer_host=_peer?_peer->host:NULL; buildRequestPrefix(request, orig_request, entry, &mb); debugs(11, 6, "httpSendRequest: FD " << fd << ":\n" << mb.buf); - comm_write_mbuf(fd, &mb, requestSender); + Comm::Io::Write(fd, &mb, requestSender); return true; } @@ -2226,7 +2226,7 @@ typedef CommCbMemFunT Dialer; requestSender = JobCallback(11,5, Dialer, this, HttpStateData::wroteLast); - comm_write(fd, "\r\n", 2, requestSender); + Comm::Io::Write(fd, "\r\n", 2, requestSender, NULL); return true; #else return false; @@ -2248,7 +2248,7 @@ typedef CommCbMemFunT Dialer; requestSender = JobCallback(11,5, Dialer, this, HttpStateData::wroteLast); - comm_write(fd, "0\r\n\r\n", 5, requestSender); + Comm::Io::Write(fd, "0\r\n\r\n", 5, requestSender, NULL); return true; } === modified file 'src/ident/Ident.cc' --- src/ident/Ident.cc 2010-04-17 02:29:04 +0000 +++ src/ident/Ident.cc 2010-11-08 11:01:00 +0000 @@ -37,6 +37,7 @@ #if USE_IDENT #include "comm.h" +#include "comm/IoWrite.h" #include "ident/Config.h" #include "ident/Ident.h" #include "MemBuf.h" @@ -149,7 +150,9 @@ mb.Printf("%d, %d\r\n", state->my_peer.GetPort(), state->me.GetPort()); - comm_write_mbuf(fd, &mb, NULL, state); + + AsyncCall::Pointer nil; + Comm::Io::Write(fd, &mb, nil); comm_read(fd, state->buf, BUFSIZ, Ident::ReadReply, state); commSetTimeout(fd, Ident::TheConfig.timeout, Ident::Timeout, state); } === modified file 'src/ipc/UdsOp.cc' --- src/ipc/UdsOp.cc 2010-08-24 00:12:54 +0000 +++ src/ipc/UdsOp.cc 2010-11-08 11:01:00 +0000 @@ -9,6 +9,7 @@ #include "config.h" #include "comm.h" #include "CommCalls.h" +#include "comm/IoWrite.h" #include "base/TextException.h" #include "ipc/UdsOp.h" @@ -106,7 +107,7 @@ typedef CommCbMemFunT Dialer; AsyncCall::Pointer writeHandler = JobCallback(54, 5, Dialer, this, UdsSender::wrote); - comm_write(fd(), message.raw(), message.size(), writeHandler); + Comm::Io::Write(fd(), message.raw(), message.size(), writeHandler, NULL); writing = true; } === modified file 'src/mgr/Inquirer.cc' --- src/mgr/Inquirer.cc 2010-10-29 00:12:28 +0000 +++ src/mgr/Inquirer.cc 2010-11-08 11:01:00 +0000 @@ -7,6 +7,7 @@ #include "config.h" #include "base/TextException.h" +#include "comm/IoWrite.h" #include "CommCalls.h" #include "HttpReply.h" #include "ipc/Coordinator.h" @@ -90,7 +91,7 @@ std::auto_ptr replyBuf(reply->pack()); writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader", CommCbMemFunT(this, &Inquirer::noteWroteHeader)); - comm_write_mbuf(fd, replyBuf.get(), writer); + Comm::Io::Write(fd, replyBuf.get(), writer); } /// called when we wrote the response header === modified file 'src/mgr/StoreToCommWriter.cc' --- src/mgr/StoreToCommWriter.cc 2010-10-29 00:12:28 +0000 +++ src/mgr/StoreToCommWriter.cc 2010-11-08 11:01:00 +0000 @@ -8,6 +8,7 @@ #include "config.h" #include "base/TextException.h" #include "CommCalls.h" +#include "comm/IoWrite.h" #include "ipc/FdNotes.h" #include "mgr/StoreToCommWriter.h" #include "StoreClient.h" @@ -108,7 +109,7 @@ AsyncCall::Pointer writer = asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote", MyDialer(this, &StoreToCommWriter::noteCommWrote)); - comm_write(fd, ioBuf.data, ioBuf.length, writer); + Comm::Io::Write(fd, ioBuf.data, ioBuf.length, writer, NULL); } void === modified file 'src/tunnel.cc' --- src/tunnel.cc 2010-10-06 03:50:45 +0000 +++ src/tunnel.cc 2010-11-08 11:01:00 +0000 @@ -38,6 +38,7 @@ #include "HttpRequest.h" #include "fde.h" #include "comm.h" +#include "comm/IoWrite.h" #include "client_side_request.h" #include "acl/FilledChecklist.h" #if DELAY_POOLS @@ -321,8 +322,11 @@ if (from.len == 0 && !fd_closed(to.fd()) ) { comm_close(to.fd()); } - } else if (cbdataReferenceValid(this)) - comm_write(to.fd(), from.buf, len, completion, this, NULL); + } else if (cbdataReferenceValid(this)) { + AsyncCall::Pointer call = commCbCall(5,5, "SomeTunnelWriteHandler", + CommIoCbPtrFun(completion, this)); + Comm::Io::Write(to.fd(), from.buf, len, call, NULL); + } cbdataInternalUnlock(this); /* ??? */ } @@ -531,8 +535,9 @@ TunnelStateData *tunnelState = (TunnelStateData *)data; debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << tunnelState); *tunnelState->status_ptr = HTTP_OK; - comm_write(tunnelState->client.fd(), conn_established, strlen(conn_established), - tunnelConnectedWriteDone, tunnelState, NULL); + AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone", + CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState)); + Comm::Io::Write(tunnelState->client.fd(), conn_established, strlen(conn_established), call, NULL); } static void @@ -742,7 +747,10 @@ packerClean(&p); mb.append("\r\n", 2); - comm_write_mbuf(tunnelState->server.fd(), &mb, tunnelProxyConnectedWriteDone, tunnelState); + AsyncCall::Pointer call = commCbCall(5,5, "tunnelProxyConnectedWriteDone", + CommIoCbPtrFun(tunnelProxyConnectedWriteDone, tunnelState)); + + Comm::Io::Write(tunnelState->server.fd(), &mb, call); commSetTimeout(tunnelState->server.fd(), Config.Timeout.read, tunnelTimeout, tunnelState); } === modified file 'src/whois.cc' --- src/whois.cc 2010-02-06 06:32:11 +0000 +++ src/whois.cc 2010-11-08 11:01:01 +0000 @@ -34,6 +34,7 @@ */ #include "squid.h" +#include "comm/IoWrite.h" #include "errorpage.h" #include "Store.h" #include "HttpReply.h" @@ -101,7 +102,10 @@ String str_print=p->request->urlpath.substr(1,p->request->urlpath.size()); snprintf(buf, l, SQUIDSTRINGPH"\r\n", SQUIDSTRINGPRINT(str_print)); - comm_write(fd, buf, strlen(buf), whoisWriteComplete, p, NULL); + AsyncCall::Pointer call = commCbCall(5,5, "whoisWriteComplete", + CommIoCbPtrFun(whoisWriteComplete, p)); + + Comm::Io::Write(fd, buf, strlen(buf), call, NULL); comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p); commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p); }