Fine-grained locking

This commit is contained in:
Michael Theall 2020-04-07 17:34:45 -05:00
parent 816c3bfa39
commit ffa6195182
2 changed files with 88 additions and 81 deletions

View File

@ -117,6 +117,13 @@ private:
/// \param closeData_ Whether to close data socket /// \param closeData_ Whether to close data socket
void setState (State state_, bool closePasv_, bool closeData_); void setState (State state_, bool closePasv_, bool closeData_);
/// \brief Close socket
/// \param socket_ Socket to close
void closeSocket (SharedSocket &socket_);
/// \brief Close command socket
void closeCommand ();
/// \brief Close passive socket
void closePasv ();
/// \brief Close data socket /// \brief Close data socket
void closeData (); void closeData ();

View File

@ -53,6 +53,13 @@ using namespace std::chrono_literals;
#define lstat stat #define lstat stat
#endif #endif
#define LOCKED(x) \
do \
{ \
auto const lock = std::scoped_lock (m_lock); \
x; \
} while (0)
namespace namespace
{ {
/// \brief Parse command /// \brief Parse command
@ -255,8 +262,8 @@ std::string buildResolvedPath (std::string_view const cwd_, std::string_view con
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
FtpSession::~FtpSession () FtpSession::~FtpSession ()
{ {
m_commandSocket.reset (); closeCommand ();
m_pasvSocket.reset (); closePasv ();
closeData (); closeData ();
} }
@ -285,7 +292,6 @@ FtpSession::FtpSession (UniqueSocket commandSocket_)
m_commandSocket->setNonBlocking (); m_commandSocket->setNonBlocking ();
auto const lock = std::scoped_lock (m_lock);
sendResponse ("220 Hello!\r\n"); sendResponse ("220 Hello!\r\n");
} }
@ -365,7 +371,6 @@ bool FtpSession::poll (std::vector<UniqueFtpSession> const &sessions_)
std::vector<Socket::PollInfo> info; std::vector<Socket::PollInfo> info;
for (auto &session : sessions_) for (auto &session : sessions_)
{ {
auto const lock = std::scoped_lock (session->m_lock);
for (auto &pending : session->m_pendingCloseSocket) for (auto &pending : session->m_pendingCloseSocket)
{ {
assert (pending.unique ()); assert (pending.unique ());
@ -390,7 +395,6 @@ bool FtpSession::poll (std::vector<UniqueFtpSession> const &sessions_)
for (auto &session : sessions_) for (auto &session : sessions_)
{ {
auto const lock = std::scoped_lock (session->m_lock);
for (auto it = std::begin (session->m_pendingCloseSocket); for (auto it = std::begin (session->m_pendingCloseSocket);
it != std::end (session->m_pendingCloseSocket);) it != std::end (session->m_pendingCloseSocket);)
{ {
@ -412,7 +416,6 @@ bool FtpSession::poll (std::vector<UniqueFtpSession> const &sessions_)
info.clear (); info.clear ();
for (auto &session : sessions_) for (auto &session : sessions_)
{ {
auto const lock = std::scoped_lock (session->m_lock);
if (session->m_commandSocket) if (session->m_commandSocket)
{ {
info.emplace_back (Socket::PollInfo{*session->m_commandSocket, POLLIN | POLLPRI, 0}); info.emplace_back (Socket::PollInfo{*session->m_commandSocket, POLLIN | POLLPRI, 0});
@ -472,8 +475,6 @@ bool FtpSession::poll (std::vector<UniqueFtpSession> const &sessions_)
for (auto &session : sessions_) for (auto &session : sessions_)
{ {
auto const lock = std::scoped_lock (session->m_lock);
for (auto const &i : info) for (auto const &i : info)
{ {
if (!i.revents) if (!i.revents)
@ -492,7 +493,7 @@ bool FtpSession::poll (std::vector<UniqueFtpSession> const &sessions_)
session->readCommand (i.revents); session->readCommand (i.revents);
if (i.revents & (POLLERR | POLLHUP)) if (i.revents & (POLLERR | POLLHUP))
session->m_commandSocket.reset (); session->closeCommand ();
} }
// check the data socket // check the data socket
@ -558,36 +559,57 @@ void FtpSession::setState (State const state_, bool const closePasv_, bool const
m_state = state_; m_state = state_;
if (closePasv_) if (closePasv_)
m_pasvSocket.reset (); closePasv ();
if (closeData_) if (closeData_)
closeData (); closeData ();
if (state_ == State::COMMAND) if (state_ == State::COMMAND)
{ {
m_restartPosition = 0; {
m_fileSize = 0; auto lock = std::scoped_lock (m_lock);
m_filePosition = 0;
for (auto &pos : m_filePositionHistory) m_restartPosition = 0;
pos = 0; m_fileSize = 0;
m_xferRate = -1.0f; m_filePosition = 0;
m_workItem.clear (); for (auto &pos : m_filePositionHistory)
pos = 0;
m_xferRate = -1.0f;
m_workItem.clear ();
}
m_file.close (); m_file.close ();
m_dir.close (); m_dir.close ();
} }
} }
void FtpSession::closeSocket (SharedSocket &socket_)
{
if (socket_ && socket_.unique ())
{
socket_->shutdown (SHUT_WR);
socket_->setLinger (true, 0s);
LOCKED (m_pendingCloseSocket.emplace_back (std::move (socket_)));
}
else
LOCKED (socket_.reset ());
}
void FtpSession::closeCommand ()
{
closeSocket (m_commandSocket);
}
void FtpSession::closePasv ()
{
UniqueSocket pasv;
LOCKED (pasv = std::move (m_pasvSocket));
}
void FtpSession::closeData () void FtpSession::closeData ()
{ {
if (m_dataSocket && m_dataSocket.unique ()) closeSocket (m_dataSocket);
{
m_dataSocket->shutdown (SHUT_WR);
m_dataSocket->setLinger (true, 0s);
m_pendingCloseSocket.emplace_back (std::move (m_dataSocket));
}
m_dataSocket.reset ();
m_recv = false; m_recv = false;
m_send = false; m_send = false;
@ -601,9 +623,9 @@ bool FtpSession::changeDir (char const *const args_)
auto const pos = m_cwd.find_last_of ('/'); auto const pos = m_cwd.find_last_of ('/');
assert (pos != std::string::npos); assert (pos != std::string::npos);
if (pos == 0) if (pos == 0)
m_cwd = "/"; LOCKED (m_cwd = "/");
else else
m_cwd = m_cwd.substr (0, pos); LOCKED (m_cwd = m_cwd.substr (0, pos));
return true; return true;
} }
@ -621,7 +643,7 @@ bool FtpSession::changeDir (char const *const args_)
return false; return false;
} }
m_cwd = path; LOCKED (m_cwd = path);
return true; return true;
} }
@ -636,7 +658,8 @@ bool FtpSession::dataAccept ()
m_pasv = false; m_pasv = false;
m_dataSocket = m_pasvSocket->accept (); auto peer = m_pasvSocket->accept ();
LOCKED (m_dataSocket = std::move (peer));
if (!m_dataSocket) if (!m_dataSocket)
{ {
sendResponse ("425 Failed to establish connection\r\n"); sendResponse ("425 Failed to establish connection\r\n");
@ -668,7 +691,8 @@ bool FtpSession::dataConnect ()
m_port = false; m_port = false;
m_dataSocket = Socket::create (); auto data = Socket::create ();
LOCKED (m_dataSocket = std::move (data));
if (!m_dataSocket) if (!m_dataSocket)
return false; return false;
@ -681,10 +705,7 @@ bool FtpSession::dataConnect ()
if (!m_dataSocket->connect (m_portAddr)) if (!m_dataSocket->connect (m_portAddr))
{ {
if (errno != EINPROGRESS) if (errno != EINPROGRESS)
{
m_dataSocket.reset ();
return false; return false;
}
return true; return true;
} }
@ -1015,7 +1036,7 @@ void FtpSession::xferFile (char const *const args_, XferFileMode const mode_)
return; return;
} }
m_fileSize = st.st_size; LOCKED (m_fileSize = st.st_size);
m_file.setBufferSize (FILE_BUFFERSIZE); m_file.setBufferSize (FILE_BUFFERSIZE);
@ -1028,7 +1049,7 @@ void FtpSession::xferFile (char const *const args_, XferFileMode const mode_)
} }
} }
m_filePosition = m_restartPosition; LOCKED (m_filePosition = m_restartPosition);
} }
else else
{ {
@ -1062,7 +1083,7 @@ void FtpSession::xferFile (char const *const args_, XferFileMode const mode_)
} }
} }
m_filePosition = m_restartPosition; LOCKED (m_filePosition = m_restartPosition);
} }
if (!m_port && !m_pasv) if (!m_port && !m_pasv)
@ -1098,7 +1119,7 @@ void FtpSession::xferFile (char const *const args_, XferFileMode const mode_)
m_xferBuffer.clear (); m_xferBuffer.clear ();
m_workItem = path; LOCKED (m_workItem = path);
} }
void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool const workaround_) void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool const workaround_)
@ -1174,7 +1195,7 @@ void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool
} }
} }
m_workItem = m_lwd; LOCKED (m_workItem = m_lwd);
} }
else if (mode_ == XferDirMode::MLSD) else if (mode_ == XferDirMode::MLSD)
{ {
@ -1207,7 +1228,7 @@ void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool
return; return;
} }
m_workItem = path; LOCKED (m_workItem = path);
} }
} }
else if (!m_dir.open (m_cwd.c_str ())) else if (!m_dir.open (m_cwd.c_str ()))
@ -1234,7 +1255,7 @@ void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool
} }
} }
m_workItem = m_lwd; LOCKED (m_workItem = m_lwd);
} }
if (mode_ == XferDirMode::MLST || mode_ == XferDirMode::STAT) if (mode_ == XferDirMode::MLST || mode_ == XferDirMode::STAT)
@ -1242,8 +1263,8 @@ void FtpSession::xferDir (char const *const args_, XferDirMode const mode_, bool
// this is a little different; we have to send the data over the command socket // this is a little different; we have to send the data over the command socket
sendResponse ("213-Status\r\n"); sendResponse ("213-Status\r\n");
setState (State::DATA_TRANSFER, true, true); setState (State::DATA_TRANSFER, true, true);
m_dataSocket = m_commandSocket; LOCKED (m_dataSocket = m_commandSocket);
m_send = true; m_send = true;
return; return;
} }
@ -1277,7 +1298,7 @@ void FtpSession::readCommand (int const events_)
auto const atMark = m_commandSocket->atMark (); auto const atMark = m_commandSocket->atMark ();
if (atMark < 0) if (atMark < 0)
{ {
m_commandSocket.reset (); closeCommand ();
return; return;
} }
@ -1285,25 +1306,21 @@ void FtpSession::readCommand (int const events_)
{ {
// discard in-band data // discard in-band data
m_commandBuffer.clear (); m_commandBuffer.clear ();
m_lock.unlock ();
auto const rc = m_commandSocket->read (m_commandBuffer); auto const rc = m_commandSocket->read (m_commandBuffer);
m_lock.lock ();
if (rc < 0 && errno != EWOULDBLOCK) if (rc < 0 && errno != EWOULDBLOCK)
m_commandSocket.reset (); closeCommand ();
return; return;
} }
// retrieve the urgent data // retrieve the urgent data
m_commandBuffer.clear (); m_commandBuffer.clear ();
m_lock.unlock ();
auto const rc = m_commandSocket->read (m_commandBuffer, true); auto const rc = m_commandSocket->read (m_commandBuffer, true);
m_lock.lock ();
if (rc < 0) if (rc < 0)
{ {
// EWOULDBLOCK means out-of-band data is on the way // EWOULDBLOCK means out-of-band data is on the way
if (errno != EWOULDBLOCK) if (errno != EWOULDBLOCK)
m_commandSocket.reset (); closeCommand ();
return; return;
} }
@ -1318,16 +1335,14 @@ void FtpSession::readCommand (int const events_)
if (m_commandBuffer.freeSize () == 0) if (m_commandBuffer.freeSize () == 0)
{ {
Log::error ("Exceeded command buffer size\n"); Log::error ("Exceeded command buffer size\n");
m_commandSocket.reset (); closeCommand ();
return; return;
} }
m_lock.unlock ();
auto const rc = m_commandSocket->read (m_commandBuffer); auto const rc = m_commandSocket->read (m_commandBuffer);
m_lock.lock ();
if (rc < 0) if (rc < 0)
{ {
m_commandSocket.reset (); closeCommand ();
return; return;
} }
@ -1335,7 +1350,7 @@ void FtpSession::readCommand (int const events_)
{ {
// peer closed connection // peer closed connection
Log::info ("Peer closed connection\n"); Log::info ("Peer closed connection\n");
m_commandSocket.reset (); closeCommand ();
return; return;
} }
@ -1410,7 +1425,7 @@ void FtpSession::readCommand (int const events_)
{ {
sendResponse ("503 Invalid command during transfer\r\n"); sendResponse ("503 Invalid command during transfer\r\n");
setState (State::COMMAND, true, true); setState (State::COMMAND, true, true);
m_commandSocket.reset (); closeCommand ();
} }
else else
{ {
@ -1435,12 +1450,10 @@ void FtpSession::readCommand (int const events_)
void FtpSession::writeResponse () void FtpSession::writeResponse ()
{ {
m_lock.unlock ();
auto const rc = m_commandSocket->write (m_responseBuffer); auto const rc = m_commandSocket->write (m_responseBuffer);
m_lock.lock ();
if (rc <= 0) if (rc <= 0)
{ {
m_commandSocket.reset (); closeCommand ();
return; return;
} }
@ -1468,14 +1481,14 @@ void FtpSession::sendResponse (char const *fmt_, ...)
if (rc < 0) if (rc < 0)
{ {
Log::error ("vsnprintf: %s\n", std::strerror (errno)); Log::error ("vsnprintf: %s\n", std::strerror (errno));
m_commandSocket.reset (); closeCommand ();
return; return;
} }
if (static_cast<std::size_t> (rc) > size) if (static_cast<std::size_t> (rc) > size)
{ {
Log::error ("Not enough space for response\n"); Log::error ("Not enough space for response\n");
m_commandSocket.reset (); closeCommand ();
return; return;
} }
@ -1483,12 +1496,10 @@ void FtpSession::sendResponse (char const *fmt_, ...)
// try to write data immediately // try to write data immediately
assert (m_commandSocket); assert (m_commandSocket);
m_lock.unlock ();
auto const bytes = auto const bytes =
m_commandSocket->write (m_responseBuffer.usedArea (), m_responseBuffer.usedSize ()); m_commandSocket->write (m_responseBuffer.usedArea (), m_responseBuffer.usedSize ());
m_lock.lock ();
if (bytes < 0 && errno != EWOULDBLOCK) if (bytes < 0 && errno != EWOULDBLOCK)
m_commandSocket.reset (); closeCommand ();
else if (bytes > 0) else if (bytes > 0)
{ {
m_responseBuffer.markFree (bytes); m_responseBuffer.markFree (bytes);
@ -1509,7 +1520,7 @@ void FtpSession::sendResponse (std::string_view const response_)
if (response_.size () > size) if (response_.size () > size)
{ {
Log::error ("Not enough space for response\n"); Log::error ("Not enough space for response\n");
m_commandSocket.reset (); closeCommand ();
return; return;
} }
@ -1539,9 +1550,7 @@ bool FtpSession::listTransfer ()
} }
// get the next directory entry // get the next directory entry
m_lock.unlock ();
auto const dent = m_dir.read (); auto const dent = m_dir.read ();
m_lock.lock ();
if (!dent) if (!dent)
{ {
// we have exhausted the directory listing // we have exhausted the directory listing
@ -1634,9 +1643,7 @@ bool FtpSession::listTransfer ()
} }
// send any pending data // send any pending data
m_lock.unlock ();
auto const rc = m_dataSocket->write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ()); auto const rc = m_dataSocket->write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ());
m_lock.lock ();
if (rc <= 0) if (rc <= 0)
{ {
// error sending data // error sending data
@ -1663,9 +1670,7 @@ bool FtpSession::retrieveTransfer ()
auto const size = m_xferBuffer.freeSize (); auto const size = m_xferBuffer.freeSize ();
// we have sent all the data, so read some more // we have sent all the data, so read some more
m_lock.unlock ();
auto const rc = m_file.read (buffer, size); auto const rc = m_file.read (buffer, size);
m_lock.lock ();
if (rc < 0) if (rc < 0)
{ {
// failed to read data // failed to read data
@ -1687,9 +1692,7 @@ bool FtpSession::retrieveTransfer ()
} }
// send any pending data // send any pending data
m_lock.unlock ();
auto const rc = m_dataSocket->write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ()); auto const rc = m_dataSocket->write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ());
m_lock.lock ();
if (rc <= 0) if (rc <= 0)
{ {
// error sending data // error sending data
@ -1702,7 +1705,7 @@ bool FtpSession::retrieveTransfer ()
} }
// we can try to read/send more data // we can try to read/send more data
m_filePosition += rc; LOCKED (m_filePosition += rc);
m_xferBuffer.markFree (rc); m_xferBuffer.markFree (rc);
return true; return true;
} }
@ -1717,9 +1720,7 @@ bool FtpSession::storeTransfer ()
auto const size = m_xferBuffer.freeSize (); auto const size = m_xferBuffer.freeSize ();
// we have written all the received data, so try to get some more // we have written all the received data, so try to get some more
m_lock.unlock ();
auto const rc = m_dataSocket->read (buffer, size); auto const rc = m_dataSocket->read (buffer, size);
m_lock.lock ();
if (rc < 0) if (rc < 0)
{ {
// failed to read data // failed to read data
@ -1744,9 +1745,7 @@ bool FtpSession::storeTransfer ()
} }
// write any pending data // write any pending data
m_lock.unlock ();
auto const rc = m_file.write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ()); auto const rc = m_file.write (m_xferBuffer.usedArea (), m_xferBuffer.usedSize ());
m_lock.lock ();
if (rc <= 0) if (rc <= 0)
{ {
// error writing data // error writing data
@ -1756,7 +1755,7 @@ bool FtpSession::storeTransfer ()
} }
// we can try to recv/write more data // we can try to recv/write more data
m_filePosition += rc; LOCKED (m_filePosition += rc);
m_xferBuffer.markFree (rc); m_xferBuffer.markFree (rc);
return true; return true;
} }
@ -2042,7 +2041,8 @@ void FtpSession::PASV (char const *args_)
m_port = false; m_port = false;
// create a socket to listen on // create a socket to listen on
m_pasvSocket = Socket::create (); auto pasv = Socket::create ();
LOCKED (m_pasvSocket = std::move (pasv));
if (!m_pasvSocket) if (!m_pasvSocket)
{ {
sendResponse ("451 Failed to create listening socket\r\n"); sendResponse ("451 Failed to create listening socket\r\n");
@ -2067,7 +2067,7 @@ void FtpSession::PASV (char const *args_)
// bind to the address // bind to the address
if (!m_pasvSocket->bind (addr)) if (!m_pasvSocket->bind (addr))
{ {
m_pasvSocket.reset (); closePasv ();
sendResponse ("451 Failed to bind address\r\n"); sendResponse ("451 Failed to bind address\r\n");
return; return;
} }
@ -2075,7 +2075,7 @@ void FtpSession::PASV (char const *args_)
// listen on the socket // listen on the socket
if (!m_pasvSocket->listen (1)) if (!m_pasvSocket->listen (1))
{ {
m_pasvSocket.reset (); closePasv ();
sendResponse ("451 Failed to listen on socket\r\n"); sendResponse ("451 Failed to listen on socket\r\n");
return; return;
} }
@ -2197,7 +2197,7 @@ void FtpSession::PWD (char const *args_)
void FtpSession::QUIT (char const *args_) void FtpSession::QUIT (char const *args_)
{ {
sendResponse ("221 Disconnecting\r\n"); sendResponse ("221 Disconnecting\r\n");
m_commandSocket.reset (); closeCommand ();
} }
void FtpSession::REST (char const *args_) void FtpSession::REST (char const *args_)