VolumeVerifier: Multithreading

This commit is contained in:
JosJuice 2019-08-06 11:39:09 +02:00
parent f754a1a548
commit 34fb608dd6
2 changed files with 113 additions and 39 deletions

View File

@ -6,7 +6,9 @@
#include <algorithm> #include <algorithm>
#include <cinttypes> #include <cinttypes>
#include <future>
#include <limits> #include <limits>
#include <mutex>
#include <optional> #include <optional>
#include <string> #include <string>
#include <unordered_set> #include <unordered_set>
@ -683,6 +685,34 @@ void VolumeVerifier::SetUpHashing()
} }
} }
void VolumeVerifier::WaitForAsyncOperations() const
{
if (m_crc32_future.valid())
m_crc32_future.wait();
if (m_md5_future.valid())
m_md5_future.wait();
if (m_sha1_future.valid())
m_sha1_future.wait();
if (m_content_future.valid())
m_content_future.wait();
if (m_block_future.valid())
m_block_future.wait();
}
bool VolumeVerifier::ReadChunkAndWaitForAsyncOperations(u64 bytes_to_read)
{
std::vector<u8> data(bytes_to_read);
{
std::lock_guard lk(m_volume_mutex);
if (!m_volume.Read(m_progress, bytes_to_read, data.data(), PARTITION_NONE))
return false;
}
WaitForAsyncOperations();
m_data = std::move(data);
return true;
}
void VolumeVerifier::Process() void VolumeVerifier::Process()
{ {
ASSERT(m_started); ASSERT(m_started);
@ -718,13 +748,8 @@ void VolumeVerifier::Process()
} }
bytes_to_read = std::min(bytes_to_read, m_max_progress - m_progress); bytes_to_read = std::min(bytes_to_read, m_max_progress - m_progress);
bool read_succeeded = false; const bool is_data_needed = m_calculating_any_hash || content_read || block_read;
std::vector<u8> data(bytes_to_read); const bool read_succeeded = is_data_needed && ReadChunkAndWaitForAsyncOperations(bytes_to_read);
if (m_calculating_any_hash || content_read || block_read)
{
if (m_volume.Read(m_progress, bytes_to_read, data.data(), PARTITION_NONE))
read_succeeded = true;
}
if (m_calculating_any_hash) if (m_calculating_any_hash)
{ {
@ -736,56 +761,91 @@ void VolumeVerifier::Process()
{ {
if (m_hashes_to_calculate.crc32) if (m_hashes_to_calculate.crc32)
{ {
// It would be nice to use crc32_z here instead of crc32, but it isn't available on Android m_crc32_future = std::async(std::launch::async, [this] {
// Would be nice to use crc32_z here instead of crc32, but it isn't available on Android
m_crc32_context = m_crc32_context =
crc32(m_crc32_context, data.data(), static_cast<unsigned int>(bytes_to_read)); crc32(m_crc32_context, m_data.data(), static_cast<unsigned int>(m_data.size()));
});
} }
if (m_hashes_to_calculate.md5) if (m_hashes_to_calculate.md5)
mbedtls_md5_update_ret(&m_md5_context, data.data(), bytes_to_read); {
m_md5_future = std::async(std::launch::async, [this] {
mbedtls_md5_update_ret(&m_md5_context, m_data.data(), m_data.size());
});
}
if (m_hashes_to_calculate.sha1) if (m_hashes_to_calculate.sha1)
mbedtls_sha1_update_ret(&m_sha1_context, data.data(), bytes_to_read); {
m_sha1_future = std::async(std::launch::async, [this] {
mbedtls_sha1_update_ret(&m_sha1_context, m_data.data(), m_data.size());
});
}
} }
} }
if (content_read) if (content_read)
{ {
if (!read_succeeded || !m_volume.CheckContentIntegrity(content, data, m_ticket)) m_content_future = std::async(std::launch::async, [this, read_succeeded, content] {
if (!read_succeeded || !m_volume.CheckContentIntegrity(content, m_data, m_ticket))
{ {
AddProblem( AddProblem(
Severity::High, Severity::High,
StringFromFormat(Common::GetStringT("Content %08x is corrupt.").c_str(), content.id)); StringFromFormat(Common::GetStringT("Content %08x is corrupt.").c_str(), content.id));
} }
});
m_content_index++; m_content_index++;
} }
while (m_block_index < m_blocks.size() && if (m_block_index < m_blocks.size() &&
m_blocks[m_block_index].offset < m_progress + bytes_to_read) m_blocks[m_block_index].offset < m_progress + bytes_to_read)
{ {
const bool success = m_blocks[m_block_index].offset == m_progress ? m_md5_future = std::async(
read_succeeded && m_volume.CheckBlockIntegrity( std::launch::async,
m_blocks[m_block_index].block_index, data, [this, read_succeeded, bytes_to_read](size_t block_index, u64 progress) {
m_blocks[m_block_index].partition) : while (block_index < m_blocks.size() &&
m_volume.CheckBlockIntegrity(m_blocks[m_block_index].block_index, m_blocks[block_index].offset < progress + bytes_to_read)
m_blocks[m_block_index].partition); {
bool success;
if (m_blocks[block_index].offset == progress)
{
success = read_succeeded &&
m_volume.CheckBlockIntegrity(m_blocks[block_index].block_index, m_data,
m_blocks[block_index].partition);
}
else
{
std::lock_guard lk(m_volume_mutex);
success = m_volume.CheckBlockIntegrity(m_blocks[block_index].block_index,
m_blocks[block_index].partition);
}
if (!success) if (!success)
{ {
const u64 offset = m_blocks[m_block_index].offset; const u64 offset = m_blocks[block_index].offset;
if (m_scrubber.CanBlockBeScrubbed(offset)) if (m_scrubber.CanBlockBeScrubbed(offset))
{ {
WARN_LOG(DISCIO, "Integrity check failed for unused block at 0x%" PRIx64, offset); WARN_LOG(DISCIO, "Integrity check failed for unused block at 0x%" PRIx64, offset);
m_unused_block_errors[m_blocks[m_block_index].partition]++; m_unused_block_errors[m_blocks[block_index].partition]++;
} }
else else
{ {
WARN_LOG(DISCIO, "Integrity check failed for block at 0x%" PRIx64, offset); WARN_LOG(DISCIO, "Integrity check failed for block at 0x%" PRIx64, offset);
m_block_errors[m_blocks[m_block_index].partition]++; m_block_errors[m_blocks[block_index].partition]++;
} }
} }
block_index++;
}
},
m_block_index, m_progress);
while (m_block_index < m_blocks.size() &&
m_blocks[m_block_index].offset < m_progress + bytes_to_read)
{
m_block_index++; m_block_index++;
} }
}
m_progress += bytes_to_read; m_progress += bytes_to_read;
} }
@ -806,6 +866,8 @@ void VolumeVerifier::Finish()
return; return;
m_done = true; m_done = true;
WaitForAsyncOperations();
ASSERT(m_content_index == m_content_offsets.size()); ASSERT(m_content_index == m_content_offsets.size());
ASSERT(m_block_index == m_blocks.size()); ASSERT(m_block_index == m_blocks.size());

