From 2d6df16849ebcf237d17c919727756d90974daba Mon Sep 17 00:00:00 2001 From: Nicole Mazzuca Date: Wed, 10 Jul 2019 14:35:10 -0700 Subject: [PATCH] remove_all parallelized, and fix the issues with symlink --- toolsrc/include/vcpkg/base/files.h | 38 ++++- toolsrc/include/vcpkg/base/rng.h | 165 ++++++++++++++++---- toolsrc/include/vcpkg/base/work_queue.h | 183 +++++++++++++++++++++++ toolsrc/src/vcpkg/base/files.cpp | 190 +++++++++++++++++++----- toolsrc/src/vcpkg/base/rng.cpp | 4 +- 5 files changed, 508 insertions(+), 72 deletions(-) create mode 100644 toolsrc/include/vcpkg/base/work_queue.h diff --git a/toolsrc/include/vcpkg/base/files.h b/toolsrc/include/vcpkg/base/files.h index 3ea0d6036..178fae541 100644 --- a/toolsrc/include/vcpkg/base/files.h +++ b/toolsrc/include/vcpkg/base/files.h @@ -12,14 +12,50 @@ namespace fs using stdfs::copy_options; using stdfs::file_status; using stdfs::file_type; + using stdfs::perms; using stdfs::path; using stdfs::u8path; + /* + std::experimental::filesystem's file_status and file_type are broken in + the presence of symlinks -- a symlink is treated as the object it points + to for `symlink_status` and `symlink_type` + */ + + using stdfs::status; + + // we want to poison ADL with these niebloids + constexpr struct { + file_status operator()(const path& p, std::error_code& ec) const noexcept; + file_status operator()(const path& p) const noexcept; + } symlink_status{}; + + constexpr struct { + inline bool operator()(file_status s) const { + return stdfs::is_symlink(s); + } + + inline bool operator()(const path& p) const { + return stdfs::is_symlink(symlink_status(p)); + } + inline bool operator()(const path& p, std::error_code& ec) const { + return stdfs::is_symlink(symlink_status(p, ec)); + } + } is_symlink{}; + inline bool is_regular_file(file_status s) { return stdfs::is_regular_file(s); } inline bool is_directory(file_status s) { return stdfs::is_directory(s); } - inline bool is_symlink(file_status s) { return stdfs::is_symlink(s); } } +/* + if someone attempts to use unqualified `symlink_status` or `is_symlink`, + they might get the ADL version, which is broken. + Therefore, put `symlink_status` in the global namespace, so that they get + our symlink_status. +*/ +using fs::symlink_status; +using fs::is_symlink; + namespace vcpkg::Files { struct Filesystem diff --git a/toolsrc/include/vcpkg/base/rng.h b/toolsrc/include/vcpkg/base/rng.h index 1bcab05b3..4a0411f64 100644 --- a/toolsrc/include/vcpkg/base/rng.h +++ b/toolsrc/include/vcpkg/base/rng.h @@ -4,17 +4,56 @@ #include #include -namespace vcpkg { +namespace vcpkg::Rng { + + namespace detail { + template + constexpr std::size_t bitsize = sizeof(T) * CHAR_BITS; + + template + constexpr bool is_valid_shift(int k) { + return 0 <= k && k <= bitsize; + } + + // precondition: 0 <= k < bitsize + template + constexpr T ror(T x, int k) { + if (k == 0) { + return x; + } + return (x >> k) | (x << (bitsize - k)); + } + + // precondition: 0 <= k < bitsize + template + constexpr T rol(T x, int k) { + if (k == 0) { + return x; + } + return (x << k) | (x >> (bitsize - k)); + } + + // there _is_ a way to do this generally, but I don't know how to + template + struct XoshiroJumpTable; + + template <> + struct XoshiroJumpTable { + constexpr static std::uint64_t value[4] = { + 0x180ec6d33cfd0aba, 0xd5a61266f0c9392c, 0xa9582618e03fc9aa, 0x39abdc4529b1661c + }; + }; + } /* NOTE(ubsan): taken from the xoshiro paper initialized from random_device by default actual code is copied from wikipedia, since I wrote that code */ - struct splitmix64_engine { - splitmix64_engine() noexcept; + struct splitmix { + splitmix() noexcept; - constexpr splitmix64_engine(std::uint64_t seed) noexcept + constexpr splitmix(std::uint64_t seed) noexcept : state(seed) {} constexpr std::uint64_t operator()() noexcept { @@ -35,62 +74,126 @@ namespace vcpkg { return std::numeric_limits::min(); } + template + constexpr void fill(T* first, T* last) { + constexpr auto mask = + static_cast(std::numeric_limits::max()); + + const auto remaining = + (last - first) % (sizeof(std::uint64_t) / sizeof(T)); + + for (auto it = first; it != last - remaining;) { + const auto item = (*this)(); + for ( + int shift = 0; + shift < 64; + shift += detail::bitsize, ++it + ) { + *it = static_cast((item >> shift) & mask); + } + } + + if (remaining == 0) return; + + int shift = 0; + const auto item = (*this)(); + for (auto it = last - remaining; + it != last; + shift += detail::bitsize, ++it + ) { + *it = static_cast((item >> shift) & mask); + } + } + private: std::uint64_t state; }; - // Sebastian Vigna's xorshift-based xoshiro xoshiro256** engine + template + struct starstar_scrambler { + constexpr static UIntType scramble(UIntType n) noexcept { + return detail::rol(n * S, R) * T; + } + }; + + // Sebastian Vigna's xorshift-based xoshiro engine // fast and really good - // uses the splitmix64_engine to initialize state - struct xoshiro256ss_engine { - // splitmix64_engine will be initialized with random_device - xoshiro256ss_engine() noexcept { - splitmix64_engine sm64{}; + // uses the splitmix to initialize state + template + struct xoshiro_engine { + static_assert(detail::is_valid_shift(A)); + static_assert(detail::is_valid_shift(B)); + static_assert(std::is_unsigned_v); - for (std::uint64_t& s : this->state) { - s = sm64(); - } + // splitmix will be initialized with random_device + xoshiro_engine() noexcept { + splitmix sm{}; + + sm.fill(&state[0], &state[4]); } - constexpr xoshiro256ss_engine(std::uint64_t seed) noexcept : state() { - splitmix64_engine sm64{seed}; + constexpr xoshiro_engine(std::uint64_t seed) noexcept : state() { + splitmix sm{seed}; - for (std::uint64_t& s : this->state) { - s = sm64(); - } + sm.fill(&state[0], &state[4]); } - constexpr std::uint64_t operator()() noexcept { - std::uint64_t const result = rol(state[1] * 5, 7) * 9; + constexpr UIntType operator()() noexcept { + const UIntType result = Scrambler::scramble(state[0]); - std::uint64_t const t = state[1] << 17; + const UIntType t = state[1] << A; - // state[i] = state[i] ^ state[i + 4 mod 4] state[2] ^= state[0]; state[3] ^= state[1]; state[1] ^= state[2]; state[0] ^= state[3]; state[2] ^= t; - state[3] ^= rol(state[3], 45); + state[3] ^= detail::rol(state[3], B); return result; } - constexpr std::uint64_t max() const noexcept { - return std::numeric_limits::max(); + constexpr UIntType max() const noexcept { + return std::numeric_limits::max(); } constexpr std::uint64_t min() const noexcept { - return std::numeric_limits::min(); + return std::numeric_limits::min(); + } + + // quickly jump ahead 2^e steps + // takes 4 * bitsize rng next operations + template + constexpr void discard_e() noexcept { + using JT = detail::XoshiroJumpTable; + + UIntType s[4] = {}; + for (const auto& jump : JT::value) { + for (std::size_t i = 0; i < bitsize; ++i) { + if ((jump >> i) & 1) { + s[0] ^= state[0]; + s[1] ^= state[1]; + s[2] ^= state[2]; + s[3] ^= state[3]; + } + (*this)(); + } + } + + state[0] = s[0]; + state[1] = s[1]; + state[2] = s[2]; + state[3] = s[3]; } private: // rotate left - constexpr std::uint64_t rol(std::uint64_t x, int k) { - return (x << k) | (x >> (64 - k)); - } - - std::uint64_t state[4]; + UIntType state[4]; }; + using xoshiro256ss = xoshiro_engine< + std::uint64_t, + starstar_scrambler, + 17, + 45>; } diff --git a/toolsrc/include/vcpkg/base/work_queue.h b/toolsrc/include/vcpkg/base/work_queue.h new file mode 100644 index 000000000..4db167fa6 --- /dev/null +++ b/toolsrc/include/vcpkg/base/work_queue.h @@ -0,0 +1,183 @@ +#pragma once + +#include +#include + +namespace vcpkg { + namespace detail { + template + auto call_action( + Action& action, + const WorkQueue& work_queue, + ThreadLocalData& tld + ) -> decltype(static_cast(std::move(action)(tld, work_queue))) + { + std::move(action)(tld, work_queue); + } + + template + auto call_action( + Action& action, + const WorkQueue&, + ThreadLocalData& tld + ) -> decltype(static_cast(std::move(action)(tld))) + { + std::move(action)(tld); + } + } + + template + struct WorkQueue { + template + explicit WorkQueue(const F& initializer) noexcept { + state = State::Joining; + + std::size_t num_threads = std::thread::hardware_concurrency(); + if (num_threads == 0) { + num_threads = 4; + } + + m_threads.reserve(num_threads); + for (std::size_t i = 0; i < num_threads; ++i) { + m_threads.emplace_back(this, initializer); + } + } + + WorkQueue(WorkQueue const&) = delete; + WorkQueue(WorkQueue&&) = delete; + + ~WorkQueue() = default; + + // runs all remaining tasks, and blocks on their finishing + // if this is called in an existing task, _will block forever_ + // DO NOT DO THAT + // thread-unsafe + void join() { + { + auto lck = std::unique_lock(m_mutex); + if (m_state == State::Running) { + m_state = State::Joining; + } else if (m_state == State::Joining) { + Checks::exit_with_message(VCPKG_LINE_INFO, "Attempted to join more than once"); + } + } + for (auto& thrd : m_threads) { + thrd.join(); + } + } + + // useful in the case of errors + // doesn't stop any existing running tasks + // returns immediately, so that one can call this in a task + void terminate() const { + { + auto lck = std::unique_lock(m_mutex); + m_state = State::Terminated; + } + m_cv.notify_all(); + } + + void enqueue_action(Action a) const { + { + auto lck = std::unique_lock(m_mutex); + m_actions.push_back(std::move(a)); + } + m_cv.notify_one(); + } + + template + void enqueue_all_actions_by_move(Rng&& rng) const { + { + using std::begin; + using std::end; + + auto lck = std::unique_lock(m_mutex); + + auto first = begin(rng); + auto last = end(rng); + + m_actions.reserve(m_actions.size() + (end - begin)); + + std::move(first, last, std::back_insert_iterator(rng)); + } + + m_cv.notify_all(); + } + + template + void enqueue_all_actions(Rng&& rng) const { + { + using std::begin; + using std::end; + + auto lck = std::unique_lock(m_mutex); + + auto first = begin(rng); + auto last = end(rng); + + m_actions.reserve(m_actions.size() + (end - begin)); + + std::copy(first, last, std::back_insert_iterator(rng)); + } + + m_cv.notify_all(); + } + + private: + friend struct WorkQueueWorker { + const WorkQueue* work_queue; + ThreadLocalData tld; + + template + WorkQueueWorker(const WorkQueue* work_queue, const F& initializer) + : work_queue(work_queue), tld(initializer()) + { } + + void operator()() { + for (;;) { + auto lck = std::unique_lock(work_queue->m_mutex); + ++work_queue->running_workers; + + const auto state = work_queue->m_state; + + if (state == State::Terminated) { + --work_queue->running_workers; + return; + } + + if (work_queue->m_actions.empty()) { + --work_queue->running_workers; + if (state == State::Running || work_queue->running_workers > 0) { + work_queue->m_cv.wait(lck); + continue; + } + + // state == State::Joining and we are the only worker + // no more work! + return; + } + + Action action = work_queue->m_actions.pop_back(); + lck.unlock(); + + detail::call_action(action, *work_queue, tld); + } + } + }; + + enum class State : std::uint16_t { + Running, + Joining, + Terminated, + }; + + mutable std::mutex m_mutex; + // these four are under m_mutex + mutable State m_state; + mutable std::uint16_t running_workers; + mutable std::vector m_actions; + mutable std::condition_variable condition_variable; + + std::vector m_threads; + }; +} diff --git a/toolsrc/src/vcpkg/base/files.cpp b/toolsrc/src/vcpkg/base/files.cpp index d0926bb4c..e89c531be 100644 --- a/toolsrc/src/vcpkg/base/files.cpp +++ b/toolsrc/src/vcpkg/base/files.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #if defined(__linux__) || defined(__APPLE__) @@ -21,6 +22,45 @@ #include #endif +namespace fs { + file_status decltype(symlink_status)::operator()(const path& p, std::error_code& ec) const noexcept { +#if defined(_WIN32) + /* + do not find the permissions of the file -- it's unnecessary for the + things that vcpkg does. + if one were to add support for this in the future, one should look + into GetFileSecurityW + */ + perms permissions = perms::unknown; + + WIN32_FILE_ATTRIBUTE_DATA file_attributes; + file_type ft = file_type::unknown; + if (!GetFileAttributesExW(p.c_str(), GetFileExInfoStandard, &file_attributes)) { + ft = file_type::not_found; + } else if (file_attributes.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) { + // check for reparse point -- if yes, then symlink + ft = file_type::symlink; + } else if (file_attributes.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + ft = file_type::directory; + } else { + // otherwise, the file is a regular file + ft = file_type::regular; + } + + return file_status(ft, permissions); + +#else + return stdfs::symlink_status(p, ec); +#endif + } + + file_status decltype(symlink_status)::operator()(const path& p) const noexcept { + std::error_code ec; + auto result = symlink_status(p, ec); + if (ec) vcpkg::Checks::exit_with_message(VCPKG_LINE_INFO, "error getting status of path %s: %s", p, ec.message()); + } +} + namespace vcpkg::Files { static const std::regex FILESYSTEM_INVALID_CHARACTERS_REGEX = std::regex(R"([\/:*?"<>|])"); @@ -263,55 +303,129 @@ namespace vcpkg::Files (as well as on macOS and Linux), this is just as fast and will have fewer spurious errors due to locks. */ - struct recursive { - const fs::path& tmp_directory; - std::error_code& ec; - xoshiro256ss_engine& rng; - void operator()(const fs::path& current) const { - const auto type = fs::stdfs::symlink_status(current, ec).type(); - if (ec) return; + /* + `remove` doesn't actually remove anything -- it simply moves the + files into a parent directory (which ends up being at `path`), + and then inserts `actually_remove{current_path}` into the work + queue. + */ + struct remove { + struct tld { + const fs::path& tmp_directory; + std::uint64_t index; - const auto tmp_name = Strings::b64url_encode(rng()); - const auto tmp_path = tmp_directory / tmp_name; + std::atomic& files_deleted; - switch (type) { - case fs::file_type::directory: { - fs::stdfs::rename(current, tmp_path, ec); - if (ec) return; - for (const auto& entry : fs::stdfs::directory_iterator(tmp_path)) { - (*this)(entry); + std::mutex& ec_mutex; + std::error_code& ec; + }; + + struct actually_remove; + using queue = WorkQueue; + + /* + if `current_path` is a directory, first `remove`s all + elements of the directory, then calls remove. + + else, just calls remove. + */ + struct actually_remove { + fs::path current_path; + + void operator()(tld& info, const queue& queue) const { + std::error_code ec; + const auto path_type = fs::symlink_status(current_path, ec).type(); + + if (check_ec(ec, info, queue)) return; + + if (path_type == fs::file_type::directory) { + for (const auto& entry : fs::stdfs::directory_iterator(current_path)) { + remove{}(entry, info, queue); + } + } + + if (fs::stdfs::remove(current_path, ec)) { + info.files_deleted.fetch_add(1, std::memory_order_relaxed); + } else { + check_ec(ec, info, queue); } - fs::stdfs::remove(tmp_path, ec); - } break; - case fs::file_type::symlink: - case fs::file_type::regular: { - fs::stdfs::rename(current, tmp_path, ec); - fs::stdfs::remove(current, ec); - } break; - case fs::file_type::not_found: return; - case fs::file_type::none: { - Checks::exit_with_message(VCPKG_LINE_INFO, "Error occurred when evaluating file type of file: %s", current); - } - default: { - Checks::exit_with_message(VCPKG_LINE_INFO, "Attempted to delete special file: %s", current); } + }; + + static bool check_ec(const std::error_code& ec, tld& info, const queue& queue) { + if (ec) { + queue.terminate(); + + auto lck = std::unique_lock(info.ec_mutex); + if (!info.ec) { + info.ec = ec; + } + + return true; + } else { + return false; } } + + void operator()(const fs::path& current_path, tld& info, const queue& queue) const { + std::error_code ec; + + const auto type = fs::symlink_status(current_path, ec).type(); + if (check_ec(ec, info, queue)) return; + + const auto tmp_name = Strings::b64url_encode(info.index++); + const auto tmp_path = info.tmp_directory / tmp_name; + + fs::stdfs::rename(current_path, tmp_path, ec); + if (check_ec(ec, info, queue)) return; + + queue.enqueue_action(actually_remove{std::move(tmp_path)}); + } }; - auto const real_path = fs::stdfs::absolute(path); + const auto path_type = fs::symlink_status(path, ec).type(); - if (! real_path.has_parent_path()) { - Checks::exit_with_message(VCPKG_LINE_INFO, "Attempted to remove_all the base directory"); + std::atomic files_deleted = 0; + + if (path_type == fs::file_type::directory) { + std::uint64_t index = 0; + std::mutex ec_mutex; + + auto queue = remove::queue([&] { + index += 1 << 32; + return remove::tld{path, index, files_deleted, ec_mutex, ec}; + }); + + index += 1 << 32; + auto main_tld = remove::tld{path, index, files_deleted, ec_mutex, ec}; + for (const auto& entry : fs::stdfs::directory_iterator(path)) { + remove{}(entry, main_tld, queue); + } + + queue.join(); } - // thoughts: is this fine? or should we do something different? - // maybe a temporary directory? - auto const base_path = real_path.parent_path(); + /* + we need to do backoff on the removal of the top level directory, + since we need to place all moved files into that top level + directory, and so we can only delete the directory after all the + lower levels have been deleted. + */ + for (int backoff = 0; backoff < 5; ++backoff) { + if (backoff) { + using namespace std::chrono_literals; + auto backoff_time = 100ms * backoff; + std::this_thread::sleep_for(backoff_time); + } - xoshiro256ss_engine rng{}; - recursive{base_path, ec, rng}(real_path); + if (fs::stdfs::remove(path, ec)) { + files_deleted.fetch_add(1, std::memory_order_relaxed); + break; + } + } + + return files_deleted; } virtual bool exists(const fs::path& path) const override { return fs::stdfs::exists(path); } virtual bool is_directory(const fs::path& path) const override { return fs::stdfs::is_directory(path); } @@ -343,11 +457,11 @@ namespace vcpkg::Files virtual fs::file_status status(const fs::path& path, std::error_code& ec) const override { - return fs::stdfs::status(path, ec); + return fs::status(path, ec); } virtual fs::file_status symlink_status(const fs::path& path, std::error_code& ec) const override { - return fs::stdfs::symlink_status(path, ec); + return fs::symlink_status(path, ec); } virtual void write_contents(const fs::path& file_path, const std::string& data, std::error_code& ec) override { diff --git a/toolsrc/src/vcpkg/base/rng.cpp b/toolsrc/src/vcpkg/base/rng.cpp index 9fe2ea3b4..40ff646b7 100644 --- a/toolsrc/src/vcpkg/base/rng.cpp +++ b/toolsrc/src/vcpkg/base/rng.cpp @@ -1,11 +1,11 @@ #include -namespace vcpkg { +namespace vcpkg::Rng { namespace { std::random_device system_entropy{}; } - splitmix64_engine::splitmix64_engine() { + splitmix::splitmix() { std::uint64_t top_half = system_entropy(); std::uint64_t bottom_half = system_entropy();