Galaxy: Make --galaxy-install multithreaded

This commit is contained in:
Sude 2018-02-12 18:46:17 +02:00
parent b384dfd0c1
commit e9ac6d072f
2 changed files with 160 additions and 65 deletions

View File

@ -114,7 +114,7 @@ class Downloader
void saveChangelog(const std::string& changelog, const std::string& filepath);
static void processDownloadQueue(Config conf, const unsigned int& tid);
static int progressCallbackForThread(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
void printProgress();
template <typename T> void printProgress(const ThreadSafeQueue<T>& download_queue);
static void getGameDetailsThread(Config config, const unsigned int& tid);
static int progressCallback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
@ -123,6 +123,7 @@ class Downloader
static size_t readData(void *ptr, size_t size, size_t nmemb, FILE *stream);
std::vector<std::string> galaxyGetOrphanedFiles(const std::vector<galaxyDepotItem>& items, const std::string& install_path);
static void processGalaxyDownloadQueue(const std::string& install_path, Config conf, const unsigned int& tid);
Website *gogWebsite;
API *gogAPI;

View File

@ -43,6 +43,7 @@ ThreadSafeQueue<Message> msgQueue;
ThreadSafeQueue<gameFile> createXMLQueue;
ThreadSafeQueue<gameItem> gameItemQueue;
ThreadSafeQueue<gameDetails> gameDetailsQueue;
ThreadSafeQueue<galaxyDepotItem> dlQueueGalaxy;
std::mutex mtx_create_directories; // Mutex for creating directories in Downloader::processDownloadQueue
static curl_off_t WriteChunkMemoryCallback(void *contents, curl_off_t size, curl_off_t nmemb, void *userp)
@ -133,7 +134,7 @@ Downloader::Downloader()
Globals::galaxyConf.setJSON(json);
} catch (const Json::Exception& exc) {
std::cerr << "Failed to parse " << Globals::galaxyConf.getFilepath() << std::endl;
std::cerr << exc.what() << std::endl;
std::cerr << exc.what() << std::endl;
}
if (ifs)
@ -794,7 +795,7 @@ void Downloader::repair()
std::cerr << "Could not parse JSON response, skipping file" << std::endl;
continue;
}
if (!downlinkJson.isMember("downlink"))
{
std::cerr << "Invalid JSON response, skipping file" << std::endl;
@ -960,7 +961,7 @@ void Downloader::download()
vThreads.push_back(std::thread(Downloader::processDownloadQueue, Globals::globalConfig, i));
}
this->printProgress();
this->printProgress(dlQueue);
// Join threads
for (unsigned int i = 0; i < vThreads.size(); ++i)
@ -2240,7 +2241,7 @@ int Downloader::loadGameDetailsCache()
} catch (const Json::Exception& exc) {
std::cout << "Failed to parse cache" << std::endl;
std::cout << exc.what() << std::endl;
return 2;
return 2;
}
if (root.isMember("date"))
@ -3009,7 +3010,7 @@ int Downloader::progressCallbackForThread(void *clientp, curl_off_t dltotal, cur
return 0;
}
void Downloader::printProgress()
template <typename T> void Downloader::printProgress(const ThreadSafeQueue<T>& download_queue)
{
// Print progress information until all threads have finished their tasks
ProgressBar bar(Globals::globalConfig.bUnicode, Globals::globalConfig.bColor);
@ -3138,7 +3139,7 @@ void Downloader::printProgress()
}
ss << "Total: " << std::setprecision(2) << std::fixed << total_rate << rate_unit << " | ";
}
ss << "Remaining: " << dlQueue.size();
ss << "Remaining: " << download_queue.size();
vProgressText.push_back(ss.str());
}
@ -3428,6 +3429,7 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
std::cout << "\tmd5: " << items[i].md5 << std::endl;
}
totalSize += items[i].totalSizeUncompressed;
dlQueueGalaxy.push(items[i]);
}
double totalSizeMB = static_cast<double>(totalSize)/1024/1024;
@ -3435,17 +3437,98 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
std::cout << "Files: " << items.size() << std::endl;
std::cout << "Total size installed: " << totalSizeMB << " MB" << std::endl;
for (unsigned int i = 0; i < items.size(); ++i)
// Limit thread count to number of items in download queue
unsigned int iThreads = std::min(Globals::globalConfig.iThreads, static_cast<unsigned int>(dlQueueGalaxy.size()));
// Create download threads
std::vector<std::thread> vThreads;
for (unsigned int i = 0; i < iThreads; ++i)
{
boost::filesystem::path path = install_path + "/" + items[i].path;
DownloadInfo dlInfo;
dlInfo.setStatus(DLSTATUS_NOTSTARTED);
vDownloadInfo.push_back(dlInfo);
vThreads.push_back(std::thread(Downloader::processGalaxyDownloadQueue, install_path, Globals::globalConfig, i));
}
this->printProgress(dlQueueGalaxy);
// Join threads
for (unsigned int i = 0; i < vThreads.size(); ++i)
vThreads[i].join();
vThreads.clear();
vDownloadInfo.clear();
std::cout << "Checking for orphaned files" << std::endl;
std::vector<std::string> orphans = this->galaxyGetOrphanedFiles(items, install_path);
std::cout << "\t" << orphans.size() << " orphaned files" << std::endl;
for (unsigned int i = 0; i < orphans.size(); ++i)
std::cout << "\t" << orphans[i] << std::endl;
}
void Downloader::processGalaxyDownloadQueue(const std::string& install_path, Config conf, const unsigned int& tid)
{
std::string msg_prefix = "[Thread #" + std::to_string(tid) + "]";
galaxyAPI* galaxy = new galaxyAPI(Globals::globalConfig.curlConf);
if (!galaxy->init())
{
if (!galaxy->refreshLogin())
{
delete galaxy;
msgQueue.push(Message("Galaxy API failed to refresh login", MSGTYPE_ERROR, msg_prefix));
vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED);
return;
}
}
CURL* dlhandle = curl_easy_init();
curl_easy_setopt(dlhandle, CURLOPT_FOLLOWLOCATION, 1);
curl_easy_setopt(dlhandle, CURLOPT_USERAGENT, conf.curlConf.sUserAgent.c_str());
curl_easy_setopt(dlhandle, CURLOPT_NOPROGRESS, 0);
curl_easy_setopt(dlhandle, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(dlhandle, CURLOPT_CONNECTTIMEOUT, conf.curlConf.iTimeout);
curl_easy_setopt(dlhandle, CURLOPT_FAILONERROR, true);
curl_easy_setopt(dlhandle, CURLOPT_SSL_VERIFYPEER, conf.curlConf.bVerifyPeer);
curl_easy_setopt(dlhandle, CURLOPT_VERBOSE, conf.curlConf.bVerbose);
curl_easy_setopt(dlhandle, CURLOPT_WRITEFUNCTION, Downloader::writeData);
curl_easy_setopt(dlhandle, CURLOPT_READFUNCTION, Downloader::readData);
curl_easy_setopt(dlhandle, CURLOPT_MAX_RECV_SPEED_LARGE, conf.curlConf.iDownloadRate);
curl_easy_setopt(dlhandle, CURLOPT_FILETIME, 1L);
// Assume that we have connection error and abort transfer with CURLE_OPERATION_TIMEDOUT if download speed is less than 200 B/s for 30 seconds
curl_easy_setopt(dlhandle, CURLOPT_LOW_SPEED_TIME, conf.curlConf.iLowSpeedTimeout);
curl_easy_setopt(dlhandle, CURLOPT_LOW_SPEED_LIMIT, conf.curlConf.iLowSpeedTimeoutRate);
if (!conf.curlConf.sCACertPath.empty())
curl_easy_setopt(dlhandle, CURLOPT_CAINFO, conf.curlConf.sCACertPath.c_str());
xferInfo xferinfo;
xferinfo.tid = tid;
xferinfo.curlhandle = dlhandle;
curl_easy_setopt(dlhandle, CURLOPT_XFERINFOFUNCTION, Downloader::progressCallbackForThread);
curl_easy_setopt(dlhandle, CURLOPT_XFERINFODATA, &xferinfo);
galaxyDepotItem item;
while (dlQueueGalaxy.try_pop(item))
{
vDownloadInfo[tid].setStatus(DLSTATUS_STARTING);
boost::filesystem::path path = install_path + "/" + item.path;
// Check that directory exists and create it
boost::filesystem::path directory = path.parent_path();
mtx_create_directories.lock(); // Use mutex to avoid possible race conditions
if (boost::filesystem::exists(directory))
{
if (!boost::filesystem::is_directory(directory))
{
std::cerr << directory << " is not directory" << std::endl;
msgQueue.push(Message(directory.string() + " is not directory", MSGTYPE_ERROR, msg_prefix));
vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED);
delete galaxy;
mtx_create_directories.unlock();
return;
}
}
@ -3453,52 +3536,59 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
{
if (!boost::filesystem::create_directories(directory))
{
std::cerr << "Failed to create directory: " << directory << std::endl;
msgQueue.push(Message("Failed to create directory: " + directory.string(), MSGTYPE_ERROR, msg_prefix));
vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED);
delete galaxy;
mtx_create_directories.unlock();
return;
}
}
mtx_create_directories.unlock();
vDownloadInfo[tid].setFilename(path.string());
unsigned int start_chunk = 0;
if (boost::filesystem::exists(path))
{
std::cout << "File already exists: " << path.string() << std::endl;
if (conf.bVerbose)
msgQueue.push(Message("File already exists: " + path.string(), MSGTYPE_INFO, msg_prefix));
unsigned int resume_chunk = 0;
uintmax_t filesize = boost::filesystem::file_size(path);
if (filesize == items[i].totalSizeUncompressed)
if (filesize == item.totalSizeUncompressed)
{
// File is same size
if (Util::getFileHash(path.string(), RHASH_MD5) == items[i].md5)
if (Util::getFileHash(path.string(), RHASH_MD5) == item.md5)
{
std::cout << "\tOK" << std::endl;
msgQueue.push(Message(path.string() + ": OK", MSGTYPE_SUCCESS, msg_prefix));
continue;
}
else
{
std::cout << "\tMD5 mismatch" << std::endl;
msgQueue.push(Message(path.string() + ": MD5 mismatch", MSGTYPE_WARNING, msg_prefix));
if (!boost::filesystem::remove(path))
{
std::cerr << "\tFailed to delete " << path << std::endl;
msgQueue.push(Message(path.string() + ": Failed to delete", MSGTYPE_ERROR, msg_prefix));
continue;
}
}
}
else if (filesize > items[i].totalSizeUncompressed)
else if (filesize > item.totalSizeUncompressed)
{
// File is bigger than on server, delete old file and start from beginning
std::cout << "\tFile is bigger than expected. Deleting old file and starting from beginning." << std::endl;
msgQueue.push(Message(path.string() + ": File is bigger than expected. Deleting old file and starting from beginning", MSGTYPE_INFO, msg_prefix));
if (!boost::filesystem::remove(path))
{
std::cerr << "\tFailed to delete " << path << std::endl;
msgQueue.push(Message(path.string() + ": Failed to delete", MSGTYPE_ERROR, msg_prefix));
continue;
}
}
else
{
// File is smaller than on server, resume
for (unsigned int j = 0; j < items[i].chunks.size(); ++j)
for (unsigned int j = 0; j < item.chunks.size(); ++j)
{
if (items[i].chunks[j].offset_uncompressed == filesize)
if (item.chunks[j].offset_uncompressed == filesize)
{
resume_chunk = j;
break;
@ -3507,23 +3597,23 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
if (resume_chunk > 0)
{
std::cout << "\tResume from chunk " << resume_chunk << std::endl;
msgQueue.push(Message(path.string() + ": Resume from chunk " + std::to_string(resume_chunk), MSGTYPE_INFO, msg_prefix));
// Get chunk hash for previous chunk
FILE* f = fopen(path.string().c_str(), "r");
if (!f)
{
std::cerr << "\tFailed to open: " << path << std::endl;
msgQueue.push(Message(path.string() + ": Failed to open", MSGTYPE_ERROR, msg_prefix));
continue;
}
unsigned int previous_chunk = resume_chunk - 1;
uintmax_t chunk_size = items[i].chunks[previous_chunk].size_uncompressed;
uintmax_t chunk_size = item.chunks[previous_chunk].size_uncompressed;
// use fseeko to support large files on 32 bit platforms
fseeko(f, items[i].chunks[previous_chunk].offset_uncompressed, SEEK_SET);
fseeko(f, item.chunks[previous_chunk].offset_uncompressed, SEEK_SET);
unsigned char *chunk = (unsigned char *) malloc(chunk_size * sizeof(unsigned char *));
if (chunk == NULL)
{
std::cerr << "Memory error" << std::endl;
msgQueue.push(Message(path.string() + ": Memory error - Chunk " + std::to_string(resume_chunk), MSGTYPE_ERROR, msg_prefix));
fclose(f);
continue;
}
@ -3533,14 +3623,14 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
if (fread_size != chunk_size)
{
std::cerr << "Read error" << std::endl;
msgQueue.push(Message(path.string() + ": Read error - Chunk " + std::to_string(resume_chunk), MSGTYPE_ERROR, msg_prefix));
free(chunk);
continue;
}
std::string chunk_hash = Util::getChunkHash(chunk, chunk_size, RHASH_MD5);
free(chunk);
if (chunk_hash == items[i].chunks[previous_chunk].md5_uncompressed)
if (chunk_hash == item.chunks[previous_chunk].md5_uncompressed)
{
// Hash for previous chunk matches, resume at this position
start_chunk = resume_chunk;
@ -3548,20 +3638,20 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
else
{
// Hash for previous chunk is different, delete old file and start from beginning
std::cout << "\tChunk hash is different. Deleting old file and starting from beginning." << std::endl;
msgQueue.push(Message(path.string() + ": Chunk hash is different. Deleting old file and starting from beginning.", MSGTYPE_WARNING, msg_prefix));
if (!boost::filesystem::remove(path))
{
std::cerr << "\tFailed to delete " << path << std::endl;
msgQueue.push(Message(path.string() + ": Failed to delete", MSGTYPE_ERROR, msg_prefix));
continue;
}
}
}
else
{
std::cout << "\tFailed to find valid resume position. Deleting old file and starting from beginning." << std::endl;
msgQueue.push(Message(path.string() + ": Failed to find valid resume position. Deleting old file and starting from beginning.", MSGTYPE_WARNING, msg_prefix));
if (!boost::filesystem::remove(path))
{
std::cerr << "\tFailed to delete " << path << std::endl;
msgQueue.push(Message(path.string() + ": Failed to delete", MSGTYPE_ERROR, msg_prefix));
continue;
}
}
@ -3569,24 +3659,26 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
}
std::time_t timestamp = -1;
for (unsigned int j = start_chunk; j < items[i].chunks.size(); ++j)
for (unsigned int j = start_chunk; j < item.chunks.size(); ++j)
{
ChunkMemoryStruct chunk;
chunk.memory = (char *) malloc(1);
chunk.size = 0;
// Refresh Galaxy login if token is expired
if (gogGalaxy->isTokenExpired())
if (galaxy->isTokenExpired())
{
if (!gogGalaxy->refreshLogin())
if (!galaxy->refreshLogin())
{
std::cerr << "Galaxy API failed to refresh login" << std::endl;
msgQueue.push(Message("Galaxy API failed to refresh login", MSGTYPE_ERROR, msg_prefix));
vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED);
free(chunk.memory);
delete galaxy;
return;
}
}
json = gogGalaxy->getSecureLink(items[i].product_id, gogGalaxy->hashToGalaxyPath(items[i].chunks[j].md5_compressed));
Json::Value json = galaxy->getSecureLink(item.product_id, galaxy->hashToGalaxyPath(item.chunks[j].md5_compressed));
// Prefer edgecast urls
bool bPreferEdgecast = true;
@ -3614,51 +3706,51 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
while(Util::replaceString(url, "{path}", link_path));
while(Util::replaceString(url, "{token}", link_token));
curl_easy_setopt(curlhandle, CURLOPT_URL, url.c_str());
curl_easy_setopt(curlhandle, CURLOPT_NOPROGRESS, 0);
curl_easy_setopt(curlhandle, CURLOPT_WRITEFUNCTION, WriteChunkMemoryCallback);
curl_easy_setopt(curlhandle, CURLOPT_WRITEDATA, &chunk);
curl_easy_setopt(curlhandle, CURLOPT_XFERINFOFUNCTION, Downloader::progressCallback);
curl_easy_setopt(curlhandle, CURLOPT_XFERINFODATA, this);
curl_easy_setopt(curlhandle, CURLOPT_FILETIME, 1L);
std::cout << path.string() << " (chunk " << (j + 1) << "/" << items[i].chunks.size() << ")" << std::endl;
curl_easy_setopt(dlhandle, CURLOPT_URL, url.c_str());
curl_easy_setopt(dlhandle, CURLOPT_NOPROGRESS, 0);
curl_easy_setopt(dlhandle, CURLOPT_WRITEFUNCTION, WriteChunkMemoryCallback);
curl_easy_setopt(dlhandle, CURLOPT_WRITEDATA, &chunk);
curl_easy_setopt(dlhandle, CURLOPT_XFERINFOFUNCTION, Downloader::progressCallbackForThread);
curl_easy_setopt(dlhandle, CURLOPT_XFERINFODATA, &xferinfo);
curl_easy_setopt(dlhandle, CURLOPT_FILETIME, 1L);
std::string filepath_and_chunk = path.string() + " (chunk " + std::to_string(j + 1) + "/" + std::to_string(item.chunks.size()) + ")";
vDownloadInfo[tid].setFilename(filepath_and_chunk);
if (Globals::globalConfig.iWait > 0)
usleep(Globals::globalConfig.iWait); // Delay the request by specified time
this->TimeAndSize.clear();
this->timer.reset();
CURLcode result = curl_easy_perform(curlhandle);
xferinfo.offset = 0;
xferinfo.timer.reset();
xferinfo.TimeAndSize.clear();
curl_easy_setopt(curlhandle, CURLOPT_WRITEFUNCTION, Downloader::writeData);
curl_easy_setopt(curlhandle, CURLOPT_NOPROGRESS, 0);
curl_easy_setopt(curlhandle, CURLOPT_FILETIME, 0L);
CURLcode result = curl_easy_perform(dlhandle);
curl_easy_setopt(dlhandle, CURLOPT_WRITEFUNCTION, Downloader::writeData);
curl_easy_setopt(dlhandle, CURLOPT_NOPROGRESS, 0);
curl_easy_setopt(dlhandle, CURLOPT_FILETIME, 0L);
if (result != CURLE_OK)
{
std::cout << "\033[K" << curl_easy_strerror(result) << std::endl;
msgQueue.push(Message(std::string(curl_easy_strerror(result)), MSGTYPE_ERROR, msg_prefix));
if (result == CURLE_HTTP_RETURNED_ERROR)
{
long int response_code = 0;
result = curl_easy_getinfo(curlhandle, CURLINFO_RESPONSE_CODE, &response_code);
std::cout << "HTTP ERROR: ";
result = curl_easy_getinfo(dlhandle, CURLINFO_RESPONSE_CODE, &response_code);
if (result == CURLE_OK)
std::cout << response_code << " (" << url << ")" << std::endl;
msgQueue.push(Message("HTTP ERROR: " + std::to_string(response_code) + " (" + url + ")", MSGTYPE_ERROR, msg_prefix));
else
std::cout << "failed to get error code: " << curl_easy_strerror(result) << " (" << url << ")" << std::endl;
msgQueue.push(Message("HTTP ERROR: failed to get error code: " + std::string(curl_easy_strerror(result)) + " (" + url + ")", MSGTYPE_ERROR, msg_prefix));
}
}
else
{
// Get timestamp for downloaded file
long filetime = -1;
result = curl_easy_getinfo(curlhandle, CURLINFO_FILETIME, &filetime);
result = curl_easy_getinfo(dlhandle, CURLINFO_FILETIME, &filetime);
if (result == CURLE_OK && filetime >= 0)
timestamp = (std::time_t)filetime;
}
std::cout << std::endl;
std::ofstream ofs(path.string(), std::ofstream::out | std::ofstream::binary | std::ofstream::app);
if (ofs)
@ -3677,13 +3769,15 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde
// Set timestamp for downloaded file to same value as file on server
if (boost::filesystem::exists(path) && timestamp >= 0)
boost::filesystem::last_write_time(path, timestamp);
msgQueue.push(Message("Download complete: " + path.string(), MSGTYPE_SUCCESS, msg_prefix));
}
std::cout << "Checking for orphaned files" << std::endl;
std::vector<std::string> orphans = this->galaxyGetOrphanedFiles(items, install_path);
std::cout << "\t" << orphans.size() << " orphaned files" << std::endl;
for (unsigned int i = 0; i < orphans.size(); ++i)
std::cout << "\t" << orphans[i] << std::endl;
vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED);
delete galaxy;
curl_easy_cleanup(dlhandle);
return;
}
void Downloader::galaxyShowBuilds(const std::string& product_id, int build_index)