remove_all parallelized, and fix the issues with symlink

This commit is contained in:
Nicole Mazzuca 2019-07-10 14:35:10 -07:00
parent 5857e2c680
commit 2d6df16849
5 changed files with 508 additions and 72 deletions

View File

@ -12,14 +12,50 @@ namespace fs
using stdfs::copy_options;
using stdfs::file_status;
using stdfs::file_type;
using stdfs::perms;
using stdfs::path;
using stdfs::u8path;
/*
std::experimental::filesystem's file_status and file_type are broken in
the presence of symlinks -- a symlink is treated as the object it points
to for `symlink_status` and `symlink_type`
*/
using stdfs::status;
// we want to poison ADL with these niebloids
// Function object that this namespace exposes under the name `symlink_status`
// (instead of re-exporting stdfs::symlink_status). Being an object rather than
// a function, an unqualified call that finds it does not go through ADL.
constexpr struct {
// Overload reporting failure through the `ec` out-parameter.
file_status operator()(const path& p, std::error_code& ec) const noexcept;
// Overload without an error_code parameter.
file_status operator()(const path& p) const noexcept;
} symlink_status{};
constexpr struct {
inline bool operator()(file_status s) const {
return stdfs::is_symlink(s);
}
inline bool operator()(const path& p) const {
return stdfs::is_symlink(symlink_status(p));
}
inline bool operator()(const path& p, std::error_code& ec) const {
return stdfs::is_symlink(symlink_status(p, ec));
}
} is_symlink{};
inline bool is_regular_file(file_status s) { return stdfs::is_regular_file(s); }
inline bool is_directory(file_status s) { return stdfs::is_directory(s); }
inline bool is_symlink(file_status s) { return stdfs::is_symlink(s); }
}
/*
if someone attempts to use unqualified `symlink_status` or `is_symlink`,
they might get the ADL version, which is broken.
Therefore, put `symlink_status` in the global namespace, so that they get
our symlink_status.
*/
using fs::symlink_status;
using fs::is_symlink;
namespace vcpkg::Files
{
struct Filesystem

View File

@ -4,17 +4,56 @@
#include <limits>
#include <random>
namespace vcpkg {
namespace vcpkg::Rng {
namespace detail {
    // Number of bits in the object representation of T.
    // (numeric_limits<unsigned char>::digits is the number of bits in a byte.)
    template <class T>
    constexpr std::size_t bitsize = sizeof(T) * std::numeric_limits<unsigned char>::digits;

    // Returns true when `k` is usable as a shift/rotation count for T,
    // i.e. 0 <= k < bitsize<T>. Shifting by bitsize<T> itself is undefined,
    // so the upper bound is exclusive.
    template <class T>
    constexpr bool is_valid_shift(int k) {
        return 0 <= k && static_cast<std::size_t>(k) < bitsize<T>;
    }

    // Rotates the bits of `x` right by `k` positions.
    // precondition: 0 <= k < bitsize<T>
    template <class T>
    constexpr T ror(T x, int k) {
        // k == 0 is special-cased: the complementary shift below would be by
        // bitsize<T>, which is undefined.
        if (k == 0) {
            return x;
        }
        return (x >> k) | (x << (bitsize<T> - k));
    }

    // Rotates the bits of `x` left by `k` positions.
    // precondition: 0 <= k < bitsize<T>
    template <class T>
    constexpr T rol(T x, int k) {
        // see ror for why k == 0 is handled separately
        if (k == 0) {
            return x;
        }
        return (x << k) | (x >> (bitsize<T> - k));
    }

    // Jump constants for discarding 2^e outputs at once; only the
    // specializations below are available. The table could in principle be
    // computed for any word type and exponent, but that is not implemented.
    template <class UIntType, int e>
    struct XoshiroJumpTable;