View File

@ -4,7 +4,9 @@
#pragma once #pragma once
#include <future>
#include <map> #include <map>
#include <mutex>
#include <optional> #include <optional>
#include <string> #include <string>
#include <vector> #include <vector>
@ -98,6 +100,8 @@ private:
u64 GetBiggestUsedOffset(const FileInfo& file_info) const; u64 GetBiggestUsedOffset(const FileInfo& file_info) const;
void CheckMisc(); void CheckMisc();
void SetUpHashing(); void SetUpHashing();
void WaitForAsyncOperations() const;
bool ReadChunkAndWaitForAsyncOperations(u64 bytes_to_read);
void AddProblem(Severity severity, std::string text); void AddProblem(Severity severity, std::string text);
@ -113,6 +117,14 @@ private:
mbedtls_md5_context m_md5_context; mbedtls_md5_context m_md5_context;
mbedtls_sha1_context m_sha1_context; mbedtls_sha1_context m_sha1_context;
std::vector<u8> m_data;
std::mutex m_volume_mutex;
std::future<void> m_crc32_future;
std::future<void> m_md5_future;
std::future<void> m_sha1_future;
std::future<void> m_content_future;
std::future<void> m_block_future;
DiscScrubber m_scrubber; DiscScrubber m_scrubber;
IOS::ES::TicketReader m_ticket; IOS::ES::TicketReader m_ticket;
std::vector<u64> m_content_offsets; std::vector<u64> m_content_offsets;