Skip to content

Commit

Permalink
Merge pull request #472 from wazuh/fix/make-download-async
Browse files Browse the repository at this point in the history
Make files download async
  • Loading branch information
TomasTurina authored Dec 31, 2024
2 parents 17a873a + 841e1dd commit 95b9665
Show file tree
Hide file tree
Showing 17 changed files with 146 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace centralized_configuration
using SetGroupIdFunctionType = std::function<bool(const std::vector<std::string>& groupList)>;
using GetGroupIdFunctionType = std::function<std::vector<std::string>()>;
using DownloadGroupFilesFunctionType =
std::function<bool(const std::string& group, const std::string& dstFilePath)>;
std::function<boost::asio::awaitable<bool>(std::string group, std::string dstFilePath)>;
using ValidateFileFunctionType = std::function<bool(const std::filesystem::path& configFile)>;
using ReloadModulesFunctionType = std::function<void()>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,19 @@ namespace centralized_configuration
{
const std::filesystem::path tmpGroupFile =
m_fileSystemWrapper->temp_directory_path() / (groupId + config::DEFAULT_SHARED_FILE_EXTENSION);
m_downloadGroupFilesFunction(groupId, tmpGroupFile.string());

// NOLINTBEGIN(cppcoreguidelines-no-suspend-with-lock)
const auto dlResult = co_await m_downloadGroupFilesFunction(groupId, tmpGroupFile.string());
// NOLINTEND(cppcoreguidelines-no-suspend-with-lock)

if (!dlResult)
{
LogWarn("Failed to download the file for group '{}'", groupId);
co_return module_command::CommandExecutionResult {
module_command::Status::FAILURE,
"CentralizedConfiguration failed to download the file for group '" + groupId + "'"};
}

if (!m_validateFileFunction(tmpGroupFile))
{
LogWarn("Failed to validate the file for group '{}', invalid group file received: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ TEST(CentralizedConfiguration, ExecuteCommandReturnsFailureOnParseParameters)
{
CentralizedConfiguration centralizedConfiguration;
centralizedConfiguration.SetGroupIdFunction([](const std::vector<std::string>&) { return true; });
centralizedConfiguration.SetDownloadGroupFilesFunction([](const std::string&, const std::string&)
{ return true; });
centralizedConfiguration.SetDownloadGroupFilesFunction(
[](std::string, std::string) -> boost::asio::awaitable<bool> { co_return true; });
centralizedConfiguration.ValidateFileFunction([](const std::filesystem::path&) { return true; });
centralizedConfiguration.ReloadModulesFunction([]() {});

Expand Down Expand Up @@ -123,8 +123,8 @@ TEST(CentralizedConfiguration, ExecuteCommandHandlesRecognizedCommands)
CentralizedConfiguration centralizedConfiguration(std::move(mockFileSystem));
centralizedConfiguration.SetGroupIdFunction([](const std::vector<std::string>&) { return true; });
centralizedConfiguration.GetGroupIdFunction([]() { return std::vector<std::string> {"group1", "group2"}; });
centralizedConfiguration.SetDownloadGroupFilesFunction([](const std::string&, const std::string&)
{ return true; });
centralizedConfiguration.SetDownloadGroupFilesFunction(
[](std::string, std::string) -> boost::asio::awaitable<bool> { co_return true; });
centralizedConfiguration.ValidateFileFunction([](const std::filesystem::path&) { return true; });
centralizedConfiguration.ReloadModulesFunction([]() {});

Expand Down Expand Up @@ -186,12 +186,14 @@ TEST(CentralizedConfiguration, SetFunctionsAreCalledAndReturnsCorrectResultsForS
return true;
});

// NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines)
centralizedConfiguration.SetDownloadGroupFilesFunction(
[&wasDownloadGroupFilesFunctionCalled](const std::string&, const std::string&)
[&wasDownloadGroupFilesFunctionCalled](std::string, std::string) -> boost::asio::awaitable<bool>
{
wasDownloadGroupFilesFunctionCalled = true;
return wasDownloadGroupFilesFunctionCalled;
co_return wasDownloadGroupFilesFunctionCalled;
});
// NOLINTEND(cppcoreguidelines-avoid-capturing-lambda-coroutines)

centralizedConfiguration.ValidateFileFunction([](const std::filesystem::path&) { return true; });
centralizedConfiguration.ReloadModulesFunction([]() {});
Expand Down Expand Up @@ -244,12 +246,14 @@ TEST(CentralizedConfiguration, SetFunctionsAreCalledAndReturnsCorrectResultsForU
return std::vector<std::string> {"group1", "group2"};
});

// NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines)
centralizedConfiguration.SetDownloadGroupFilesFunction(
[&wasDownloadGroupFilesFunctionCalled](const std::string&, const std::string&)
[&wasDownloadGroupFilesFunctionCalled](std::string, std::string) -> boost::asio::awaitable<bool>
{
wasDownloadGroupFilesFunctionCalled = true;
return wasDownloadGroupFilesFunctionCalled;
co_return wasDownloadGroupFilesFunctionCalled;
});
// NOLINTEND(cppcoreguidelines-avoid-capturing-lambda-coroutines)

centralizedConfiguration.ValidateFileFunction([](const std::filesystem::path&) { return true; });
centralizedConfiguration.ReloadModulesFunction([]() {});
Expand Down
2 changes: 1 addition & 1 deletion src/agent/communicator/include/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ namespace communicator
/// @param groupName The name of the group to retrieve the configuration for
/// @param dstFilePath The path to the file to store the configuration in
/// @return true if the configuration was successfully retrieved, false otherwise
bool GetGroupConfigurationFromManager(const std::string& groupName, const std::string& dstFilePath);
boost::asio::awaitable<bool> GetGroupConfigurationFromManager(std::string groupName, std::string dstFilePath);

/// @brief Stops the communication process
void Stop();
Expand Down
7 changes: 0 additions & 7 deletions src/agent/communicator/include/http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ namespace http_client
boost::beast::http::response<boost::beast::http::dynamic_body>
PerformHttpRequest(const HttpRequestParams& params) override;

/// @brief Downloads HTTP response to a file
/// @param params Parameters for the request
/// @param dstFilePath Destination file path for the response
/// @return The HTTP response
boost::beast::http::response<boost::beast::http::dynamic_body>
PerformHttpRequestDownload(const HttpRequestParams& params, const std::string& dstFilePath) override;

/// @brief Authenticates using UUID and key
/// @param serverUrl Server URL for authentication
/// @param userAgent User agent header
Expand Down
7 changes: 0 additions & 7 deletions src/agent/communicator/include/ihttp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,5 @@ namespace http_client
const std::string& user,
const std::string& password,
const std::string& verificationMode) = 0;

/// @brief Perform an HTTP request, receive the response and write it to a file
/// @param params The parameters for the request
/// @param dstFilePath The path to the file where the response should be written
/// @return The response
virtual boost::beast::http::response<boost::beast::http::dynamic_body>
PerformHttpRequestDownload(const HttpRequestParams& params, const std::string& dstFilePath) = 0;
};
} // namespace http_client
6 changes: 0 additions & 6 deletions src/agent/communicator/include/ihttp_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@ namespace http_client
virtual void Read(boost::beast::http::response<boost::beast::http::dynamic_body>& res,
boost::system::error_code& ec) = 0;

