From 442fad04db070b2786f78cbb1e115b7ff8a494b6 Mon Sep 17 00:00:00 2001 From: Sude Date: Fri, 20 May 2016 20:33:07 +0300 Subject: [PATCH] Add support for parallel downloads New option "--threads" can be used to set how many parallel downloads to run. The default is 4 threads. Changes to behavior: - Serials and changelogs are saved and covers downloaded for all games that match the "--game" filter before any other files are downloaded. - Automatic XML creation is run after all files are downloaded. Previously xml data was automatically created right after download finished. - The "--limit-rate" option sets rate limit for thread not global rate limit. --- CMakeLists.txt | 24 ++ cmake/FindLibcrypto.cmake | 27 ++ include/config.h | 1 + include/downloader.h | 13 + include/downloadinfo.h | 94 +++++ include/ssl_thread_setup.h | 60 +++ include/threadsafequeue.h | 84 ++++ include/util.h | 1 + main.cpp | 7 + src/downloader.cpp | 798 ++++++++++++++++++++++++------------- src/util.cpp | 14 + 11 files changed, 855 insertions(+), 268 deletions(-) create mode 100644 cmake/FindLibcrypto.cmake create mode 100644 include/downloadinfo.h create mode 100644 include/ssl_thread_setup.h create mode 100644 include/threadsafequeue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 2f05a21..0b7f0aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,10 @@ project (lgogdownloader LANGUAGES CXX VERSION 2.28) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DDEBUG=1") +set(LINK_LIBCRYPTO 0) +find_program(READELF readelf DOC "Location of the readelf program") +find_program(GREP grep DOC "Location of the grep program") find_package(Boost REQUIRED system @@ -13,11 +16,25 @@ find_package(Boost date_time ) find_package(CURL 7.32.0 REQUIRED) +if(CURL_FOUND) + execute_process( + COMMAND ${READELF} -d ${CURL_LIBRARIES} + COMMAND ${GREP} -q "libssl\\|libcrypto" + RESULT_VARIABLE READELF_RESULT_VAR + ) + if(READELF_RESULT_VAR EQUAL 0) + add_definitions(-DSSL_THREAD_SETUP_OPENSSL=1) + find_package(Libcrypto REQUIRED) + set(LINK_LIBCRYPTO 1) + endif(READELF_RESULT_VAR EQUAL 0) +endif(CURL_FOUND) + find_package(OAuth REQUIRED) find_package(Jsoncpp REQUIRED) find_package(Htmlcxx REQUIRED) find_package(Tinyxml2 REQUIRED) find_package(Rhash REQUIRED) +find_package(Threads REQUIRED) file(GLOB SRC_FILES main.cpp @@ -95,8 +112,15 @@ target_link_libraries(${PROJECT_NAME} PRIVATE ${Htmlcxx_LIBRARIES} PRIVATE ${Tinyxml2_LIBRARIES} PRIVATE ${Rhash_LIBRARIES} + PRIVATE ${CMAKE_THREAD_LIBS_INIT} ) +if(LINK_LIBCRYPTO EQUAL 1) + target_link_libraries(${PROJECT_NAME} + PRIVATE ${Libcrypto_LIBRARIES} + ) +endif(LINK_LIBCRYPTO EQUAL 1) + if(MSVC) # Force to always compile with W4 if(CMAKE_CXX_FLAGS MATCHES "/W[0-4]") diff --git a/cmake/FindLibcrypto.cmake b/cmake/FindLibcrypto.cmake new file mode 100644 index 0000000..3aa00b3 --- /dev/null +++ b/cmake/FindLibcrypto.cmake @@ -0,0 +1,27 @@ +# - Try to find libcrypto +# +# Once done this will define +# Libcrypto_FOUND - System has libcrypto +# Libcrypto_INCLUDE_DIRS - The libcrypto include directories +# Libcrypto_LIBRARIES - The libraries needed to use libcrypto + +find_package(PkgConfig) +pkg_check_modules(PC_LIBCRYPTO REQUIRED libcrypto) + +find_path(LIBCRYPTO_INCLUDE_DIR openssl/crypto.h + HINTS ${PC_LIBCRYPTO_INCLUDEDIR} + ${PC_LIBCRYPTO_INCLUDE_DIRS} + ) + +find_library(LIBCRYPTO_LIBRARY NAMES crypto + HINTS ${PC_LIBCRYPTO_LIBDIR} + ${PC_LIBCRYPTO_LIBRARY_DIRS} + ) + +mark_as_advanced(LIBCRYPTO_INCLUDE_DIR LIBCRYPTO_LIBRARY) + +if(PC_LIBCRYPTO_FOUND) + set(Libcrypto_FOUND ON) + set(Libcrypto_INCLUDE_DIRS ${LIBCRYPTO_INCLUDE_DIR}) + set(Libcrypto_LIBRARIES ${LIBCRYPTO_LIBRARY}) +endif(PC_LIBCRYPTO_FOUND) diff --git a/include/config.h b/include/config.h index 75cc478..4353f07 100644 --- a/include/config.h +++ b/include/config.h @@ -82,6 +82,7 @@ class Config unsigned int iInstallerPlatform; unsigned int iInstallerLanguage; unsigned int iInclude; + unsigned int iThreads; int iRetries; int iWait; int iCacheValid; diff --git a/include/downloader.h b/include/downloader.h index a85a1f8..c9b5ee2 100644 --- a/include/downloader.h +++ b/include/downloader.h @@ -25,6 +25,7 @@ #include "api.h" #include "progressbar.h" #include "website.h" +#include "threadsafequeue.h" #include #include #include @@ -48,6 +49,15 @@ class Timer struct timeval last_update; }; +struct xferInfo +{ + unsigned int tid; + CURL* curlhandle; + Timer timer; + std::deque< std::pair > TimeAndSize; + curl_off_t offset; +}; + class Downloader { public: @@ -89,6 +99,9 @@ class Downloader void saveSerials(const std::string& serials, const std::string& filepath); std::string getChangelogFromJSON(const Json::Value& json); void saveChangelog(const std::string& changelog, const std::string& filepath); + static void processDownloadQueue(Config conf, const unsigned int& tid); + static int progressCallbackForThread(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow); + void printProgress(); static int progressCallback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow); static size_t writeMemoryCallback(char *ptr, size_t size, size_t nmemb, void *userp); diff --git a/include/downloadinfo.h b/include/downloadinfo.h new file mode 100644 index 0000000..7acbfac --- /dev/null +++ b/include/downloadinfo.h @@ -0,0 +1,94 @@ +/* This program is free software. It comes without any warranty, to + * the extent permitted by applicable law. You can redistribute it + * and/or modify it under the terms of the Do What The Fuck You Want + * To Public License, Version 2, as published by Sam Hocevar. See + * http://www.wtfpl.net/ for more details. */ + +#ifndef DOWNLOADINFO_H +#define DOWNLOADINFO_H + +#include +#include + +const unsigned int DLSTATUS_NOTSTARTED = 0; +const unsigned int DLSTATUS_STARTING = 1 << 0; +const unsigned int DLSTATUS_RUNNING = 1 << 1; +const unsigned int DLSTATUS_FINISHED = 1 << 2; + +struct progressInfo +{ + curl_off_t dlnow; + curl_off_t dltotal; + double rate; +}; + +class DownloadInfo +{ + public: + void setFilename(const std::string& filename_) + { + std::unique_lock lock(m); + filename = filename_; + } + + std::string getFilename() + { + std::unique_lock lock(m); + return filename; + } + + void setStatus(const unsigned int& status_) + { + std::unique_lock lock(m); + status = status_; + } + + unsigned int getStatus() + { + std::unique_lock lock(m); + return status; + } + + void setProgressInfo(const progressInfo& info) + { + std::unique_lock lock(m); + progress_info = info; + } + + progressInfo getProgressInfo() + { + std::unique_lock lock(m); + return progress_info; + } + + DownloadInfo()=default; + + DownloadInfo(const DownloadInfo& other) + { + std::lock_guard guard(other.m); + filename = other.filename; + status = other.status; + progress_info = other.progress_info; + } + + DownloadInfo& operator= (DownloadInfo& other) + { + if(&other == this) + return *this; + + std::unique_lock lock1(m, std::defer_lock); + std::unique_lock lock2(other.m, std::defer_lock); + std::lock(lock1, lock2); + filename = other.filename; + status = other.status; + progress_info = other.progress_info; + return *this; + } + private: + std::string filename; + unsigned int status; + progressInfo progress_info; + mutable std::mutex m; +}; + +#endif // DOWNLOADINFO_H diff --git a/include/ssl_thread_setup.h b/include/ssl_thread_setup.h new file mode 100644 index 0000000..6a5aee4 --- /dev/null +++ b/include/ssl_thread_setup.h @@ -0,0 +1,60 @@ +/* This program is free software. It comes without any warranty, to + * the extent permitted by applicable law. You can redistribute it + * and/or modify it under the terms of the Do What The Fuck You Want + * To Public License, Version 2, as published by Sam Hocevar. See + * http://www.wtfpl.net/ for more details. */ + +#ifndef SSL_THREAD_SETUP_H +#define SSL_THREAD_SETUP_H + +#include +#include + +#if SSL_THREAD_SETUP_OPENSSL == 1 + #include + + static std::mutex* ssl_mutex_array; + + void thread_locking_callback(int mode, int n, const char* file, int line) + { + if(mode & CRYPTO_LOCK) + ssl_mutex_array[n].lock(); + else + ssl_mutex_array[n].unlock(); + } + + unsigned long thread_id_callback() + { + return (unsigned long)std::hash() (std::this_thread::get_id()); + } + + int ssl_thread_setup() + { + ssl_mutex_array = new std::mutex[CRYPTO_num_locks()]; + if(!ssl_mutex_array) + return 0; + else + { + CRYPTO_set_id_callback(thread_id_callback); + CRYPTO_set_locking_callback(thread_locking_callback); + } + return 1; + } + + int ssl_thread_cleanup() + { + if(!ssl_mutex_array) + return 0; + + CRYPTO_set_id_callback(NULL); + CRYPTO_set_locking_callback(NULL); + delete[] ssl_mutex_array; + ssl_mutex_array = NULL; + return 1; + } +#else + #define ssl_thread_setup() + #define ssl_thread_cleanup() +#endif + +#endif // SSL_THREAD_SETUP_H diff --git a/include/threadsafequeue.h b/include/threadsafequeue.h new file mode 100644 index 0000000..a456daa --- /dev/null +++ b/include/threadsafequeue.h @@ -0,0 +1,84 @@ +/* This program is free software. It comes without any warranty, to + * the extent permitted by applicable law. You can redistribute it + * and/or modify it under the terms of the Do What The Fuck You Want + * To Public License, Version 2, as published by Sam Hocevar. See + * http://www.wtfpl.net/ for more details. */ + +#ifndef THREADSAFEQUEUE_H +#define THREADSAFEQUEUE_H + +#include +#include +#include + +template +class ThreadSafeQueue +{ + public: + void push(const T& item) + { + std::unique_lock lock(m); + q.push(item); + lock.unlock(); + cvar.notify_one(); + } + + bool empty() const + { + std::unique_lock lock(m); + return q.empty(); + } + + typename std::queue::size_type size() const + { + std::unique_lock lock(m); + return q.size(); + } + + bool try_pop(T& item) + { + std::unique_lock lock(m); + if(q.empty()) + return false; + + item = q.front(); + q.pop(); + return true; + } + + void wait_and_pop(T& item) + { + std::unique_lock lock(m); + while(q.empty()) + cvar.wait(lock); + + item = q.front(); + q.pop(); + } + + ThreadSafeQueue() = default; + + ThreadSafeQueue(const ThreadSafeQueue& other) + { + std::lock_guard guard(other.m); + q = other.q; + } + + ThreadSafeQueue& operator= (ThreadSafeQueue& other) + { + if(&other == this) + return *this; + + std::unique_lock lock1(m, std::defer_lock); + std::unique_lock lock2(other.m, std::defer_lock); + std::lock(lock1, lock2); + q = other.q; + return *this; + } + private: + std::queue q; + mutable std::mutex m; + std::condition_variable cvar; +}; + +#endif // THREADSAFEQUEUE_H diff --git a/include/util.h b/include/util.h index 0317b1c..f8e4176 100644 --- a/include/util.h +++ b/include/util.h @@ -89,6 +89,7 @@ namespace Util std::string getOptionNameString(const unsigned int& value, const std::vector& options); void parseOptionString(const std::string &option_string, std::vector &priority, unsigned int &type, const std::vector& options); std::string getLocalFileHash(const std::string& xml_dir, const std::string& filepath, const std::string& gamename = std::string()); + void shortenStringToTerminalWidth(std::string& str); } #endif // UTIL_H diff --git a/main.cpp b/main.cpp index 3307dcc..2aa835c 100644 --- a/main.cpp +++ b/main.cpp @@ -177,6 +177,7 @@ int main(int argc, char *argv[]) ("exclude", bpo::value(&sExcludeOptions)->default_value("covers"), ("Select what not to download/list/repair\n" + include_options_text).c_str()) ("automatic-xml-creation", bpo::value(&config.bAutomaticXMLCreation)->zero_tokens()->default_value(false), "Automatically create XML data after download has completed") ("save-changelogs", bpo::value(&config.bSaveChangelogs)->zero_tokens()->default_value(false), "Save changelogs when downloading") + ("threads", bpo::value(&config.iThreads)->default_value(4), "Number of download threads") ; // Options read from config file options_cfg_only.add_options() @@ -311,6 +312,12 @@ int main(int argc, char *argv[]) if (config.iWait > 0) config.iWait *= 1000; + if (config.iThreads < 1) + { + config.iThreads = 1; + set_vm_value(vm, "threads", config.iThreads); + } + config.bVerifyPeer = !bInsecure; config.bColor = !bNoColor; config.bUnicode = !bNoUnicode; diff --git a/src/downloader.cpp b/src/downloader.cpp index 3df51a1..45f53a1 100644 --- a/src/downloader.cpp +++ b/src/downloader.cpp @@ -7,6 +7,8 @@ #include "downloader.h" #include "util.h" #include "globalconstants.h" +#include "ssl_thread_setup.h" +#include "downloadinfo.h" #include #include @@ -26,8 +28,14 @@ namespace bptime = boost::posix_time; +std::vector vDownloadInfo; +ThreadSafeQueue dlQueue; +ThreadSafeQueue msgQueue; +ThreadSafeQueue createXMLQueue; + Downloader::Downloader(Config &conf) { + ssl_thread_setup(); this->config = conf; if (config.bLoginHTTP && boost::filesystem::exists(config.sCookiePath)) if (!boost::filesystem::remove(config.sCookiePath)) @@ -44,6 +52,7 @@ Downloader::~Downloader() delete gogWebsite; curl_easy_cleanup(curlhandle); curl_global_cleanup(); + ssl_thread_cleanup(); // Make sure that cookie file is only readable/writable by owner Util::setFilePermissions(config.sCookiePath, boost::filesystem::owner_read | boost::filesystem::owner_write); } @@ -931,192 +940,41 @@ void Downloader::download() if (!games[i].installers.empty()) { // Take path from installer path because for some games the base directory for installer/extra path is not "gamename" - std::string filepath = games[i].installers[0].getFilepath(); + boost::filesystem::path filepath = boost::filesystem::absolute(games[i].installers[0].getFilepath(), boost::filesystem::current_path()); // Get base directory from filepath - boost::match_results what; - boost::regex expression("(.*)/.*"); - boost::regex_match(filepath, what, expression); - std::string directory = what[1]; + std::string directory = filepath.parent_path().string(); this->downloadCovers(games[i].gamename, directory, coverXML); } } - // Download installers + if (config.bInstallers) { for (unsigned int j = 0; j < games[i].installers.size(); ++j) { - // Not updated, skip to next installer - if (config.bUpdateCheck && !games[i].installers[j].updated) - continue; - - std::string filepath = games[i].installers[j].getFilepath(); - if (config.blacklist.isBlacklisted(filepath)) - { - if (config.bVerbose) - std::cerr << "skipped blacklisted file " << filepath << std::endl; - continue; - } - - // Get link - std::string url = gogAPI->getInstallerLink(games[i].gamename, games[i].installers[j].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - continue; - } - - // Download - if (!url.empty()) - { - std::string XML; - if (config.bRemoteXML) - { - XML = gogAPI->getXML(games[i].gamename, games[i].installers[j].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - } - } - if (!games[i].installers[j].name.empty()) - std::cout << "Downloading: " << games[i].installers[j].name << std::endl; - std::cout << filepath << std::endl; - this->downloadFile(url, filepath, XML, games[i].gamename); - std::cout << std::endl; - } + dlQueue.push(games[i].installers[j]); } } - // Download extras - if (config.bExtras && !config.bUpdateCheck) - { // Save some time and don't process extras when running update check. Extras don't have updated flag, all of them would be skipped anyway. - for (unsigned int j = 0; j < games[i].extras.size(); ++j) - { - // Get link - std::string url = gogAPI->getExtraLink(games[i].gamename, games[i].extras[j].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - continue; - } - - std::string filepath = games[i].extras[j].getFilepath(); - if (config.blacklist.isBlacklisted(filepath)) - { - if (config.bVerbose) - std::cerr << "skipped blacklisted file " << filepath << std::endl; - continue; - } - - // Download - if (!url.empty()) - { - if (!games[i].extras[j].name.empty()) - std::cout << "Downloading: " << games[i].extras[j].name << std::endl; - std::cout << filepath << std::endl; - CURLcode result = this->downloadFile(url, filepath); - std::cout << std::endl; - if (result==CURLE_OK && config.bAutomaticXMLCreation) - { - std::cout << "Starting automatic XML creation" << std::endl; - std::string xml_dir = config.sXMLDirectory + "/" + games[i].gamename; - Util::createXML(filepath, config.iChunkSize, xml_dir); - std::cout << std::endl; - } - } - } - } - // Download patches if (config.bPatches) { for (unsigned int j = 0; j < games[i].patches.size(); ++j) { - // Not updated, skip to next patch - if (config.bUpdateCheck && !games[i].patches[j].updated) - continue; - - // Get link - std::string url = gogAPI->getPatchLink(games[i].gamename, games[i].patches[j].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - continue; - } - - std::string filepath = games[i].patches[j].getFilepath(); - if (config.blacklist.isBlacklisted(filepath)) - { - if (config.bVerbose) - std::cerr << "skipped blacklisted file " << filepath << std::endl; - continue; - } - - // Download - if (!url.empty()) - { - std::string XML; - if (config.bRemoteXML) - { - XML = gogAPI->getXML(games[i].gamename, games[i].patches[j].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - } - } - if (!games[i].patches[j].name.empty()) - std::cout << "Downloading: " << games[i].patches[j].name << std::endl; - std::cout << filepath << std::endl; - this->downloadFile(url, filepath, XML, games[i].gamename); - std::cout << std::endl; - } + dlQueue.push(games[i].patches[j]); + } + } + if (config.bExtras) + { + for (unsigned int j = 0; j < games[i].extras.size(); ++j) + { + dlQueue.push(games[i].extras[j]); } } - // Download language packs if (config.bLanguagePacks) { for (unsigned int j = 0; j < games[i].languagepacks.size(); ++j) { - // Get link - std::string url = gogAPI->getLanguagePackLink(games[i].gamename, games[i].languagepacks[j].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - continue; - } - - std::string filepath = games[i].languagepacks[j].getFilepath(); - if (config.blacklist.isBlacklisted(filepath)) - { - if (config.bVerbose) - std::cerr << "skipped blacklisted file " << filepath << std::endl; - continue; - } - - // Download - if (!url.empty()) - { - std::string XML; - if (config.bRemoteXML) - { - XML = gogAPI->getXML(games[i].gamename, games[i].languagepacks[j].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - } - } - if (!games[i].languagepacks[j].name.empty()) - std::cout << "Downloading: " << games[i].gamename << " " << games[i].languagepacks[j].name << std::endl; - std::cout << filepath << std::endl; - this->downloadFile(url, filepath, XML, games[i].gamename); - std::cout << std::endl; - } + dlQueue.push(games[i].languagepacks[j]); } } if (config.bDLC && !games[i].dlcs.empty()) @@ -1138,127 +996,57 @@ void Downloader::download() { for (unsigned int k = 0; k < games[i].dlcs[j].installers.size(); ++k) { - std::string filepath = games[i].dlcs[j].installers[k].getFilepath(); - if (config.blacklist.isBlacklisted(filepath)) - { - if (config.bVerbose) - std::cerr << "skipped blacklisted file " << filepath << std::endl; - continue; - } - - // Get link - std::string url = gogAPI->getInstallerLink(games[i].dlcs[j].gamename, games[i].dlcs[j].installers[k].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - continue; - } - - // Download - if (!url.empty()) - { - std::string XML; - if (config.bRemoteXML) - { - XML = gogAPI->getXML(games[i].dlcs[j].gamename, games[i].dlcs[j].installers[k].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - } - } - if (!games[i].dlcs[j].installers[k].name.empty()) - std::cout << "Downloading: " << games[i].dlcs[j].installers[k].name << std::endl; - std::cout << filepath << std::endl; - this->downloadFile(url, filepath, XML, games[i].dlcs[j].gamename); - std::cout << std::endl; - } + dlQueue.push(games[i].dlcs[j].installers[k]); } } if (config.bPatches) { for (unsigned int k = 0; k < games[i].dlcs[j].patches.size(); ++k) { - std::string filepath = games[i].dlcs[j].patches[k].getFilepath(); - if (config.blacklist.isBlacklisted(filepath)) - { - if (config.bVerbose) - std::cerr << "skipped blacklisted file " << filepath << std::endl; - continue; - } - - // Get link - std::string url = gogAPI->getPatchLink(games[i].dlcs[j].gamename, games[i].dlcs[j].patches[k].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - continue; - } - - // Download - if (!url.empty()) - { - std::string XML; - if (config.bRemoteXML) - { - XML = gogAPI->getXML(games[i].dlcs[j].gamename, games[i].dlcs[j].patches[k].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - } - } - if (!games[i].dlcs[j].patches[k].name.empty()) - std::cout << "Downloading: " << games[i].dlcs[j].patches[k].name << std::endl; - std::cout << filepath << std::endl; - this->downloadFile(url, filepath, XML, games[i].dlcs[j].gamename); - std::cout << std::endl; - } + dlQueue.push(games[i].dlcs[j].patches[k]); } } if (config.bExtras) { for (unsigned int k = 0; k < games[i].dlcs[j].extras.size(); ++k) { - std::string filepath = games[i].dlcs[j].extras[k].getFilepath(); - if (config.blacklist.isBlacklisted(filepath)) - { - if (config.bVerbose) - std::cerr << "skipped blacklisted file " << filepath << std::endl; - continue; - } - - // Get link - std::string url = gogAPI->getExtraLink(games[i].dlcs[j].gamename, games[i].dlcs[j].extras[k].id); - if (gogAPI->getError()) - { - std::cerr << gogAPI->getErrorMessage() << std::endl; - gogAPI->clearError(); - continue; - } - - // Download - if (!url.empty()) - { - if (!games[i].dlcs[j].extras[k].name.empty()) - std::cout << "Dowloading: " << games[i].dlcs[j].extras[k].name << std::endl; - CURLcode result = this->downloadFile(url, filepath); - std::cout << std::endl; - if (result==CURLE_OK && config.bAutomaticXMLCreation) - { - std::cout << "Starting automatic XML creation" << std::endl; - std::string xml_dir = config.sXMLDirectory + "/" + games[i].dlcs[j].gamename; - Util::createXML(filepath, config.iChunkSize, xml_dir); - std::cout << std::endl; - } - } + dlQueue.push(games[i].dlcs[j].extras[k]); } } } } } + + // Create download threads + std::vector vThreads; + for (unsigned int i = 0; i < config.iThreads; ++i) + { + DownloadInfo dlInfo; + dlInfo.setStatus(DLSTATUS_NOTSTARTED); + vDownloadInfo.push_back(dlInfo); + vThreads.push_back(std::thread(Downloader::processDownloadQueue, this->config, i)); + } + + this->printProgress(); + + // Join threads + for (unsigned int i = 0; i < vThreads.size(); ++i) + vThreads[i].join(); + + vThreads.clear(); + vDownloadInfo.clear(); + + // Create xml data for all files in the queue + if (!createXMLQueue.empty()) + { + std::cout << "Starting XML creation" << std::endl; + gameFile gf; + while (createXMLQueue.try_pop(gf)) + { + std::string xml_directory = config.sXMLDirectory + "/" + gf.gamename; + Util::createXML(gf.getFilepath(), config.iChunkSize, xml_directory); + } + } } // Download a file, resume if possible @@ -2989,3 +2777,477 @@ void Downloader::showWishlist() return; } + +void Downloader::processDownloadQueue(Config conf, const unsigned int& tid) +{ + std::string msg_prefix = "[Thread #" + std::to_string(tid) + "] "; + + API* api = new API(conf.sToken, conf.sSecret); + if (!api->init()) + { + delete api; + msgQueue.push(msg_prefix + "API init failed"); + vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED); + return; + } + + CURL* dlhandle = curl_easy_init(); + curl_easy_setopt(dlhandle, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(dlhandle, CURLOPT_USERAGENT, conf.sVersionString.c_str()); + curl_easy_setopt(dlhandle, CURLOPT_NOPROGRESS, 0); + + curl_easy_setopt(dlhandle, CURLOPT_CONNECTTIMEOUT, conf.iTimeout); + curl_easy_setopt(dlhandle, CURLOPT_FAILONERROR, true); + curl_easy_setopt(dlhandle, CURLOPT_SSL_VERIFYPEER, conf.bVerifyPeer); + curl_easy_setopt(dlhandle, CURLOPT_VERBOSE, conf.bVerbose); + curl_easy_setopt(dlhandle, CURLOPT_WRITEFUNCTION, Downloader::writeData); + curl_easy_setopt(dlhandle, CURLOPT_READFUNCTION, Downloader::readData); + curl_easy_setopt(dlhandle, CURLOPT_MAX_RECV_SPEED_LARGE, conf.iDownloadRate); + + // Assume that we have connection error and abort transfer with CURLE_OPERATION_TIMEDOUT if download speed is less than 200 B/s for 30 seconds + curl_easy_setopt(dlhandle, CURLOPT_LOW_SPEED_TIME, 30); + curl_easy_setopt(dlhandle, CURLOPT_LOW_SPEED_LIMIT, 200); + + xferInfo xferinfo; + xferinfo.tid = tid; + xferinfo.curlhandle = dlhandle; + + curl_easy_setopt(dlhandle, CURLOPT_XFERINFOFUNCTION, Downloader::progressCallbackForThread); + curl_easy_setopt(dlhandle, CURLOPT_XFERINFODATA, &xferinfo); + + gameFile gf; + while (dlQueue.try_pop(gf)) + { + CURLcode result = CURLE_RECV_ERROR; // assume network error + int iRetryCount = 0; + off_t iResumePosition = 0; + + vDownloadInfo[tid].setStatus(DLSTATUS_STARTING); + + // Get directory from filepath + boost::filesystem::path filepath = gf.getFilepath(); + filepath = boost::filesystem::absolute(filepath, boost::filesystem::current_path()); + boost::filesystem::path directory = filepath.parent_path(); + + // Skip blacklisted files + if (conf.blacklist.isBlacklisted(filepath.string())) + { + if (conf.bVerbose) + msgQueue.push(msg_prefix + "Skipped blacklisted file: " + filepath.string()); + continue; + } + + std::string filenameXML = filepath.filename().string() + ".xml"; + std::string xml_directory = conf.sXMLDirectory + "/" + gf.gamename; + boost::filesystem::path local_xml_file = xml_directory + "/" + filenameXML; + + vDownloadInfo[tid].setFilename(filepath.filename().string()); + msgQueue.push(msg_prefix + "Starting download: " + filepath.filename().string()); + + // Check that directory exists and create subdirectories + if (boost::filesystem::exists(directory)) + { + if (!boost::filesystem::is_directory(directory)) + { + msgQueue.push(msg_prefix + directory.string() + " is not directory"); + continue; + } + } + else + { + if (!boost::filesystem::create_directories(directory)) + { + msgQueue.push(msg_prefix + "Failed to create directory: " + directory.string()); + continue; + } + } + + bool bSameVersion = true; // assume same version + bool bLocalXMLExists = boost::filesystem::exists(local_xml_file); // This is additional check to see if remote xml should be saved to speed up future version checks + + std::string xml; + if (gf.type & (GFTYPE_INSTALLER | GFTYPE_PATCH) && conf.bRemoteXML) + { + xml = api->getXML(gf.gamename, gf.id); + if (api->getError()) + { + msgQueue.push(msg_prefix + api->getErrorMessage()); + api->clearError(); + } + else + { + if (!xml.empty()) + { + std::string localHash = Util::getLocalFileHash(conf.sXMLDirectory, filepath.string(), gf.gamename); + // Do version check if local hash exists + if (!localHash.empty()) + { + tinyxml2::XMLDocument remote_xml; + remote_xml.Parse(xml.c_str()); + tinyxml2::XMLElement *fileElem = remote_xml.FirstChildElement("file"); + if (fileElem) + { + std::string remoteHash = fileElem->Attribute("md5"); + if (remoteHash != localHash) + bSameVersion = false; + } + } + } + } + } + + bool bResume = false; + if (boost::filesystem::exists(filepath) && boost::filesystem::is_regular_file(filepath)) + { + if (bSameVersion) + { + bResume = true; + } + else + { + msgQueue.push(msg_prefix + "Remote file is different, renaming local file"); + std::string date_old = "." + bptime::to_iso_string(bptime::second_clock::local_time()) + ".old"; + boost::filesystem::path new_name = filepath.string() + date_old; // Rename old file by appending date and ".old" to filename + boost::system::error_code ec; + boost::filesystem::rename(filepath, new_name, ec); // Rename the file + if (ec) + { + msgQueue.push(msg_prefix + "Failed to rename " + filepath.string() + " to " + new_name.string() + "\nSkipping file"); + continue; + } + } + } + + // Save remote XML + if (!xml.empty()) + { + if ((bLocalXMLExists && !bSameVersion) || !bLocalXMLExists) + { + // Check that directory exists and create subdirectories + boost::filesystem::path path = xml_directory; + if (boost::filesystem::exists(path)) + { + if (!boost::filesystem::is_directory(path)) + { + msgQueue.push(msg_prefix + path.string() + " is not directory"); + } + } + else + { + if (!boost::filesystem::create_directories(path)) + { + msgQueue.push(msg_prefix + "Failed to create directory: " + path.string()); + } + } + std::ofstream ofs(local_xml_file.string().c_str()); + if (ofs) + { + ofs << xml; + ofs.close(); + } + else + { + msgQueue.push(msg_prefix + "Can't create " + local_xml_file.string()); + } + } + } + + // Get download url + std::string url; + if (gf.type == GFTYPE_INSTALLER) + url = api->getInstallerLink(gf.gamename, gf.id); + else if (gf.type == GFTYPE_PATCH) + url = api->getPatchLink(gf.gamename, gf.id); + else if (gf.type == GFTYPE_LANGPACK) + url = api->getLanguagePackLink(gf.gamename, gf.id); + else if (gf.type == GFTYPE_EXTRA) + url = api->getExtraLink(gf.gamename, gf.id); + else + url = api->getExtraLink(gf.gamename, gf.id); // assume extra if type didn't match any of the others + + if (api->getError()) + { + msgQueue.push(msg_prefix + api->getErrorMessage()); + api->clearError(); + continue; + } + + curl_easy_setopt(dlhandle, CURLOPT_URL, url.c_str()); + do + { + if (iRetryCount != 0) + msgQueue.push(msg_prefix + "Retry " + std::to_string(iRetryCount) + "/" + std::to_string(conf.iRetries) + ": " + filepath.filename().string()); + + FILE* outfile; + // File exists, resume + if (bResume) + { + iResumePosition = boost::filesystem::file_size(filepath); + if ((outfile=fopen(filepath.string().c_str(), "r+"))!=NULL) + { + fseek(outfile, 0, SEEK_END); + curl_easy_setopt(dlhandle, CURLOPT_RESUME_FROM_LARGE, iResumePosition); + curl_easy_setopt(dlhandle, CURLOPT_WRITEDATA, outfile); + } + else + { + msgQueue.push(msg_prefix + "Failed to open " + filepath.string()); + break; + } + } + else // File doesn't exist, create new file + { + if ((outfile=fopen(filepath.string().c_str(), "w"))!=NULL) + { + curl_easy_setopt(dlhandle, CURLOPT_RESUME_FROM_LARGE, 0); // start downloading from the beginning of file + curl_easy_setopt(dlhandle, CURLOPT_WRITEDATA, outfile); + } + else + { + msgQueue.push(msg_prefix + "Failed to create " + filepath.string()); + break; + } + } + + xferinfo.offset = iResumePosition; + xferinfo.timer.reset(); + xferinfo.TimeAndSize.clear(); + result = curl_easy_perform(dlhandle); + fclose(outfile); + + if (result == CURLE_PARTIAL_FILE || result == CURLE_OPERATION_TIMEDOUT) + { + iRetryCount++; + if (boost::filesystem::exists(filepath) && boost::filesystem::is_regular_file(filepath)) + bResume = true; + } + + } while ((result == CURLE_PARTIAL_FILE || result == CURLE_OPERATION_TIMEDOUT) && (iRetryCount <= conf.iRetries)); + + long int response_code = 0; + if (result == CURLE_HTTP_RETURNED_ERROR) + { + curl_easy_getinfo(dlhandle, CURLINFO_RESPONSE_CODE, &response_code); + } + if (result == CURLE_OK || result == CURLE_RANGE_ERROR || (result == CURLE_HTTP_RETURNED_ERROR && response_code == 416)) + msgQueue.push(msg_prefix + "Finished download: " + filepath.filename().string()); + else + { + msgQueue.push(msg_prefix + "Finished download (" + static_cast(curl_easy_strerror(result)) + "): " + filepath.filename().string()); + } + + // Automatic xml creation + if (conf.bAutomaticXMLCreation) + { + if (result == CURLE_OK) + { + if ((gf.type & GFTYPE_EXTRA) || (conf.bRemoteXML && !bLocalXMLExists && xml.empty())) + createXMLQueue.push(gf); + } + } + } + + curl_easy_cleanup(dlhandle); + delete api; + + vDownloadInfo[tid].setStatus(DLSTATUS_FINISHED); + msgQueue.push(msg_prefix + "Finished all tasks"); + + return; +} + +int Downloader::progressCallbackForThread(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) +{ + // unused so lets prevent warnings and be more pedantic + (void) ulnow; + (void) ultotal; + + xferInfo* xferinfo = static_cast(clientp); + + // Update progress info every 100ms + if (xferinfo->timer.getTimeBetweenUpdates()>=100 || dlnow == dltotal) + { + xferinfo->timer.reset(); + progressInfo info; + info.dlnow = dlnow; + info.dltotal = dltotal; + + // trying to get rate and setting to NaN if it fails + if (CURLE_OK != curl_easy_getinfo(xferinfo->curlhandle, CURLINFO_SPEED_DOWNLOAD, &info.rate)) + info.rate = std::numeric_limits::quiet_NaN(); + + // setting full dlwnow and dltotal + if (xferinfo->offset > 0) + { + info.dlnow += xferinfo->offset; + info.dltotal += xferinfo->offset; + } + + // 10 second average download speed + // Don't use static value of 10 seconds because update interval depends on when and how often progress callback is called + xferinfo->TimeAndSize.push_back(std::make_pair(time(NULL), static_cast(info.dlnow))); + if (xferinfo->TimeAndSize.size() > 100) // 100 * 100ms = 10s + { + xferinfo->TimeAndSize.pop_front(); + time_t time_first = xferinfo->TimeAndSize.front().first; + uintmax_t size_first = xferinfo->TimeAndSize.front().second; + time_t time_last = xferinfo->TimeAndSize.back().first; + uintmax_t size_last = xferinfo->TimeAndSize.back().second; + info.rate = (size_last - size_first) / static_cast((time_last - time_first)); + } + + vDownloadInfo[xferinfo->tid].setProgressInfo(info); + vDownloadInfo[xferinfo->tid].setStatus(DLSTATUS_RUNNING); + } + + return 0; +} + +void Downloader::printProgress() +{ + // Print progress information until all threads have finished their tasks + ProgressBar bar(config.bUnicode, config.bColor); + unsigned int dl_status = DLSTATUS_NOTSTARTED; + while (dl_status != DLSTATUS_FINISHED) + { + dl_status = DLSTATUS_NOTSTARTED; + + // Print progress information once per 100ms + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::cout << "\033[J\r" << std::flush; // Clear screen from the current line down to the bottom of the screen + + // Print messages from message queue first + std::string msg; + while (msgQueue.try_pop(msg)) + { + std::cout << msg << std::endl; + if (config.bReport) + { + this->report_ofs << bptime::to_simple_string(bptime::second_clock::local_time()) << ": " << msg << std::endl; + } + } + + int iTermWidth = Util::getTerminalWidth(); + double total_rate = 0; + + // Create progress info text for all download threads + std::vector vProgressText; + for (unsigned int i = 0; i < vDownloadInfo.size(); ++i) + { + std::string progress_text; + int bar_length = 26; + int min_bar_length = 5; + + unsigned int status = vDownloadInfo[i].getStatus(); + dl_status |= status; + + if (status == DLSTATUS_FINISHED) + { + vProgressText.push_back("#" + std::to_string(i) + ": Finished"); + continue; + } + + std::string filename = vDownloadInfo[i].getFilename(); + progressInfo progress_info = vDownloadInfo[i].getProgressInfo(); + total_rate += progress_info.rate; + + bool starting = ((0 == progress_info.dlnow) && (0 == progress_info.dltotal)); + double fraction = starting ? 0.0 : static_cast(progress_info.dlnow) / static_cast(progress_info.dltotal); + + char progress_percentage_text[200]; + sprintf(progress_percentage_text, "%3.0f%% ", fraction * 100); + int progress_percentage_text_length = strlen(progress_percentage_text); + + bptime::time_duration eta(bptime::seconds((long)((progress_info.dltotal - progress_info.dlnow) / progress_info.rate))); + std::stringstream eta_ss; + if (eta.hours() > 23) + { + eta_ss << eta.hours() / 24 << "d " << + std::setfill('0') << std::setw(2) << eta.hours() % 24 << "h " << + std::setfill('0') << std::setw(2) << eta.minutes() << "m " << + std::setfill('0') << std::setw(2) << eta.seconds() << "s"; + } + else if (eta.hours() > 0) + { + eta_ss << eta.hours() << "h " << + std::setfill('0') << std::setw(2) << eta.minutes() << "m " << + std::setfill('0') << std::setw(2) << eta.seconds() << "s"; + } + else if (eta.minutes() > 0) + { + eta_ss << eta.minutes() << "m " << + std::setfill('0') << std::setw(2) << eta.seconds() << "s"; + } + else + { + eta_ss << eta.seconds() << "s"; + } + + std::string rate_unit; + if (progress_info.rate > 1048576) // 1 MB + { + progress_info.rate /= 1048576; + rate_unit = "MB/s"; + } + else + { + progress_info.rate /= 1024; + rate_unit = "kB/s"; + } + + char progress_status_text[200]; // We're probably never going to go as high as 200 characters but it's better to use too big number here than too small + sprintf(progress_status_text, " %0.2f/%0.2fMB @ %0.2f%s ETA: %s", static_cast(progress_info.dlnow)/1024/1024, static_cast(progress_info.dltotal)/1024/1024, progress_info.rate, rate_unit.c_str(), eta_ss.str().c_str()); + int status_text_length = strlen(progress_status_text) + 1; + + if ((status_text_length + progress_percentage_text_length + bar_length) > iTermWidth) + bar_length -= (status_text_length + progress_percentage_text_length + bar_length) - iTermWidth; + + // Don't draw progressbar if length is less than min_bar_length + std::string progress_bar_text; + if (bar_length >= min_bar_length) + progress_bar_text = bar.createBarString(bar_length, fraction); + + progress_text = std::string(progress_percentage_text) + progress_bar_text + std::string(progress_status_text); + std::string filename_text = "#" + std::to_string(i) + " " + filename; + Util::shortenStringToTerminalWidth(filename_text); + + vProgressText.push_back(filename_text); + vProgressText.push_back(progress_text); + } + + // Total download speed and number of remaining tasks in download queue + if (dl_status != DLSTATUS_FINISHED) + { + std::ostringstream ss; + if (config.iThreads > 1) + { + std::string rate_unit; + if (total_rate > 1048576) // 1 MB + { + total_rate /= 1048576; + rate_unit = "MB/s"; + } + else + { + total_rate /= 1024; + rate_unit = "kB/s"; + } + ss << "Total: " << std::setprecision(2) << std::fixed << total_rate << rate_unit << " | "; + } + ss << "Remaining: " << dlQueue.size(); + vProgressText.push_back(ss.str()); + } + + // Print progress info + for (unsigned int i = 0; i < vProgressText.size(); ++i) + { + std::cout << vProgressText[i] << std::endl; + } + + // Move cursor up by vProgressText.size() rows + if (dl_status != DLSTATUS_FINISHED) + { + std::cout << "\033[" << vProgressText.size() << "A\r" << std::flush; + } + } +} diff --git a/src/util.cpp b/src/util.cpp index fc190b8..f88f26f 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -569,3 +569,17 @@ std::string Util::getLocalFileHash(const std::string& xml_dir, const std::string return localHash; } + +void Util::shortenStringToTerminalWidth(std::string& str) +{ + int iStrLen = static_cast(str.length()); + int iTermWidth = Util::getTerminalWidth(); + if (iStrLen >= iTermWidth) + { + size_t chars_to_remove = (iStrLen - iTermWidth) + 4; + size_t middle = iStrLen / 2; + size_t pos1 = middle - (chars_to_remove / 2); + size_t pos2 = middle + (chars_to_remove / 2); + str.replace(str.begin()+pos1, str.begin()+pos2, "..."); + } +}