    template <>
    struct XoshiroJumpTable<std::uint64_t, 128> {
        constexpr static std::uint64_t value[4] = {
            0x180ec6d33cfd0aba, 0xd5a61266f0c9392c, 0xa9582618e03fc9aa, 0x39abdc4529b1661c
        };
    };
}
/*
NOTE(ubsan): taken from the xoshiro paper
initialized from random_device by default
actual code is copied from wikipedia, since I wrote that code
*/
struct splitmix64_engine {
splitmix64_engine() noexcept;
struct splitmix {
splitmix() noexcept;
constexpr splitmix64_engine(std::uint64_t seed) noexcept
constexpr splitmix(std::uint64_t seed) noexcept
: state(seed) {}
constexpr std::uint64_t operator()() noexcept {
@ -35,62 +74,126 @@ namespace vcpkg {
return std::numeric_limits<std::uint64_t>::min();
}
template <class T>
constexpr void fill(T* first, T* last) {
constexpr auto mask =
static_cast<std::uint64_t>(std::numeric_limits<T>::max());
const auto remaining =
(last - first) % (sizeof(std::uint64_t) / sizeof(T));
for (auto it = first; it != last - remaining;) {
const auto item = (*this)();
for (
int shift = 0;
shift < 64;
shift += detail::bitsize<T>, ++it
) {
*it = static_cast<T>((item >> shift) & mask);
}
}
if (remaining == 0) return;
int shift = 0;
const auto item = (*this)();
for (auto it = last - remaining;
it != last;
shift += detail::bitsize<T>, ++it
) {
*it = static_cast<T>((item >> shift) & mask);
}
}
private:
std::uint64_t state;
};
// Sebastiano Vigna's xorshift-based xoshiro256** engine
// Output scrambler computing rol(n * S, R) * T.
// S and T are the multipliers, R is the left-rotation count.
template <class UIntType, int S, int R, int T>
struct starstar_scrambler {
    static constexpr auto scramble(const UIntType n) noexcept -> UIntType {
        return detail::rol(n * S, R) * T;
    }
};
// Sebastiano Vigna's xorshift-based xoshiro engine
// fast and really good
// uses the splitmix64_engine to initialize state
struct xoshiro256ss_engine {
// splitmix64_engine will be initialized with random_device
xoshiro256ss_engine() noexcept {
splitmix64_engine sm64{};
// uses the splitmix to initialize state
template <class UIntType, class Scrambler, int A, int B>
struct xoshiro_engine {
static_assert(detail::is_valid_shift<UIntType>(A));
static_assert(detail::is_valid_shift<UIntType>(B));
static_assert(std::is_unsigned_v<UIntType>);
for (std::uint64_t& s : this->state) {
s = sm64();
}
// splitmix will be initialized with random_device
xoshiro_engine() noexcept {
splitmix sm{};
sm.fill(&state[0], &state[4]);
}
constexpr xoshiro256ss_engine(std::uint64_t seed) noexcept : state() {
splitmix64_engine sm64{seed};
constexpr xoshiro_engine(std::uint64_t seed) noexcept : state() {
splitmix sm{seed};
for (std::uint64_t& s : this->state) {
s = sm64();
}
sm.fill(&state[0], &state[4]);
}
constexpr std::uint64_t operator()() noexcept {
std::uint64_t const result = rol(state[1] * 5, 7) * 9;
constexpr UIntType operator()() noexcept {
const UIntType result = Scrambler::scramble(state[0]);
std::uint64_t const t = state[1] << 17;
const UIntType t = state[1] << A;
// state[i] = state[i] ^ state[i + 4 mod 4]
state[2] ^= state[0];
state[3] ^= state[1];
state[1] ^= state[2];
state[0] ^= state[3];
state[2] ^= t;
state[3] ^= rol(state[3], 45);
state[3] ^= detail::rol(state[3], B);
return result;
}
constexpr std::uint64_t max() const noexcept {
return std::numeric_limits<std::uint64_t>::max();
constexpr UIntType max() const noexcept {
return std::numeric_limits<UIntType>::max();
}
constexpr std::uint64_t min() const noexcept {
return std::numeric_limits<std::uint64_t>::min();
return std::numeric_limits<UIntType>::min();
}
// quickly jump ahead 2^e steps
// takes 4 * bitsize<UIntType> rng next operations
template <int e>
constexpr void discard_e() noexcept {
using JT = detail::XoshiroJumpTable<UIntType, e>;
UIntType s[4] = {};
for (const auto& jump : JT::value) {
for (std::size_t i = 0; i < bitsize<UIntType>; ++i) {
if ((jump >> i) & 1) {
s[0] ^= state[0];
s[1] ^= state[1];
s[2] ^= state[2];
s[3] ^= state[3];
}
(*this)();
}
}
state[0] = s[0];
state[1] = s[1];
state[2] = s[2];
state[3] = s[3];
}
private:
// rotate left
constexpr std::uint64_t rol(std::uint64_t x, int k) {
return (x << k) | (x >> (64 - k));
}
std::uint64_t state[4];
UIntType state[4];
};
// xoshiro_engine over 64-bit words: output scrambled by
// starstar_scrambler with multipliers 5 and 9 and rotation 7,
// state update using shift A = 17 and rotation B = 45.
using xoshiro256ss = xoshiro_engine<
std::uint64_t,
starstar_scrambler<std::uint64_t, 5, 7, 9>,
17,
45>;
}

View File

@ -0,0 +1,183 @@
#pragma once
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
namespace vcpkg {
    // declared up front so that detail::call_action can name it
    template <class Action, class ThreadLocalData>
    struct WorkQueue;

    namespace detail {
        // Invokes an action that wants both the thread-local data and the
        // queue (so that it can enqueue follow-up work).
        // Selected through SFINAE when `action(tld, work_queue)` is well-formed.
        template <class Action, class ThreadLocalData>
        auto call_action(
            Action& action,
            const WorkQueue<Action, ThreadLocalData>& work_queue,
            ThreadLocalData& tld
        ) -> decltype(static_cast<void>(std::move(action)(tld, work_queue)))
        {
            std::move(action)(tld, work_queue);
        }

        // Invokes an action that only wants the thread-local data.
        // Selected through SFINAE when `action(tld)` is well-formed.
        template <class Action, class ThreadLocalData>
        auto call_action(
            Action& action,
            const WorkQueue<Action, ThreadLocalData>&,
            ThreadLocalData& tld
        ) -> decltype(static_cast<void>(std::move(action)(tld)))
        {
            std::move(action)(tld);
        }
    }

    /*
        A pool of worker threads that run `Action`s.
        Every worker owns one `ThreadLocalData`, which is handed to each
        action it runs. Actions are taken from the back of the pending list,
        so the most recently enqueued action runs first.

        The enqueue/terminate functions are const so that actions, which only
        receive a `const WorkQueue&`, can call them.

        `join` must be called before the queue is destroyed: the destructor
        does not join the worker threads.
    */
    template <class Action, class ThreadLocalData>
    struct WorkQueue {
        // Starts one worker per hardware thread (4 if that number is unknown).
        // `initializer()` is called once per worker, on the constructing
        // thread, to produce that worker's ThreadLocalData.
        template <class F>
        explicit WorkQueue(const F& initializer) noexcept {
            std::size_t num_threads = std::thread::hardware_concurrency();
            if (num_threads == 0) {
                num_threads = 4;
            }

            m_threads.reserve(num_threads);
            for (std::size_t i = 0; i < num_threads; ++i) {
                m_threads.emplace_back(WorkQueueWorker{this, initializer});
            }
        }

        WorkQueue(WorkQueue const&) = delete;
        WorkQueue(WorkQueue&&) = delete;

        ~WorkQueue() = default;

        // runs all remaining tasks, and blocks on their finishing
        // if this is called in an existing task, _will block forever_
        // DO NOT DO THAT
        // thread-unsafe
        void join() {
            {
                auto lck = std::unique_lock<std::mutex>(m_mutex);
                if (m_state == State::Running) {
                    m_state = State::Joining;
                } else if (m_state == State::Joining) {
                    Checks::exit_with_message(VCPKG_LINE_INFO, "Attempted to join more than once");
                }
            }
            // idle workers are blocked on m_cv; wake them so that they see
            // the new state and can exit once the work runs out
            m_cv.notify_all();

            for (auto& thrd : m_threads) {
                thrd.join();
            }
        }

        // useful in the case of errors
        // doesn't stop any existing running tasks
        // returns immediately, so that one can call this in a task
        void terminate() const {
            {
                auto lck = std::unique_lock<std::mutex>(m_mutex);
                m_state = State::Terminated;
            }
            m_cv.notify_all();
        }

        // Adds one action and wakes one worker.
        void enqueue_action(Action a) const {
            {
                auto lck = std::unique_lock<std::mutex>(m_mutex);
                m_actions.push_back(std::move(a));
            }
            m_cv.notify_one();
        }

        // Adds every element of `rng`, moving from the elements,
        // and wakes all workers.
        template <class Rng>
        void enqueue_all_actions_by_move(Rng&& rng) const {
            using std::begin;
            using std::end;

            auto first = begin(rng);
            auto last = end(rng);
            {
                auto lck = std::unique_lock<std::mutex>(m_mutex);
                m_actions.reserve(
                    m_actions.size() + static_cast<std::size_t>(std::distance(first, last)));
                std::move(first, last, std::back_inserter(m_actions));
            }
            m_cv.notify_all();
        }

        // Adds a copy of every element of `rng` and wakes all workers.
        template <class Rng>
        void enqueue_all_actions(Rng&& rng) const {
            using std::begin;
            using std::end;

            auto first = begin(rng);
            auto last = end(rng);
            {
                auto lck = std::unique_lock<std::mutex>(m_mutex);
                m_actions.reserve(
                    m_actions.size() + static_cast<std::size_t>(std::distance(first, last)));
                std::copy(first, last, std::back_inserter(m_actions));
            }
            m_cv.notify_all();
        }

    private:
        enum class State : std::uint16_t {
            Running,    // accepting work; idle workers wait for more
            Joining,    // join() was called; workers exit once no work can appear anymore
            Terminated, // terminate() was called; workers exit as soon as they notice
        };

        // The callable run by every worker thread.
        struct WorkQueueWorker {
            const WorkQueue* work_queue;
            ThreadLocalData tld;

            template <class F>
            WorkQueueWorker(const WorkQueue* work_queue, const F& initializer)
                : work_queue(work_queue), tld(initializer())
            { }

            void operator()() {
                // the lock is held everywhere except while an action runs
                auto lck = std::unique_lock<std::mutex>(work_queue->m_mutex);
                for (;;) {
                    if (work_queue->m_state == State::Terminated) {
                        return;
                    }

                    if (work_queue->m_actions.empty()) {
                        // a running action may still enqueue more work, so
                        // only give up when nobody is running
                        if (work_queue->m_state == State::Running || work_queue->running_workers > 0) {
                            work_queue->m_cv.wait(lck);
                            continue;
                        }

                        // state == State::Joining, nothing is queued, nothing is running:
                        // no more work! wake the other idle workers so they exit too
                        work_queue->m_cv.notify_all();
                        return;
                    }

                    Action action = std::move(work_queue->m_actions.back());
                    work_queue->m_actions.pop_back();

                    ++work_queue->running_workers;
                    lck.unlock();
                    detail::call_action(action, *work_queue, tld);
                    lck.lock();
                    --work_queue->running_workers;
                }
            }
        };

        mutable std::mutex m_mutex;
        // these three are under m_mutex
        mutable State m_state = State::Running;
        // number of workers currently executing an action
        mutable std::uint16_t running_workers = 0;
        mutable std::vector<Action> m_actions;

        mutable std::condition_variable m_cv;

        std::vector<std::thread> m_threads;
    };
}

View File

@ -6,6 +6,7 @@
#include <vcpkg/base/system.h>
#include <vcpkg/base/system.print.h>
#include <vcpkg/base/system.process.h>
#include <vcpkg/base/work_queue.h>
#include <vcpkg/base/util.h>
#if defined(__linux__) || defined(__APPLE__)
@ -21,6 +22,45 @@
#include <copyfile.h>
#endif
namespace fs {
    // Returns the status of `p` itself, without following a symlink.
    // NOTE(review): these definitions qualify the member name with
    // `decltype(symlink_status)::`, which depends on how the object is
    // declared in the header -- confirm this is accepted by every toolchain.
    file_status decltype(symlink_status)::operator()(const path& p, std::error_code& ec) const noexcept {
#if defined(_WIN32)
        // this branch reports every failure as `not_found`, so make sure the
        // caller does not see a stale error from an earlier call
        ec.clear();

        /*
            do not find the permissions of the file -- it's unnecessary for the
            things that vcpkg does.
            if one were to add support for this in the future, one should look
            into GetFileSecurityW
        */
        perms permissions = perms::unknown;

        WIN32_FILE_ATTRIBUTE_DATA file_attributes;
        file_type ft = file_type::unknown;
        if (!GetFileAttributesExW(p.c_str(), GetFileExInfoStandard, &file_attributes)) {
            ft = file_type::not_found;
        } else if (file_attributes.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
            // check for reparse point -- if yes, then symlink
            ft = file_type::symlink;
        } else if (file_attributes.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
            ft = file_type::directory;
        } else {
            // otherwise, the file is a regular file
            ft = file_type::regular;
        }

        return file_status(ft, permissions);
#else
        return stdfs::symlink_status(p, ec);
#endif
    }

    // Same as the overload above, but exits with a message if an error is reported.
    file_status decltype(symlink_status)::operator()(const path& p) const noexcept {
        std::error_code ec;
        auto result = symlink_status(p, ec);
        // the path is converted to a string explicitly for the %s format
        if (ec) vcpkg::Checks::exit_with_message(VCPKG_LINE_INFO, "error getting status of path %s: %s", p.u8string(), ec.message());

        return result;
    }
}
namespace vcpkg::Files
{
static const std::regex FILESYSTEM_INVALID_CHARACTERS_REGEX = std::regex(R"([\/:*?"<>|])");
@ -263,55 +303,129 @@ namespace vcpkg::Files
(as well as on macOS and Linux), this is just as fast and will have
fewer spurious errors due to locks.
*/
struct recursive {
const fs::path& tmp_directory;
std::error_code& ec;
xoshiro256ss_engine& rng;
void operator()(const fs::path& current) const {
const auto type = fs::stdfs::symlink_status(current, ec).type();
if (ec) return;
/*
`remove` doesn't actually remove anything -- it simply moves the
files into a parent directory (which ends up being at `path`),
and then inserts `actually_remove{current_path}` into the work
queue.
*/
struct remove {
struct tld {
const fs::path& tmp_directory;
std::uint64_t index;
const auto tmp_name = Strings::b64url_encode(rng());
const auto tmp_path = tmp_directory / tmp_name;
std::atomic<std::uintmax_t>& files_deleted;
switch (type) {
case fs::file_type::directory: {
fs::stdfs::rename(current, tmp_path, ec);
if (ec) return;
for (const auto& entry : fs::stdfs::directory_iterator(tmp_path)) {
(*this)(entry);
std::mutex& ec_mutex;
std::error_code& ec;
};
struct actually_remove;
using queue = WorkQueue<actually_remove, tld>;
/*
if `current_path` is a directory, first `remove`s all
elements of the directory, then calls remove.
else, just calls remove.
*/
struct actually_remove {
fs::path current_path;
void operator()(tld& info, const queue& queue) const {
std::error_code ec;
const auto path_type = fs::symlink_status(current_path, ec).type();
if (check_ec(ec, info, queue)) return;
if (path_type == fs::file_type::directory) {
for (const auto& entry : fs::stdfs::directory_iterator(current_path)) {
remove{}(entry, info, queue);
}
}
if (fs::stdfs::remove(current_path, ec)) {
info.files_deleted.fetch_add(1, std::memory_order_relaxed);
} else {
check_ec(ec, info, queue);
}
fs::stdfs::remove(tmp_path, ec);
} break;
case fs::file_type::symlink:
case fs::file_type::regular: {
fs::stdfs::rename(current, tmp_path, ec);
fs::stdfs::remove(current, ec);
} break;
case fs::file_type::not_found: return;
case fs::file_type::none: {
Checks::exit_with_message(VCPKG_LINE_INFO, "Error occurred when evaluating file type of file: %s", current);
}
default: {
Checks::exit_with_message(VCPKG_LINE_INFO, "Attempted to delete special file: %s", current);
}
};
static bool check_ec(const std::error_code& ec, tld& info, const queue& queue) {
if (ec) {
queue.terminate();
auto lck = std::unique_lock<std::mutex>(info.ec_mutex);
if (!info.ec) {
info.ec = ec;
}
return true;
} else {
return false;
}
}
void operator()(const fs::path& current_path, tld& info, const queue& queue) const {
std::error_code ec;
const auto type = fs::symlink_status(current_path, ec).type();
if (check_ec(ec, info, queue)) return;
const auto tmp_name = Strings::b64url_encode(info.index++);
const auto tmp_path = info.tmp_directory / tmp_name;
fs::stdfs::rename(current_path, tmp_path, ec);
if (check_ec(ec, info, queue)) return;
queue.enqueue_action(actually_remove{std::move(tmp_path)});
}
};
auto const real_path = fs::stdfs::absolute(path);
const auto path_type = fs::symlink_status(path, ec).type();
if (! real_path.has_parent_path()) {
Checks::exit_with_message(VCPKG_LINE_INFO, "Attempted to remove_all the base directory");
std::atomic<std::uintmax_t> files_deleted = 0;
if (path_type == fs::file_type::directory) {
std::uint64_t index = 0;
std::mutex ec_mutex;
auto queue = remove::queue([&] {
index += 1 << 32;
return remove::tld{path, index, files_deleted, ec_mutex, ec};
});
index += 1 << 32;
auto main_tld = remove::tld{path, index, files_deleted, ec_mutex, ec};
for (const auto& entry : fs::stdfs::directory_iterator(path)) {
remove{}(entry, main_tld, queue);
}
queue.join();
}
// thoughts: is this fine? or should we do something different?
// maybe a temporary directory?
auto const base_path = real_path.parent_path();
/*
we need to do backoff on the removal of the top level directory,
since we need to place all moved files into that top level
directory, and so we can only delete the directory after all the
lower levels have been deleted.
*/
for (int backoff = 0; backoff < 5; ++backoff) {
if (backoff) {
using namespace std::chrono_literals;
auto backoff_time = 100ms * backoff;
std::this_thread::sleep_for(backoff_time);
}
xoshiro256ss_engine rng{};
recursive{base_path, ec, rng}(real_path);
if (fs::stdfs::remove(path, ec)) {
files_deleted.fetch_add(1, std::memory_order_relaxed);
break;
}
}
return files_deleted;
}
// Forwards to fs::stdfs::exists for `path`.
bool exists(const fs::path& path) const override
{
    return fs::stdfs::exists(path);
}
// Forwards to fs::stdfs::is_directory for `path`.
bool is_directory(const fs::path& path) const override
{
    return fs::stdfs::is_directory(path);
}
@ -343,11 +457,11 @@ namespace vcpkg::Files
virtual fs::file_status status(const fs::path& path, std::error_code& ec) const override
{
return fs::stdfs::status(path, ec);
return fs::status(path, ec);
}
virtual fs::file_status symlink_status(const fs::path& path, std::error_code& ec) const override
{
return fs::stdfs::symlink_status(path, ec);
return fs::symlink_status(path, ec);
}
virtual void write_contents(const fs::path& file_path, const std::string& data, std::error_code& ec) override
{

View File

@ -1,11 +1,11 @@
#include <base/rng.h>
namespace vcpkg {
namespace vcpkg::Rng {
namespace {
std::random_device system_entropy{};
}
splitmix64_engine::splitmix64_engine() {
splitmix::splitmix() {
std::uint64_t top_half = system_entropy();
std::uint64_t bottom_half = system_entropy();