Fixed stalled concurrent rock store reads by ensuring their ID uniqueness. Added a check to prevent similar bugs from occurring in the future. === modified file 'src/DiskIO/IpcIo/IpcIoFile.cc' --- src/DiskIO/IpcIo/IpcIoFile.cc 2014-01-27 05:27:41 +0000 +++ src/DiskIO/IpcIo/IpcIoFile.cc 2014-02-21 05:18:15 +0000 @@ -290,85 +290,90 @@ IpcIoFile::writeCompleted(WriteRequest * (writeRequest->free_func)(const_cast(writeRequest->buf)); // broken API? if (!ioError) { debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" << diskId << " at " << writeRequest->offset); } const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len; const int errflag = ioError ? DISK_ERROR :DISK_OK; ioRequestor->writeCompleted(errflag, rlen, writeRequest); } bool IpcIoFile::ioInProgress() const { return !olderRequests->empty() || !newerRequests->empty(); } /// track a new pending request void -IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending) +IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending) { - newerRequests->insert(std::make_pair(lastRequestId, pending)); + const std::pair result = + newerRequests->insert(std::make_pair(id, pending)); + Must(result.second); // failures means that id was not unique if (!timeoutCheckScheduled) scheduleTimeoutCheck(); } /// push an I/O request to disker void IpcIoFile::push(IpcIoPendingRequest *const pending) { // prevent queue overflows: check for responses to earlier requests + // warning: this call may result in indirect push() recursion HandleResponses("before push"); debugs(47, 7, HERE); Must(diskId >= 0); Must(pending); Must(pending->readRequest || pending->writeRequest); IpcIoMsg ipcIo; try { + if (++lastRequestId == 0) // don't use zero value as requestId + ++lastRequestId; ipcIo.requestId = lastRequestId; ipcIo.start = current_time; if (pending->readRequest) { ipcIo.command = IpcIo::cmdRead; ipcIo.offset = pending->readRequest->offset; ipcIo.len = pending->readRequest->len; } else 
{ // pending->writeRequest Must(pending->writeRequest->len <= Ipc::Mem::PageSize()); if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) { ipcIo.len = 0; throw TexcHere("run out of shared memory pages for IPC I/O"); } ipcIo.command = IpcIo::cmdWrite; ipcIo.offset = pending->writeRequest->offset; ipcIo.len = pending->writeRequest->len; char *const buf = Ipc::Mem::PagePointer(ipcIo.page); memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away } debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId)); if (queue->push(diskId, ipcIo)) Notify(diskId); // must notify disker - trackPendingRequest(pending); + trackPendingRequest(ipcIo.requestId, pending); } catch (const Queue::Full &) { debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len // TODO: grow queue size pending->completeIo(NULL); delete pending; } catch (const TextException &e) { debugs(47, DBG_IMPORTANT, HERE << e.what()); pending->completeIo(NULL); delete pending; } } /// whether we think there is enough time to complete the I/O bool IpcIoFile::canWait() const { if (!config.ioTimeout) return true; // no timeout specified @@ -592,43 +597,40 @@ IpcIoFile::getFD() const } /* IpcIoMsg */ IpcIoMsg::IpcIoMsg(): requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0) { start.tv_sec = 0; start.tv_usec = 0; } /* IpcIoPendingRequest */ IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): file(aFile), readRequest(NULL), writeRequest(NULL) { - Must(file != NULL); - if (++file->lastRequestId == 0) // don't use zero value as requestId - ++file->lastRequestId; } void IpcIoPendingRequest::completeIo(IpcIoMsg *const response) { if (readRequest) file->readCompleted(readRequest, response); else if (writeRequest) file->writeCompleted(writeRequest, response); else { Must(!response); // only timeouts are handled here file->openCompleted(NULL); } } /* XXX: disker code that should 
probably be moved elsewhere */ static int TheFile = -1; ///< db file descriptor static void === modified file 'src/DiskIO/IpcIo/IpcIoFile.h' --- src/DiskIO/IpcIo/IpcIoFile.h 2013-10-25 00:13:46 +0000 +++ src/DiskIO/IpcIo/IpcIoFile.h 2014-02-21 05:17:46 +0000 @@ -68,41 +68,41 @@ public: virtual bool canRead() const; virtual bool canWrite() const; virtual bool ioInProgress() const; /// handle open response from coordinator static void HandleOpenResponse(const Ipc::StrandSearchResponse &response); /// handle queue push notifications from worker or disker static void HandleNotification(const Ipc::TypedMsgHdr &msg); DiskFile::Config config; ///< supported configuration options protected: friend class IpcIoPendingRequest; void openCompleted(const Ipc::StrandSearchResponse *const response); void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response); void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response); bool canWait() const; private: - void trackPendingRequest(IpcIoPendingRequest *const pending); + void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending); void push(IpcIoPendingRequest *const pending); IpcIoPendingRequest *dequeueRequest(const unsigned int requestId); static void Notify(const int peerId); static void OpenTimeout(void *const param); static void CheckTimeouts(void *const param); void checkTimeouts(); void scheduleTimeoutCheck(); static void HandleResponses(const char *const when); void handleResponse(IpcIoMsg &ipcIo); static void DiskerHandleMoreRequests(void*); static void DiskerHandleRequests(); static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); static bool WaitBeforePop(); private: const String dbName; ///< the name of the file we are managing