From ffa61951822a9c019673376ef2caae94368fb67a Mon Sep 17 00:00:00 2001 From: Michael Theall Date: Tue, 7 Apr 2020 17:34:45 -0500 Subject: [PATCH] Fine-grained locking --- include/ftpSession.h | 7 ++ source/ftpSession.cpp | 162 +++++++++++++++++++++--------------------- 2 files changed, 88 insertions(+), 81 deletions(-) diff --git a/include/ftpSession.h b/include/ftpSession.h index 800ca22..2a66f83 100644 --- a/include/ftpSession.h +++ b/include/ftpSession.h @@ -117,6 +117,13 @@ private: /// \param closeData_ Whether to close data socket void setState (State state_, bool closePasv_, bool closeData_); + /// \brief Close socket + /// \param socket_ Socket to close + void closeSocket (SharedSocket &socket_); + /// \brief Close command socket + void closeCommand (); + /// \brief Close passive socket + void closePasv (); /// \brief Close data socket void closeData (); diff --git a/source/ftpSession.cpp b/source/ftpSession.cpp index fd3f297..160254e 100644 --- a/source/ftpSession.cpp +++ b/source/ftpSession.cpp @@ -53,6 +53,13 @@ using namespace std::chrono_literals; #define lstat stat #endif +#define LOCKED(x) \ + do \ + { \ + auto const lock = std::scoped_lock (m_lock); \ + x; \ + } while (0) + namespace { /// \brief Parse command @@ -255,8 +262,8 @@ std::string buildResolvedPath (std::string_view const cwd_, std::string_view con /////////////////////////////////////////////////////////////////////////// FtpSession::~FtpSession () { - m_commandSocket.reset (); - m_pasvSocket.reset (); + closeCommand (); + closePasv (); closeData (); } @@ -285,7 +292,6 @@ FtpSession::FtpSession (UniqueSocket commandSocket_) m_commandSocket->setNonBlocking (); - auto const lock = std::scoped_lock (m_lock); sendResponse ("220 Hello!\r\n"); } @@ -365,7 +371,6 @@ bool FtpSession::poll (std::vector const &sessions_) std::vector info; for (auto &session : sessions_) { - auto const lock = std::scoped_lock (session->m_lock); for (auto &pending : session->m_pendingCloseSocket) { assert (pending.unique ()); @@ -390,7 +395,6 @@ bool FtpSession::poll (std::vector const &sessions_) for (auto &session : sessions_) { - auto const lock = std::scoped_lock (session->m_lock); for (auto it = std::begin (session->m_pendingCloseSocket); it != std::end (session->m_pendingCloseSocket);) { @@ -412,7 +416,6 @@ bool FtpSession::poll (std::vector const &sessions_) info.clear (); for (auto &session : sessions_) { - auto const lock = std::scoped_lock (session->m_lock); if (session->m_commandSocket) { info.emplace_back (Socket::PollInfo{*session->m_commandSocket, POLLIN | POLLPRI, 0}); @@ -472,8 +475,6 @@ bool FtpSession::poll (std::vector const &sessions_) for (auto &session : sessions_) { - auto const lock = std::scoped_lock (session->m_lock); - for (auto const &i : info) { if (!i.revents) @@ -492,7 +493,7 @@ bool FtpSession::poll (std::vector const &sessions_) session->readCommand (i.revents); if (i.revents & (POLLERR | POLLHUP)) - session->m_commandSocket.reset (); + session->closeCommand (); } // check the data socket @@ -558,36 +559,57 @@ void FtpSession::setState (State const state_, bool const closePasv_, bool const m_state = state_; if (closePasv_) - m_pasvSocket.reset (); + closePasv (); if (closeData_) closeData (); if (state_ == State::COMMAND) { - m_restartPosition = 0; - m_fileSize = 0; - m_filePosition = 0; + { + auto lock = std::scoped_lock (m_lock); - for (auto &pos : m_filePositionHistory) - pos = 0; - m_xferRate = -1.0f; + m_restartPosition = 0; + m_fileSize = 0; + m_filePosition = 0; - m_workItem.clear (); + for (auto &pos : m_filePositionHistory) + pos = 0; + m_xferRate = -1.0f; + + m_workItem.clear (); + } m_file.close (); m_dir.close (); } } +void FtpSession::closeSocket (SharedSocket &socket_) +{ + if (socket_ && socket_.unique ()) + { + socket_->shutdown (SHUT_WR); + socket_->setLinger (true, 0s); + LOCKED (m_pendingCloseSocket.emplace_back (std::move (socket_))); + } + else + LOCKED (socket_.reset ()); +} + +void FtpSession::closeCommand () +{ + closeSocket (m_commandSocket); +} + +void FtpSession::closePasv () +{ + UniqueSocket pasv; + LOCKED (pasv = std::move (m_pasvSocket)); +} + void FtpSession::closeData () { - if (m_dataSocket && m_dataSocket.unique ()) - { - m_dataSocket->shutdown (SHUT_WR); - m_dataSocket->setLinger (true, 0s); - m_pendingCloseSocket.emplace_back (std::move (m_dataSocket)); - } - m_dataSocket.reset (); + closeSocket (m_dataSocket); m_recv = false; m_send = false; @@ -601,9 +623,9 @@ bool FtpSession::changeDir (char const *const args_) auto const pos = m_cwd.find_last_of ('/'); assert (pos != std::string::npos); if (pos == 0) - m_cwd = "/"; + LOCKED (m_cwd = "/"); else - m_cwd = m_cwd.substr (0, pos); + LOCKED (m_cwd = m_cwd.substr (0, pos)); return true; } @@ -621,7 +643,7 @@ bool FtpSession::changeDir (char const *const args_) return false; } - m_cwd = path; + LOCKED (m_cwd = path); return true; } @@ -636,7 +658,8 @@ bool FtpSession::dataAccept () m_pasv = false; - m_dataSocket = m_pasvSocket->accept (); + auto peer = m_pasvSocket->accept (); + LOCKED (m_dataSocket = std::move (peer)); if (!m_dataSocket) { sendResponse ("425 Failed to establish connection\r\n"); @@ -668,7 +691,8 @@ bool FtpSession::dataConnect () m_port = false; - m_dataSocket = Socket::create (); + auto data = Socket::create (); + LOCKED (m_dataSocket = std::move (data)); if (!m_dataSocket) return false; @@ -681,10 +705,7 @@ bool FtpSession::dataConnect () if (!m_dataSocket->connect (m_portAddr)) { if (errno != EINPROGRESS) - { - m_dataSocket.reset (); return false; - } return true; } @@ -1015,7 +1036,7 @@ void FtpSession::xferFile (char const *const args_, XferFileMode const mode_) return; } - m_fileSize = st.st_size; + LOCKED (m_fileSize = st.st_size); m_file.setBufferSize (FILE_BUFFERSIZE); @@ -1028,7 +1049,7 @@ void FtpSession::xferFile (char const *const args_, XferFileMode const mode_) } } - m_filePosition = m_restartPosition; + LOCKED (m_filePosition = m_restartPosition); } else { @@ -1062,7 +1083,7 @@ void FtpSession::xferFile (char const *const args_, XferFileMode const mode_) } } - m_filePosition = m_restartPosition; + LOCKED (m_filePosition = m_restartPosition); } if (!m_port && !m_pasv) @@ -1098,7 +1119,7 @@ void FtpSession::xferFile (char const *const args_, XferFileMode const mode_) m_xferBuffer.clear (); - m_workItem = path; + LOCKED (m_workItem = path); } void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool const workaround_) @@ -1174,7 +1195,7 @@ void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool } } - m_workItem = m_lwd; + LOCKED (m_workItem = m_lwd); } else if (mode_ == XferDirMode::MLSD) { @@ -1207,7 +1228,7 @@ void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool return; } - m_workItem = path; + LOCKED (m_workItem = path); } } else if (!m_dir.open (m_cwd.c_str ())) @@ -1234,7 +1255,7 @@ void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool } } - m_workItem = m_lwd; + LOCKED (m_workItem = m_lwd); } if (mode_ == XferDirMode::MLST || mode_ == XferDirMode::STAT) @@ -1242,8 +1263,8 @@ void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool // this is a little different; we have to send the data over the command socket sendResponse ("213-Status\r\n"); setState (State::DATA_TRANSFER, true, true); - m_dataSocket = m_commandSocket; - m_send = true; + LOCKED (m_dataSocket = m_commandSocket); + m_send = true; return; } @@ -1277,7 +1298,7 @@ void FtpSession::readCommand (int const events_) auto const atMark = m_commandSocket->atMark (); if (atMark < 0) { - m_commandSocket.reset (); + closeCommand (); return; } @@ -1285,25 +1306,21 @@ void FtpSession::readCommand (int const events_) { // discard in-band data m_commandBuffer.clear (); - m_lock.unlock (); auto const rc = m_commandSocket->read (m_commandBuffer); - m_lock.lock (); if (rc < 0 && errno != EWOULDBLOCK) - m_commandSocket.reset (); + closeCommand (); return; } // retrieve the urgent data m_commandBuffer.clear (); - m_lock.unlock (); auto const rc = m_commandSocket->read (m_commandBuffer, true); - m_lock.lock (); if (rc < 0) { // EWOULDBLOCK means out-of-band data is on the way if (errno != EWOULDBLOCK) - m_commandSocket.reset (); + closeCommand (); return; } @@ -1318,16 +1335,14 @@ void FtpSession::readCommand (int const events_) if (m_commandBuffer.freeSize () == 0) { Log::error ("Exceeded command buffer size\n"); - m_commandSocket.reset (); + closeCommand (); return; } - m_lock.unlock (); auto const rc = m_commandSocket->read (m_commandBuffer); - m_lock.lock (); if (rc < 0) { - m_commandSocket.reset (); + closeCommand (); return; } @@ -1335,7 +1350,7 @@ void FtpSession::readCommand (int const events_) { // peer closed connection Log::info ("Peer closed connection\n"); - m_commandSocket.reset (); + closeCommand (); return; } @@ -1410,7 +1425,7 @@ void FtpSession::readCommand (int const events_) { sendResponse ("503 Invalid command during transfer\r\n"); setState (State::COMMAND, true, true); - m_commandSocket.reset (); + closeCommand (); } else { @@ -1435,12 +1450,10 @@ void FtpSession::readCommand (int const events_) void FtpSession::writeResponse () { - m_lock.unlock (); auto const rc = m_commandSocket->write (m_responseBuffer); - m_lock.lock (); if (rc <= 0) { - m_commandSocket.reset (); + closeCommand (); return; } @@ -1468,14 +1481,14 @@ void FtpSession::sendResponse (char const *fmt_, ...) if (rc < 0) { Log::error ("vsnprintf: %s\n", std::strerror (errno)); - m_commandSocket.reset (); + closeCommand (); return; } if (static_cast (rc) > size) { Log::error ("Not enough space for response\n"); - m_commandSocket.reset (); + closeCommand (); return; } @@ -1483,12 +1496,10 @@ void FtpSession::sendResponse (char const *fmt_, ...) // try to write data immediately assert (m_commandSocket); - m_lock.unlock (); auto const bytes = m_commandSocket->write (m_responseBuffer.usedArea (), m_responseBuffer.usedSize ()); - m_lock.lock (); if (bytes < 0 && errno != EWOULDBLOCK) - m_commandSocket.reset (); + closeCommand (); else if (bytes > 0) { m_responseBuffer.markFree (bytes); @@ -1509,7 +1520,7 @@ void FtpSession::sendResponse (std::string_view const response_) if (response_.size () > size) { Log::error ("Not enough space for response\n"); - m_commandSocket.reset (); + closeCommand (); return; } @@ -1539,9 +1550,7 @@ bool FtpSession::listTransfer () } // get the next directory entry - m_lock.unlock (); auto const dent = m_dir.read (); - m_lock.lock (); if (!dent) { // we have exhausted the directory listing @@ -1634,9 +1643,7 @@ bool FtpSession::listTransfer () } // send any pending data - m_lock.unlock (); auto const rc = m_dataSocket->write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ()); - m_lock.lock (); if (rc <= 0) { // error sending data @@ -1663,9 +1670,7 @@ bool FtpSession::retrieveTransfer () auto const size = m_xferBuffer.freeSize (); // we have sent all the data, so read some more - m_lock.unlock (); auto const rc = m_file.read (buffer, size); - m_lock.lock (); if (rc < 0) { // failed to read data @@ -1687,9 +1692,7 @@ bool FtpSession::retrieveTransfer () } // send any pending data - m_lock.unlock (); auto const rc = m_dataSocket->write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ()); - m_lock.lock (); if (rc <= 0) { // error sending data @@ -1702,7 +1705,7 @@ bool FtpSession::retrieveTransfer () } // we can try to read/send more data - m_filePosition += rc; + LOCKED (m_filePosition += rc); m_xferBuffer.markFree (rc); return true; } @@ -1717,9 +1720,7 @@ bool FtpSession::storeTransfer () auto const size = m_xferBuffer.freeSize (); // we have written all the received data, so try to get some more - m_lock.unlock (); auto const rc = m_dataSocket->read (buffer, size); - m_lock.lock (); if (rc < 0) { // failed to read data @@ -1744,9 +1745,7 @@ bool FtpSession::storeTransfer () } // write any pending data - m_lock.unlock (); auto const rc = m_file.write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ()); - m_lock.lock (); if (rc <= 0) { // error writing data @@ -1756,7 +1755,7 @@ bool FtpSession::storeTransfer () } // we can try to recv/write more data - m_filePosition += rc; + LOCKED (m_filePosition += rc); m_xferBuffer.markFree (rc); return true; } @@ -2042,7 +2041,8 @@ void FtpSession::PASV (char const *args_) m_port = false; // create a socket to listen on - m_pasvSocket = Socket::create (); + auto pasv = Socket::create (); + LOCKED (m_pasvSocket = std::move (pasv)); if (!m_pasvSocket) { sendResponse ("451 Failed to create listening socket\r\n"); @@ -2067,7 +2067,7 @@ void FtpSession::PASV (char const *args_) // bind to the address if (!m_pasvSocket->bind (addr)) { - m_pasvSocket.reset (); + closePasv (); sendResponse ("451 Failed to bind address\r\n"); return; } @@ -2075,7 +2075,7 @@ void FtpSession::PASV (char const *args_) // listen on the socket if (!m_pasvSocket->listen (1)) { - m_pasvSocket.reset (); + closePasv (); sendResponse ("451 Failed to listen on socket\r\n"); return; } @@ -2197,7 +2197,7 @@ void FtpSession::PWD (char const *args_) void FtpSession::QUIT (char const *args_) { sendResponse ("221 Disconnecting\r\n"); - m_commandSocket.reset (); + closeCommand (); } void FtpSession::REST (char const *args_)