/// @brief Reads a response from the socket and writes it to a file
/// @param res The response to read
/// @param dstFilePath The path to the file to write to
virtual void ReadToFile(boost::beast::http::response<boost::beast::http::dynamic_body>& res,
const std::string& dstFilePath) = 0;

/// @brief Asynchronous version of Read
/// @param res The response to read
/// @param ec The error code, if any occurred
Expand Down
28 changes: 24 additions & 4 deletions src/agent/communicator/src/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <algorithm>
#include <chrono>
#include <fstream>
#include <sstream>
#include <thread>
#include <utility>
Expand Down Expand Up @@ -221,8 +222,27 @@ namespace communicator
}
}

bool Communicator::GetGroupConfigurationFromManager(const std::string& groupName, const std::string& dstFilePath)
boost::asio::awaitable<bool> Communicator::GetGroupConfigurationFromManager(std::string groupName,
std::string dstFilePath)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};

bool downloaded = false;

auto onSuccess = [path = std::move(dstFilePath), &downloaded](const int, const std::string& res)
{
std::ofstream file(path, std::ios::binary);
if (file)
{
file << res;
file.close();
downloaded = true;
}
};

const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::get,
m_serverUrl,
"/api/v1/files?file_name=" + groupName +
Expand All @@ -231,10 +251,10 @@ namespace communicator
m_verificationMode,
*m_token);

