#include "./database.h"

#include "./logging.h"

// NOTE(review): the angle-bracket header names and all template arguments were
// missing from the reviewed text of this file. They have been restored from the
// names used below (string builder `%`/tupleToString, stringToNumber,
// EscapeCodes::Phrases, std::filesystem, std::regex). Confirm the header list and
// the signature types (FileResponse, std::shared_mutex, std::uint64_t) against
// database.h before merging.
#include <c++utilities/conversion/stringbuilder.h>
#include <c++utilities/conversion/stringconversion.h>
#include <c++utilities/io/ansiescapecodes.h>

#include <filesystem>
#include <regex>

using namespace std;
using namespace CppUtilities;
using namespace CppUtilities::EscapeCodes;
using namespace LibPkg;

namespace LibRepoMgr {

namespace WebClient {

/*!
 * \brief Builds the download parameters for the specified databases.
 * \param log Used to report databases which cannot be queried.
 * \param dbs The databases to prepare the query for.
 * \param withFiles Whether the "files" database (".files", stored at Database::filesPath) should be
 *        retrieved instead of the regular one (".db", stored at Database::path).
 * \returns Returns the query parameters for all databases that have a local path and at least one
 *          mirror. The URL is always built from the first mirror. Databases whose destination
 *          directory could not be created are listed in DatabaseQuery::failedDbs. Databases without
 *          local path or mirror are only logged and skipped.
 */
DatabaseQuery prepareDatabaseQuery(LogContext &log, const std::vector<LibPkg::Database *> &dbs, bool withFiles)
{
    DatabaseQuery query;
    query.queryParamsForDbs.reserve(dbs.size());
    for (auto *const db : dbs) {
        const auto &destinationFilePath = withFiles ? db->filesPath : db->path;
        if (destinationFilePath.empty()) {
            log(Phrases::ErrorMessage, "No local path configured database \"", db->name, '@', db->arch, "\" (for ",
                withFiles ? "files" : "regular db", ')', '\n');
            continue;
        }
        if (db->mirrors.empty()) {
            log(Phrases::ErrorMessage, "No mirrors configured for database \"", db->name, '@', db->arch, '\"', '\n');
            continue;
        }

        // ensure the directory the database file is supposed to be stored in exists
        const auto destinationDirPath = std::filesystem::path(destinationFilePath).parent_path();
        try {
            // A path without directory component yields an empty parent path. There is nothing to
            // create in this case and passing an empty path to create_directories() may fail.
            if (!destinationDirPath.empty() && !std::filesystem::exists(destinationDirPath)) {
                std::filesystem::create_directories(destinationDirPath);
            }
        } catch (const std::filesystem::filesystem_error &e) {
            log(Phrases::ErrorMessage, "Unable to create directory \"", destinationDirPath, "\" for database \"", db->name, '@', db->arch,
                "\": ", e.what(), '\n');
            query.failedDbs.emplace_back(db->name);
            continue;
        }

        query.queryParamsForDbs.emplace_back(DatabaseQueryParams{
            db->name, db->arch, (db->mirrors.front() % '/' % db->name) + (withFiles ? ".files" : ".db"), destinationFilePath });
    }
    return query;
}

/*!
 * \brief Converts the specified regex sub-match to an integer.
 * \throws Propagates whatever stringToNumber() throws for non-numeric input.
 */
static int matchToInt(const std::sub_match<const char *> &match)
{
    const auto sv = std::string_view(match.first, static_cast<std::size_t>(match.length()));
    return stringToNumber<int>(sv);
}

/*!
 * \brief Parses the "Last-Modified" header of the specified HTTP \a message.
 * \param log Used to emit a warning if the header is present but cannot be parsed.
 * \param message The HTTP message to read the header from.
 * \param dbName The name of the concerned database (only used for logging).
 * \param dbArch The architecture of the concerned database (only used for logging).
 * \returns Returns the parsed date/time or a null DateTime if the header is absent or cannot be parsed.
 */
static CppUtilities::DateTime parseLastModified(LogContext &log, const auto &message, std::string_view dbName, std::string_view dbArch)
{
    auto lastModified = DateTime();
    const auto lastModifiedHeader = message.find(boost::beast::http::field::last_modified);
    if (lastModifiedHeader != message.cend()) {
        // parse "Last-Modified" header which should be something like "<day-name>, <day> <month> <year> <hour>:<minute>:<second> GMT"
        const auto lastModifiedStr = lastModifiedHeader->value();
        static const auto lastModifiedPattern = std::regex("..., (\\d\\d) (...) (\\d\\d\\d\\d) (\\d\\d):(\\d\\d):(\\d\\d) GMT");
        static const auto months = unordered_map<std::string, int>{ { "Jan", 1 }, { "Feb", 2 }, { "Mar", 3 }, { "Apr", 4 }, { "May", 5 },
            { "Jun", 6 }, { "Jul", 7 }, { "Aug", 8 }, { "Sep", 9 }, { "Oct", 10 }, { "Nov", 11 }, { "Dec", 12 } };
        try {
            auto match = std::cmatch();
            if (!std::regex_search(lastModifiedStr.cbegin(), lastModifiedStr.cend(), match, lastModifiedPattern)) {
                throw ConversionException("date/time denotation not in expected format");
            }
            const auto month = months.find(match[2]);
            if (month == months.cend()) {
                throw ConversionException("month is invalid");
            }
            // capture groups: 1 = day, 2 = month name, 3 = year, 4 = hour, 5 = minute, 6 = second
            lastModified = DateTime::fromDateAndTime(
                matchToInt(match[3]), month->second, matchToInt(match[1]), matchToInt(match[4]), matchToInt(match[5]), matchToInt(match[6]));
        } catch (const ConversionException &e) {
            log(Phrases::WarningMessage, "Unable to parse \"Last-Modified\" header for database ", dbName, '@', dbArch, ": ", e.what(),
                " (last modification time was \"", string_view(lastModifiedStr.data(), lastModifiedStr.size()), "\")\n");
        }
    }
    return lastModified;
}

/*!
 * \brief Starts one session per query to retrieve the database file from the mirror and load it.
 * \param log Used for logging; captured by reference in the handlers, so it must outlive the sessions.
 * \param setup The service setup; captured by reference in the handlers, so it must outlive the sessions.
 * \param dbQueries The queries to run; name, architecture and destination path are moved out of them.
 * \param dbQuerySession The session collecting the names of the databases which could not be retrieved/loaded.
 * \param force Whether the database should be downloaded and loaded regardless of its last modification time.
 * \remarks Unless \a force is set, a HEAD handler compares the mirror's "Last-Modified" header against the
 *          database's last update and skips the download if the mirror has nothing newer.
 */
void queryDatabases(LogContext &log, ServiceSetup &setup, std::vector<DatabaseQueryParams> &&dbQueries,
    std::shared_ptr<DatabaseQuerySession> &dbQuerySession, bool force)
{
    for (auto &query : dbQueries) {
        log(Phrases::InfoMessage, "Retrieving \"", query.databaseName, "\" from mirror: ", query.url, '\n');

        // decide on the basis of the HEAD response whether the actual download can be skipped
        auto headHandler
            = force ? Session::HeadHandler() : [&log, &setup, dbName = query.databaseName, dbArch = query.databaseArch](Session &session3) mutable {
                  auto lastModified = parseLastModified(log, session3.headResponse.get(), dbName, dbArch);
                  auto configReadLock = setup.config.lockToRead();
                  auto *const destinationDb = setup.config.findDatabase(dbName, dbArch);
                  if (!destinationDb) {
                      log(Phrases::InfoMessage, "Skip requesting database \"", dbName, '@', dbArch, "\" as it no longer exists\n");
                      session3.skip = true;
                      return;
                  }
                  const auto lastUpdate = destinationDb->lastUpdate.load();
                  configReadLock.unlock();
                  // Proceed with the download if the mirror's modification time is unknown. The handler
                  // for the actual response treats this case as "modified now" and loads the database.
                  // Skipping here would prevent such databases from ever being loaded.
                  if (lastModified.isNull() || lastModified > lastUpdate) {
                      return;
                  }
                  log(Phrases::InfoMessage, "Skip requesting database \"", dbName, '@', dbArch,
                      "\" from mirror; last modification time <= last update (", lastModified.toString(), " <= ", lastUpdate.toString(), ')',
                      '\n');
                  session3.skip = true;
              };

        // load the packages from the downloaded database file
        auto handler = [&log, &setup, dbName = std::move(query.databaseName), dbArch = std::move(query.databaseArch), dbQuerySession, force](
                           Session &session2, const WebClient::HttpClientError &error) mutable {
            // a truncated TLS stream is not treated as an error
            if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) {
                log(Phrases::ErrorMessage, "Error retrieving database file \"", session2.destinationFilePath, "\" for ", dbName, ": ",
                    error.what(), '\n');
                dbQuerySession->addResponse(std::move(dbName));
                return;
            }
            if (session2.skip) {
                return;
            }

            const auto &response = std::get<FileResponse>(session2.response);
            const auto &message = response.get();
            if (message.result() != boost::beast::http::status::ok) {
                log(Phrases::ErrorMessage, "Error retrieving database file \"", session2.destinationFilePath, "\" for ", dbName,
                    ": mirror returned ", message.result_int(), " response\n");
                dbQuerySession->addResponse(std::move(dbName));
                return;
            }

            // log/skip
            auto lastModified = parseLastModified(log, message, dbName, dbArch);
            auto loadingLogged = false;
            if (lastModified.isNull()) {
                log(Phrases::InfoMessage, "Loading database \"", dbName, '@', dbArch,
                    "\" from mirror response; assuming last modification time to be now\n");
                lastModified = DateTime::gmtNow();
                loadingLogged = true;
            } else if (!force) {
                auto configReadLock = setup.config.lockToRead();
                if (auto *const destinationDb = setup.config.findDatabase(dbName, dbArch)) {
                    if (const auto lastUpdate = destinationDb->lastUpdate.load(); lastModified <= lastUpdate) {
                        configReadLock.unlock();
                        log(Phrases::InfoMessage, "Skip loading database \"", dbName, '@', dbArch,
                            "\" from mirror response; last modification time <= last update (", lastModified.toString(),
                            " <= ", lastUpdate.toString(), ')', '\n');
                        return;
                    }
                }
            }
            if (!loadingLogged) {
                log(Phrases::InfoMessage, "Loading database \"", dbName, '@', dbArch,
                    "\" from mirror response; last modification time: ", lastModified.toString(), '\n');
            }

            try {
                // load packages
                auto lock = setup.config.lockToRead();
                auto db = setup.config.findDatabase(dbName, dbArch);
                if (!db) {
                    lock.unlock();
                    log(Phrases::ErrorMessage, "Retrieved database file for \"", dbName, '@', dbArch,
                        "\" but it no longer exists; discarding\n");
                    return;
                }
                auto updater = LibPkg::PackageUpdater(*db, true);
                updater.insertFromDatabaseFile(session2.destinationFilePath);
                updater.commit();
                db->lastUpdate = lastModified;
                // read the package count while still holding the lock; log after releasing it
                const auto newPackageCount = db->packageCount();
                lock.unlock();
                log(Phrases::InfoMessage, "Inserted ", updater.packageCount(), " packages (handling ", updater.handledIDs().size(),
                    " IDs) into database \"", dbName, '@', dbArch, "\" which now contains ", newPackageCount, " packages\n");
            } catch (const std::runtime_error &e) {
                log(Phrases::ErrorMessage, "Unable to parse retrieved database file for \"", dbName, '@', dbArch, "\": ", e.what(), '\n');
                dbQuerySession->addResponse(std::move(dbName));
            }
        };

        auto session = runSessionFromUrl(setup.building.ioContext, setup.webServer.sslContext, query.url, std::move(handler),
            std::move(headHandler), std::move(query.destinationFilePath));
    }
}

/*!
 * \brief Creates a DatabaseQuerySession invoking \a handler and runs the specified queries within it.
 * \returns Returns the created session.
 */
std::shared_ptr<DatabaseQuerySession> queryDatabases(
    LogContext &log, ServiceSetup &setup, std::vector<DatabaseQueryParams> &&urls, bool force, DatabaseQuerySession::HandlerType &&handler)
{
    auto dbQuerySession = DatabaseQuerySession::create(setup.building.ioContext, std::move(handler));
    queryDatabases(log, setup, std::move(urls), dbQuerySession, force);
    return dbQuerySession;
}

/*!
 * \brief Prepares and runs the queries for the specified databases.
 * \param configReadLock The lock protecting \a dbs; must not be null. It is released as soon as the
 *        query parameters have been prepared, so \a dbs must not be accessed by the caller afterwards.
 * \returns Returns the created session.
 */
std::shared_ptr<DatabaseQuerySession> queryDatabases(LogContext &log, ServiceSetup &setup, std::shared_lock<std::shared_mutex> *configReadLock,
    const std::vector<LibPkg::Database *> &dbs, bool force, DatabaseQuerySession::HandlerType &&handler)
{
    auto query = prepareDatabaseQuery(log, dbs, setup.building.loadFilesDbs);
    configReadLock->unlock();
    return queryDatabases(log, setup, std::move(query.queryParamsForDbs), force, std::move(handler));
}

/*!
 * \brief Constructs a session for caching the packages specified via \a data.
 * \remarks Only a reference to \a data is stored. The package iterators point into the first database
 *          entry of \a data, or are default-constructed if \a data is empty.
 */
PackageCachingSession::PackageCachingSession(PackageCachingDataForSession &data, boost::asio::io_context &ioContext,
    boost::asio::ssl::context &sslContext, MultiSession::HandlerType &&handler)
    : MultiSession(ioContext, std::move(handler))
    , m_sslContext(sslContext)
    , m_data(data)
    , m_dbIterator(data.begin())
    , m_dbEnd(data.end())
    , m_packageIterator(m_dbIterator != m_dbEnd ? m_dbIterator->second.begin() : decltype(m_packageIterator)())
    , m_packageEnd(m_dbIterator != m_dbEnd ? m_dbIterator->second.end() : decltype(m_packageEnd)())
{
}

/*!
 * \brief Advances to the next package, moving on to the next database when the current one is exhausted.
 * \remarks Does not lock m_mutex itself; getCurrentDataAndSelectNext() calls it while holding the lock.
 */
void PackageCachingSession::selectNextPackage()
{
    if (++m_packageIterator != m_packageEnd) {
        return;
    }
    if (++m_dbIterator == m_dbEnd) {
        return; // leaves m_packageIterator == m_packageEnd which signals that nothing is left
    }
    m_packageIterator = m_dbIterator->second.begin();
    m_packageEnd = m_dbIterator->second.end();
}

/*!
 * \brief Returns the caching data of the current package and advances to the next one.
 * \returns Returns a pointer into the session's data or nullptr if no packages are left.
 * \remarks Holds m_mutex for the duration of the call.
 */
PackageCachingDataForPackage *PackageCachingSession::getCurrentDataAndSelectNext()
{
    std::lock_guard lock(m_mutex);
    if (m_packageIterator == m_packageEnd) {
        return nullptr;
    }
    auto *const data = &m_packageIterator->second;
    selectNextPackage();
    return data;
}

/*!
 * \brief Starts downloading the packages of the specified session.
 * \param log Used for logging; captured by reference in the handler, so it must outlive the downloads.
 * \param packageCachingSession The session providing the packages to download.
 * \param bodyLimit Passed through to runSessionFromUrl() for each download.
 * \param maxParallelDownloads The maximum number of downloads started by this call.
 * \remarks Each finished download starts one further download from its handler, so up to
 *          \a maxParallelDownloads chains of downloads run until the session has no packages left.
 *          No further downloads are started once the session's aborted flag is set.
 */
void cachePackages(LogContext &log, std::shared_ptr<PackageCachingSession> &&packageCachingSession, std::optional<std::uint64_t> bodyLimit,
    std::size_t maxParallelDownloads)
{
    for (std::size_t startedDownloads = 0;
         startedDownloads < maxParallelDownloads && (!packageCachingSession->aborted || !*packageCachingSession->aborted); ++startedDownloads) {
        auto *const cachingData = packageCachingSession->getCurrentDataAndSelectNext();
        if (!cachingData) {
            return;
        }
        log(Phrases::InfoMessage, "Downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, "\"\n");
        runSessionFromUrl(
            packageCachingSession->ioContext(), packageCachingSession->m_sslContext, cachingData->url,
            [&log, bodyLimit, packageCachingSession, cachingData](Session &session, const WebClient::HttpClientError &error) mutable {
                // a truncated TLS stream is not treated as an error
                if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) {
                    const auto msg = std::make_tuple(
                        "Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, "\": ", error.what());
                    cachingData->error = tupleToString(msg);
                    log(Phrases::ErrorMessage, msg, '\n');
                }
                const auto &response = std::get<FileResponse>(session.response);
                const auto &message = response.get();
                if (message.result() != boost::beast::http::status::ok) {
                    const auto msg = std::make_tuple("Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath,
                        "\": mirror returned ", message.result_int(), " response");
                    cachingData->error = tupleToString(msg);
                    log(Phrases::ErrorMessage, msg, '\n');
                }
                // continue with the next package regardless of whether this download succeeded
                cachePackages(log, std::move(packageCachingSession), bodyLimit, 1);
            },
            std::string(cachingData->destinationFilePath), std::string_view(), std::string_view(), boost::beast::http::verb::get, bodyLimit);
    }
}

} // namespace WebClient
} // namespace LibRepoMgr