From e9ac6d072f495b4b2ec202b22fed0b9f90892e99 Mon Sep 17 00:00:00 2001 From: Sude Date: Mon, 12 Feb 2018 18:46:17 +0200 Subject: [PATCH] Galaxy: Make --galaxy-install multithreaded --- include/downloader.h | 3 +- src/downloader.cpp | 222 ++++++++++++++++++++++++++++++------------- 2 files changed, 160 insertions(+), 65 deletions(-) diff --git a/include/downloader.h b/include/downloader.h index f760bec..c18e8bf 100644 --- a/include/downloader.h +++ b/include/downloader.h @@ -114,7 +114,7 @@ class Downloader void saveChangelog(const std::string& changelog, const std::string& filepath); static void processDownloadQueue(Config conf, const unsigned int& tid); static int progressCallbackForThread(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow); - void printProgress(); + template void printProgress(const ThreadSafeQueue& download_queue); static void getGameDetailsThread(Config config, const unsigned int& tid); static int progressCallback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow); @@ -123,6 +123,7 @@ class Downloader static size_t readData(void *ptr, size_t size, size_t nmemb, FILE *stream); std::vector galaxyGetOrphanedFiles(const std::vector& items, const std::string& install_path); + static void processGalaxyDownloadQueue(const std::string& install_path, Config conf, const unsigned int& tid); Website *gogWebsite; API *gogAPI; diff --git a/src/downloader.cpp b/src/downloader.cpp index cd8f5eb..fee9208 100644 --- a/src/downloader.cpp +++ b/src/downloader.cpp @@ -43,6 +43,7 @@ ThreadSafeQueue msgQueue; ThreadSafeQueue createXMLQueue; ThreadSafeQueue gameItemQueue; ThreadSafeQueue gameDetailsQueue; +ThreadSafeQueue dlQueueGalaxy; std::mutex mtx_create_directories; // Mutex for creating directories in Downloader::processDownloadQueue static curl_off_t WriteChunkMemoryCallback(void *contents, curl_off_t size, curl_off_t nmemb, void *userp) @@ -133,7 +134,7 @@ Downloader::Downloader() Globals::galaxyConf.setJSON(json); } catch (const Json::Exception& exc) { std::cerr << "Failed to parse " << Globals::galaxyConf.getFilepath() << std::endl; - std::cerr << exc.what() << std::endl; + std::cerr << exc.what() << std::endl; } if (ifs) @@ -794,7 +795,7 @@ void Downloader::repair() std::cerr << "Could not parse JSON response, skipping file" << std::endl; continue; } - + if (!downlinkJson.isMember("downlink")) { std::cerr << "Invalid JSON response, skipping file" << std::endl; @@ -960,7 +961,7 @@ void Downloader::download() vThreads.push_back(std::thread(Downloader::processDownloadQueue, Globals::globalConfig, i)); } - this->printProgress(); + this->printProgress(dlQueue); // Join threads for (unsigned int i = 0; i < vThreads.size(); ++i) @@ -2240,7 +2241,7 @@ int Downloader::loadGameDetailsCache() } catch (const Json::Exception& exc) { std::cout << "Failed to parse cache" << std::endl; std::cout << exc.what() << std::endl; - return 2; + return 2; } if (root.isMember("date")) @@ -3009,7 +3010,7 @@ int Downloader::progressCallbackForThread(void *clientp, curl_off_t dltotal, cur return 0; } -void Downloader::printProgress() +template void Downloader::printProgress(const ThreadSafeQueue& download_queue) { // Print progress information until all threads have finished their tasks ProgressBar bar(Globals::globalConfig.bUnicode, Globals::globalConfig.bColor); @@ -3138,7 +3139,7 @@ void Downloader::printProgress() } ss << "Total: " << std::setprecision(2) << std::fixed << total_rate << rate_unit << " | "; } - ss << "Remaining: " << dlQueue.size(); + ss << "Remaining: " << download_queue.size(); vProgressText.push_back(ss.str()); } @@ -3428,6 +3429,7 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde std::cout << "\tmd5: " << items[i].md5 << std::endl; } totalSize += items[i].totalSizeUncompressed; + dlQueueGalaxy.push(items[i]); } double totalSizeMB = static_cast(totalSize)/1024/1024; @@ -3435,17 +3437,98 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde std::cout << "Files: " << items.size() << std::endl; std::cout << "Total size installed: " << totalSizeMB << " MB" << std::endl; - for (unsigned int i = 0; i < items.size(); ++i) + // Limit thread count to number of items in download queue + unsigned int iThreads = std::min(Globals::globalConfig.iThreads, static_cast(dlQueueGalaxy.size())); + + // Create download threads + std::vector vThreads; + for (unsigned int i = 0; i < iThreads; ++i) { - boost::filesystem::path path = install_path + "/" + items[i].path; + DownloadInfo dlInfo; + dlInfo.setStatus(DLSTATUS_NOTSTARTED); + vDownloadInfo.push_back(dlInfo); + vThreads.push_back(std::thread(Downloader::processGalaxyDownloadQueue, install_path, Globals::globalConfig, i)); + } + + this->printProgress(dlQueueGalaxy); + + // Join threads + for (unsigned int i = 0; i < vThreads.size(); ++i) + vThreads[i].join(); + + vThreads.clear(); + vDownloadInfo.clear(); + + std::cout << "Checking for orphaned files" << std::endl; + std::vector orphans = this->galaxyGetOrphanedFiles(items, install_path); + std::cout << "\t" << orphans.size() << " orphaned files" << std::endl; + for (unsigned int i = 0; i < orphans.size(); ++i) + std::cout << "\t" << orphans[i] << std::endl; +} + +void Downloader::processGalaxyDownloadQueue(const std::string& install_path, Config conf, const unsigned int& tid) +{ + std::string msg_prefix = "[Thread #" + std::to_string(tid) + "]"; + + galaxyAPI* galaxy = new galaxyAPI(Globals::globalConfig.curlConf); + if (!galaxy->init()) + { + if (!galaxy->refreshLogin()) + { + delete galaxy; + msgQueue.push(Message("Galaxy API failed to refresh login", MSGTYPE_ERROR, msg_prefix)); + vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED); + return; + } + } + + CURL* dlhandle = curl_easy_init(); + curl_easy_setopt(dlhandle, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(dlhandle, CURLOPT_USERAGENT, conf.curlConf.sUserAgent.c_str()); + curl_easy_setopt(dlhandle, CURLOPT_NOPROGRESS, 0); + curl_easy_setopt(dlhandle, CURLOPT_NOSIGNAL, 1); + + curl_easy_setopt(dlhandle, CURLOPT_CONNECTTIMEOUT, conf.curlConf.iTimeout); + curl_easy_setopt(dlhandle, CURLOPT_FAILONERROR, true); + curl_easy_setopt(dlhandle, CURLOPT_SSL_VERIFYPEER, conf.curlConf.bVerifyPeer); + curl_easy_setopt(dlhandle, CURLOPT_VERBOSE, conf.curlConf.bVerbose); + curl_easy_setopt(dlhandle, CURLOPT_WRITEFUNCTION, Downloader::writeData); + curl_easy_setopt(dlhandle, CURLOPT_READFUNCTION, Downloader::readData); + curl_easy_setopt(dlhandle, CURLOPT_MAX_RECV_SPEED_LARGE, conf.curlConf.iDownloadRate); + curl_easy_setopt(dlhandle, CURLOPT_FILETIME, 1L); + + // Assume that we have connection error and abort transfer with CURLE_OPERATION_TIMEDOUT if download speed is less than 200 B/s for 30 seconds + curl_easy_setopt(dlhandle, CURLOPT_LOW_SPEED_TIME, conf.curlConf.iLowSpeedTimeout); + curl_easy_setopt(dlhandle, CURLOPT_LOW_SPEED_LIMIT, conf.curlConf.iLowSpeedTimeoutRate); + + if (!conf.curlConf.sCACertPath.empty()) + curl_easy_setopt(dlhandle, CURLOPT_CAINFO, conf.curlConf.sCACertPath.c_str()); + + xferInfo xferinfo; + xferinfo.tid = tid; + xferinfo.curlhandle = dlhandle; + + curl_easy_setopt(dlhandle, CURLOPT_XFERINFOFUNCTION, Downloader::progressCallbackForThread); + curl_easy_setopt(dlhandle, CURLOPT_XFERINFODATA, &xferinfo); + + galaxyDepotItem item; + while (dlQueueGalaxy.try_pop(item)) + { + vDownloadInfo[tid].setStatus(DLSTATUS_STARTING); + + boost::filesystem::path path = install_path + "/" + item.path; // Check that directory exists and create it boost::filesystem::path directory = path.parent_path(); + mtx_create_directories.lock(); // Use mutex to avoid possible race conditions if (boost::filesystem::exists(directory)) { if (!boost::filesystem::is_directory(directory)) { - std::cerr << directory << " is not directory" << std::endl; + msgQueue.push(Message(directory.string() + " is not directory", MSGTYPE_ERROR, msg_prefix)); + vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED); + delete galaxy; + mtx_create_directories.unlock(); return; } } @@ -3453,52 +3536,59 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde { if (!boost::filesystem::create_directories(directory)) { - std::cerr << "Failed to create directory: " << directory << std::endl; + msgQueue.push(Message("Failed to create directory: " + directory.string(), MSGTYPE_ERROR, msg_prefix)); + vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED); + delete galaxy; + mtx_create_directories.unlock(); return; } } + mtx_create_directories.unlock(); + + vDownloadInfo[tid].setFilename(path.string()); unsigned int start_chunk = 0; if (boost::filesystem::exists(path)) { - std::cout << "File already exists: " << path.string() << std::endl; + if (conf.bVerbose) + msgQueue.push(Message("File already exists: " + path.string(), MSGTYPE_INFO, msg_prefix)); unsigned int resume_chunk = 0; uintmax_t filesize = boost::filesystem::file_size(path); - if (filesize == items[i].totalSizeUncompressed) + if (filesize == item.totalSizeUncompressed) { // File is same size - if (Util::getFileHash(path.string(), RHASH_MD5) == items[i].md5) + if (Util::getFileHash(path.string(), RHASH_MD5) == item.md5) { - std::cout << "\tOK" << std::endl; + msgQueue.push(Message(path.string() + ": OK", MSGTYPE_SUCCESS, msg_prefix)); continue; } else { - std::cout << "\tMD5 mismatch" << std::endl; + msgQueue.push(Message(path.string() + ": MD5 mismatch", MSGTYPE_WARNING, msg_prefix)); if (!boost::filesystem::remove(path)) { - std::cerr << "\tFailed to delete " << path << std::endl; + msgQueue.push(Message(path.string() + ": Failed to delete", MSGTYPE_ERROR, msg_prefix)); continue; } } } - else if (filesize > items[i].totalSizeUncompressed) + else if (filesize > item.totalSizeUncompressed) { // File is bigger than on server, delete old file and start from beginning - std::cout << "\tFile is bigger than expected. Deleting old file and starting from beginning." << std::endl; + msgQueue.push(Message(path.string() + ": File is bigger than expected. Deleting old file and starting from beginning", MSGTYPE_INFO, msg_prefix)); if (!boost::filesystem::remove(path)) { - std::cerr << "\tFailed to delete " << path << std::endl; + msgQueue.push(Message(path.string() + ": Failed to delete", MSGTYPE_ERROR, msg_prefix)); continue; } } else { // File is smaller than on server, resume - for (unsigned int j = 0; j < items[i].chunks.size(); ++j) + for (unsigned int j = 0; j < item.chunks.size(); ++j) { - if (items[i].chunks[j].offset_uncompressed == filesize) + if (item.chunks[j].offset_uncompressed == filesize) { resume_chunk = j; break; @@ -3507,23 +3597,23 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde if (resume_chunk > 0) { - std::cout << "\tResume from chunk " << resume_chunk << std::endl; + msgQueue.push(Message(path.string() + ": Resume from chunk " + std::to_string(resume_chunk), MSGTYPE_INFO, msg_prefix)); // Get chunk hash for previous chunk FILE* f = fopen(path.string().c_str(), "r"); if (!f) { - std::cerr << "\tFailed to open: " << path << std::endl; + msgQueue.push(Message(path.string() + ": Failed to open", MSGTYPE_ERROR, msg_prefix)); continue; } unsigned int previous_chunk = resume_chunk - 1; - uintmax_t chunk_size = items[i].chunks[previous_chunk].size_uncompressed; + uintmax_t chunk_size = item.chunks[previous_chunk].size_uncompressed; // use fseeko to support large files on 32 bit platforms - fseeko(f, items[i].chunks[previous_chunk].offset_uncompressed, SEEK_SET); + fseeko(f, item.chunks[previous_chunk].offset_uncompressed, SEEK_SET); unsigned char *chunk = (unsigned char *) malloc(chunk_size * sizeof(unsigned char *)); if (chunk == NULL) { - std::cerr << "Memory error" << std::endl; + msgQueue.push(Message(path.string() + ": Memory error - Chunk " + std::to_string(resume_chunk), MSGTYPE_ERROR, msg_prefix)); fclose(f); continue; } @@ -3533,14 +3623,14 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde if (fread_size != chunk_size) { - std::cerr << "Read error" << std::endl; + msgQueue.push(Message(path.string() + ": Read error - Chunk " + std::to_string(resume_chunk), MSGTYPE_ERROR, msg_prefix)); free(chunk); continue; } std::string chunk_hash = Util::getChunkHash(chunk, chunk_size, RHASH_MD5); free(chunk); - if (chunk_hash == items[i].chunks[previous_chunk].md5_uncompressed) + if (chunk_hash == item.chunks[previous_chunk].md5_uncompressed) { // Hash for previous chunk matches, resume at this position start_chunk = resume_chunk; @@ -3548,20 +3638,20 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde else { // Hash for previous chunk is different, delete old file and start from beginning - std::cout << "\tChunk hash is different. Deleting old file and starting from beginning." << std::endl; + msgQueue.push(Message(path.string() + ": Chunk hash is different. Deleting old file and starting from beginning.", MSGTYPE_WARNING, msg_prefix)); if (!boost::filesystem::remove(path)) { - std::cerr << "\tFailed to delete " << path << std::endl; + msgQueue.push(Message(path.string() + ": Failed to delete", MSGTYPE_ERROR, msg_prefix)); continue; } } } else { - std::cout << "\tFailed to find valid resume position. Deleting old file and starting from beginning." << std::endl; + msgQueue.push(Message(path.string() + ": Failed to find valid resume position. Deleting old file and starting from beginning.", MSGTYPE_WARNING, msg_prefix)); if (!boost::filesystem::remove(path)) { - std::cerr << "\tFailed to delete " << path << std::endl; + msgQueue.push(Message(path.string() + ": Failed to delete", MSGTYPE_ERROR, msg_prefix)); continue; } } @@ -3569,24 +3659,26 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde } std::time_t timestamp = -1; - for (unsigned int j = start_chunk; j < items[i].chunks.size(); ++j) + for (unsigned int j = start_chunk; j < item.chunks.size(); ++j) { ChunkMemoryStruct chunk; chunk.memory = (char *) malloc(1); chunk.size = 0; // Refresh Galaxy login if token is expired - if (gogGalaxy->isTokenExpired()) + if (galaxy->isTokenExpired()) { - if (!gogGalaxy->refreshLogin()) + if (!galaxy->refreshLogin()) { - std::cerr << "Galaxy API failed to refresh login" << std::endl; + msgQueue.push(Message("Galaxy API failed to refresh login", MSGTYPE_ERROR, msg_prefix)); + vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED); free(chunk.memory); + delete galaxy; return; } } - json = gogGalaxy->getSecureLink(items[i].product_id, gogGalaxy->hashToGalaxyPath(items[i].chunks[j].md5_compressed)); + Json::Value json = galaxy->getSecureLink(item.product_id, galaxy->hashToGalaxyPath(item.chunks[j].md5_compressed)); // Prefer edgecast urls bool bPreferEdgecast = true; @@ -3614,51 +3706,51 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde while(Util::replaceString(url, "{path}", link_path)); while(Util::replaceString(url, "{token}", link_token)); - curl_easy_setopt(curlhandle, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curlhandle, CURLOPT_NOPROGRESS, 0); - curl_easy_setopt(curlhandle, CURLOPT_WRITEFUNCTION, WriteChunkMemoryCallback); - curl_easy_setopt(curlhandle, CURLOPT_WRITEDATA, &chunk); - curl_easy_setopt(curlhandle, CURLOPT_XFERINFOFUNCTION, Downloader::progressCallback); - curl_easy_setopt(curlhandle, CURLOPT_XFERINFODATA, this); - curl_easy_setopt(curlhandle, CURLOPT_FILETIME, 1L); - - std::cout << path.string() << " (chunk " << (j + 1) << "/" << items[i].chunks.size() << ")" << std::endl; + curl_easy_setopt(dlhandle, CURLOPT_URL, url.c_str()); + curl_easy_setopt(dlhandle, CURLOPT_NOPROGRESS, 0); + curl_easy_setopt(dlhandle, CURLOPT_WRITEFUNCTION, WriteChunkMemoryCallback); + curl_easy_setopt(dlhandle, CURLOPT_WRITEDATA, &chunk); + curl_easy_setopt(dlhandle, CURLOPT_XFERINFOFUNCTION, Downloader::progressCallbackForThread); + curl_easy_setopt(dlhandle, CURLOPT_XFERINFODATA, &xferinfo); + curl_easy_setopt(dlhandle, CURLOPT_FILETIME, 1L); + std::string filepath_and_chunk = path.string() + " (chunk " + std::to_string(j + 1) + "/" + std::to_string(item.chunks.size()) + ")"; + vDownloadInfo[tid].setFilename(filepath_and_chunk); if (Globals::globalConfig.iWait > 0) usleep(Globals::globalConfig.iWait); // Delay the request by specified time - this->TimeAndSize.clear(); - this->timer.reset(); - CURLcode result = curl_easy_perform(curlhandle); + xferinfo.offset = 0; + xferinfo.timer.reset(); + xferinfo.TimeAndSize.clear(); - curl_easy_setopt(curlhandle, CURLOPT_WRITEFUNCTION, Downloader::writeData); - curl_easy_setopt(curlhandle, CURLOPT_NOPROGRESS, 0); - curl_easy_setopt(curlhandle, CURLOPT_FILETIME, 0L); + CURLcode result = curl_easy_perform(dlhandle); + + curl_easy_setopt(dlhandle, CURLOPT_WRITEFUNCTION, Downloader::writeData); + curl_easy_setopt(dlhandle, CURLOPT_NOPROGRESS, 0); + curl_easy_setopt(dlhandle, CURLOPT_FILETIME, 0L); if (result != CURLE_OK) { - std::cout << "\033[K" << curl_easy_strerror(result) << std::endl; + msgQueue.push(Message(std::string(curl_easy_strerror(result)), MSGTYPE_ERROR, msg_prefix)); if (result == CURLE_HTTP_RETURNED_ERROR) { long int response_code = 0; - result = curl_easy_getinfo(curlhandle, CURLINFO_RESPONSE_CODE, &response_code); - std::cout << "HTTP ERROR: "; + result = curl_easy_getinfo(dlhandle, CURLINFO_RESPONSE_CODE, &response_code); if (result == CURLE_OK) - std::cout << response_code << " (" << url << ")" << std::endl; + msgQueue.push(Message("HTTP ERROR: " + std::to_string(response_code) + " (" + url + ")", MSGTYPE_ERROR, msg_prefix)); else - std::cout << "failed to get error code: " << curl_easy_strerror(result) << " (" << url << ")" << std::endl; + msgQueue.push(Message("HTTP ERROR: failed to get error code: " + std::string(curl_easy_strerror(result)) + " (" + url + ")", MSGTYPE_ERROR, msg_prefix)); } } else { // Get timestamp for downloaded file long filetime = -1; - result = curl_easy_getinfo(curlhandle, CURLINFO_FILETIME, &filetime); + result = curl_easy_getinfo(dlhandle, CURLINFO_FILETIME, &filetime); if (result == CURLE_OK && filetime >= 0) timestamp = (std::time_t)filetime; } - std::cout << std::endl; std::ofstream ofs(path.string(), std::ofstream::out | std::ofstream::binary | std::ofstream::app); if (ofs) @@ -3677,13 +3769,15 @@ void Downloader::galaxyInstallGame(const std::string& product_id, int build_inde // Set timestamp for downloaded file to same value as file on server if (boost::filesystem::exists(path) && timestamp >= 0) boost::filesystem::last_write_time(path, timestamp); + + msgQueue.push(Message("Download complete: " + path.string(), MSGTYPE_SUCCESS, msg_prefix)); } - std::cout << "Checking for orphaned files" << std::endl; - std::vector orphans = this->galaxyGetOrphanedFiles(items, install_path); - std::cout << "\t" << orphans.size() << " orphaned files" << std::endl; - for (unsigned int i = 0; i < orphans.size(); ++i) - std::cout << "\t" << orphans[i] << std::endl; + vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED); + delete galaxy; + curl_easy_cleanup(dlhandle); + + return; } void Downloader::galaxyShowBuilds(const std::string& product_id, int build_index)