const auto result = m_httpClient->PerformHttpRequestDownload(reqParams, dstFilePath);
co_await m_httpClient->Co_PerformHttpRequest(
m_token, reqParams, {}, onAuthenticationFailed, m_retryInterval, m_batchSize, onSuccess, {});

return result.result() >= boost::beast::http::status::ok &&
result.result() < boost::beast::http::status::multiple_choices;
co_return downloaded;
}

void Communicator::Stop()
Expand Down
26 changes: 11 additions & 15 deletions src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ namespace http_client

if (ec != boost::system::errc::success)
{
LogWarn("Failed to send http request to endpoint: {}. Retrying in {} seconds.",
reqParams.Endpoint,
connectionRetry / A_SECOND_IN_MILLIS);
LogDebug("Failed to send http request to endpoint: {}. Retrying in {} seconds.",
reqParams.Endpoint,
connectionRetry / A_SECOND_IN_MILLIS);
LogDebug("Http request failed: {} - {}", ec.message(), ec.what());
co_await WaitForTimer(timer, connectionRetry);
continue;
Expand Down Expand Up @@ -186,7 +186,10 @@ namespace http_client

if (ec)
{
LogWarn("Error writing request ({}): {}.", std::to_string(ec.value()), ec.message());
LogDebug("Error writing request ({}): {}. Endpoint: {}.",
std::to_string(ec.value()),
ec.message(),
reqParams.Endpoint);
socket->Close();
co_await WaitForTimer(timer, connectionRetry);
continue;
Expand All @@ -197,7 +200,10 @@ namespace http_client

if (ec)
{
LogWarn("Error reading response ({}): {}.", std::to_string(ec.value()), ec.message());
LogDebug("Error reading response ({}): {}. Endpoint: {}.",
std::to_string(ec.value()),
ec.message(),
reqParams.Endpoint);
socket->Close();
co_await WaitForTimer(timer, connectionRetry);
continue;
Expand Down Expand Up @@ -239,16 +245,6 @@ namespace http_client
boost::system::error_code& ec) { socket->Read(res, ec); });
}

boost::beast::http::response<boost::beast::http::dynamic_body>
HttpClient::PerformHttpRequestDownload(const HttpRequestParams& params, const std::string& dstFilePath)
{
return PerformHttpRequestInternal(
params,
[&dstFilePath](std::unique_ptr<IHttpSocket>& socket,
boost::beast::http::response<boost::beast::http::dynamic_body>& res,
boost::system::error_code&) { socket->ReadToFile(res, dstFilePath); });
}

std::optional<std::string> HttpClient::AuthenticateWithUuidAndKey(const std::string& serverUrl,
const std::string& userAgent,
const std::string& uuid,
Expand Down
73 changes: 0 additions & 73 deletions src/agent/communicator/src/http_client_utils.hpp

This file was deleted.

17 changes: 0 additions & 17 deletions src/agent/communicator/src/http_socket.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "http_client_utils.hpp"
#include <ihttp_socket.hpp>
#include <logger.hpp>

Expand Down Expand Up @@ -118,22 +117,6 @@ namespace http_client
}
}

/// @brief Reads a response from the socket and writes it to a file
/// @param res The response to read
/// @param dstFilePath The path to the file to write to
void ReadToFile(boost::beast::http::response<boost::beast::http::dynamic_body>& res,
const std::string& dstFilePath) override
{
try
{
http_client_utils::ReadToFile(m_socket, res, dstFilePath);
}
catch (const std::exception& e)
{
LogDebug("Exception thrown during read to file: {}", e.what());
}
}

/// @brief Asynchronous version of Read
/// @param res The response to read
/// @param ec The error code, if any occurred
Expand Down
16 changes: 0 additions & 16 deletions src/agent/communicator/src/https_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,6 @@ namespace http_client
}
}

/// @brief Reads a response from the socket and writes it to a file
/// @param res The response to read
/// @param dstFilePath The path to the file to write to
void ReadToFile(boost::beast::http::response<boost::beast::http::dynamic_body>& res,
const std::string& dstFilePath) override
{
try
{
http_client_utils::ReadToFile(m_ssl_socket, res, dstFilePath);
}
catch (const std::exception& e)
{
LogDebug("Exception thrown during read to file: {}", e.what());
}
}

/// @brief Asynchronous version of Read
/// @param res The response to read
/// @param ec The error code, if any occurred
Expand Down
Loading

0 comments on commit 95b9665

Please sign in to comment.