Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Register write filesink for abfs connector #11973

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,44 @@

namespace facebook::velox::filesystems {

std::function<std::unique_ptr<AzureDataLakeFileClient>()>
AbfsConfig::testWriteClientFn_;

/// Adapter that exposes an Azure SDK DataLakeFileClient through the
/// AzureDataLakeFileClient interface. Every method forwards to the owned
/// client; no state other than the client itself is kept.
class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
 public:
  /// Takes ownership of 'client'. The constructor is explicit so that a
  /// std::unique_ptr<DataLakeFileClient> is never converted into a wrapper
  /// implicitly.
  explicit DataLakeFileClientWrapper(std::unique_ptr<DataLakeFileClient> client)
      : client_(std::move(client)) {}

  /// Forwards to DataLakeFileClient::Create().
  void create() override {
    client_->Create();
  }

  /// Returns the 'Value' member of the response produced by GetProperties().
  Azure::Storage::Files::DataLake::Models::PathProperties getProperties()
      override {
    return client_->GetProperties().Value;
  }

  /// Wraps the 'size' bytes starting at 'buffer' in a MemoryBodyStream and
  /// passes that stream to Append() together with 'offset'.
  void append(const uint8_t* buffer, size_t size, uint64_t offset) override {
    auto bodyStream = Azure::Core::IO::MemoryBodyStream(buffer, size);
    client_->Append(bodyStream, offset);
  }

  /// Forwards to DataLakeFileClient::Flush() with 'position'.
  void flush(uint64_t position) override {
    client_->Flush(position);
  }

  /// Intentionally a no-op: the wrapper performs no work on close.
  void close() override {
    // do nothing.
  }

  /// Returns whatever DataLakeFileClient::GetUrl() reports.
  std::string getUrl() const override {
    return client_->GetUrl();
  }

 private:
  // Owned SDK client; the pointer itself is never reseated after construction.
  const std::unique_ptr<DataLakeFileClient> client_;
};

AbfsConfig::AbfsConfig(
std::string_view path,
const config::ConfigBase& config) {
Expand Down Expand Up @@ -123,19 +161,24 @@ std::unique_ptr<BlobClient> AbfsConfig::getReadFileClient() {
}
}

std::unique_ptr<DataLakeFileClient> AbfsConfig::getWriteFileClient() {
std::unique_ptr<AzureDataLakeFileClient> AbfsConfig::getWriteFileClient() {
if (testWriteClientFn_) {
return testWriteClientFn_();
}
std::unique_ptr<DataLakeFileClient> client;
if (authType_ == kAzureSASAuthType) {
auto url = getUrl(false);
return std::make_unique<DataLakeFileClient>(
fmt::format("{}?{}", url, sas_));
client =
std::make_unique<DataLakeFileClient>(fmt::format("{}?{}", url, sas_));
} else if (authType_ == kAzureOAuthAuthType) {
auto url = getUrl(false);
return std::make_unique<DataLakeFileClient>(url, tokenCredential_);
client = std::make_unique<DataLakeFileClient>(url, tokenCredential_);
} else {
return std::make_unique<DataLakeFileClient>(
client = std::make_unique<DataLakeFileClient>(
DataLakeFileClient::CreateFromConnectionString(
connectionString_, fileSystem_, filePath_));
}
return std::make_unique<DataLakeFileClientWrapper>(std::move(client));
}

std::string AbfsConfig::getUrl(bool withblobSuffix) {
Expand Down
17 changes: 16 additions & 1 deletion velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <azure/storage/files/datalake.hpp>
#include <folly/hash/Hash.h>
#include <functional>
#include <string>
#include <utility>
#include "velox/connectors/hive/storage_adapters/abfs/AzureDataLakeFileClient.h"

using namespace Azure::Storage::Blobs;
using namespace Azure::Storage::Files::DataLake;
Expand Down Expand Up @@ -66,7 +67,7 @@ class AbfsConfig {

std::unique_ptr<BlobClient> getReadFileClient();

std::unique_ptr<DataLakeFileClient> getWriteFileClient();
std::unique_ptr<AzureDataLakeFileClient> getWriteFileClient();

std::string filePath() const {
return filePath_;
Expand All @@ -92,6 +93,17 @@ class AbfsConfig {
return authorityHost_;
}

/// Test only. Stores 'testClientFn' in testWriteClientFn_. While it is set,
/// getWriteFileClient() returns the result of calling it instead of building
/// a real Azure client.
static void setUpTestWriteClient(
    std::function<std::unique_ptr<AzureDataLakeFileClient>()> testClientFn) {
  // The parameter is taken by value as a sink, so move it into the static
  // rather than copying the callable a second time.
  testWriteClientFn_ = std::move(testClientFn);
}

/// Test only. Resets testWriteClientFn_ to an empty std::function, which
/// removes any factory previously installed by setUpTestWriteClient().
static void tearDownTestWriteClient() {
testWriteClientFn_ = nullptr;
}

private:
std::string getUrl(bool withblobSuffix);

Expand All @@ -110,6 +122,9 @@ class AbfsConfig {
std::string tenentId_;
std::string authorityHost_;
std::shared_ptr<Azure::Core::Credentials::TokenCredential> tokenCredential_;

static std::function<std::unique_ptr<AzureDataLakeFileClient>()>
testWriteClientFn_;
};

} // namespace facebook::velox::filesystems
36 changes: 2 additions & 34 deletions velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,6 @@
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

namespace facebook::velox::filesystems {
/// Adapter that implements AzureDataLakeFileClient by forwarding each call to
/// an owned Azure SDK DataLakeFileClient.
class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
 public:
  /// Takes ownership of 'client'. Explicit, so a
  /// std::unique_ptr<DataLakeFileClient> cannot silently convert to a wrapper.
  explicit DataLakeFileClientWrapper(std::unique_ptr<DataLakeFileClient> client)
      : client_(std::move(client)) {}

  /// Forwards to DataLakeFileClient::Create().
  void create() override {
    client_->Create();
  }

  /// Returns the 'Value' member of the GetProperties() response.
  Azure::Storage::Files::DataLake::Models::PathProperties getProperties()
      override {
    return client_->GetProperties().Value;
  }

  /// Builds a MemoryBodyStream over the 'size' bytes at 'buffer' and hands it
  /// to Append() with 'offset'.
  void append(const uint8_t* buffer, size_t size, uint64_t offset) override {
    auto bodyStream = Azure::Core::IO::MemoryBodyStream(buffer, size);
    client_->Append(bodyStream, offset);
  }

  /// Forwards to DataLakeFileClient::Flush() with 'position'.
  void flush(uint64_t position) override {
    client_->Flush(position);
  }

  /// Intentionally a no-op.
  void close() override {
    // do nothing.
  }

 private:
  // Owned SDK client; never reseated after construction.
  const std::unique_ptr<DataLakeFileClient> client_;
};

class AbfsWriteFile::Impl {
public:
Expand Down Expand Up @@ -119,10 +89,8 @@ AbfsWriteFile::AbfsWriteFile(
std::string_view path,
const config::ConfigBase& config) {
auto abfsConfig = AbfsConfig(path, config);
std::unique_ptr<AzureDataLakeFileClient> clientWrapper =
std::make_unique<DataLakeFileClientWrapper>(
abfsConfig.getWriteFileClient());
impl_ = std::make_unique<Impl>(path, clientWrapper);
auto client = abfsConfig.getWriteFileClient();
impl_ = std::make_unique<Impl>(path, client);
}

AbfsWriteFile::AbfsWriteFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <stdint.h>
#include <cstddef>
#include <string>

namespace Azure::Storage::Files::DataLake::Models {
class PathProperties;
Expand All @@ -43,5 +44,6 @@ class AzureDataLakeFileClient {
virtual void append(const uint8_t* buffer, size_t size, uint64_t offset) = 0;
virtual void flush(uint64_t position) = 0;
virtual void close() = 0;
virtual std::string getUrl() const = 0;
};
} // namespace facebook::velox::filesystems
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" // @manual
#include "velox/dwio/common/FileSink.h"
#endif

namespace facebook::velox::filesystems {
Expand All @@ -34,12 +35,28 @@ std::shared_ptr<FileSystem> abfsFileSystemGenerator(
});
return filesystem;
}

/// FileSink factory for ABFS paths.
///
/// @param fileURI Path of the file to write.
/// @param options Sink options; 'connectorProperties' is used to look up the
/// file system, while 'metricLogger' and 'stats' are passed to the sink.
/// @return A WriteFileSink over the file opened for write when isAbfsFile()
/// accepts 'fileURI'; nullptr otherwise.
std::unique_ptr<velox::dwio::common::FileSink> abfsWriteFileSinkGenerator(
    const std::string& fileURI,
    const velox::dwio::common::FileSink::Options& options) {
  // Not an ABFS path: report "not handled" with a null sink.
  if (!isAbfsFile(fileURI)) {
    return nullptr;
  }
  auto fileSystem =
      filesystems::getFileSystem(fileURI, options.connectorProperties);
  return std::make_unique<dwio::common::WriteFileSink>(
      fileSystem->openFileForWrite(fileURI),
      fileURI,
      options.metricLogger,
      options.stats);
}
#endif

/// Registers ABFS support. When VELOX_ENABLE_ABFS is defined this logs a
/// message, registers abfsFileSystemGenerator for paths matched by isAbfsFile,
/// and registers abfsWriteFileSinkGenerator as a FileSink factory. Without
/// the macro the body is empty.
void registerAbfsFileSystem() {
#ifdef VELOX_ENABLE_ABFS
LOG(INFO) << "Register ABFS";
// Read/write file system lookup keyed on the isAbfsFile predicate.
registerFileSystem(isAbfsFile, std::function(abfsFileSystemGenerator));
// Lets dwio::common::FileSink construct sinks for ABFS paths.
dwio::common::FileSink::registerFactory(
std::function(abfsWriteFileSinkGenerator));
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ TEST(AbfsConfigTest, clientSecretOAuth) {
// GetUrl retrieves the value from the internal blob client, which represents
// the blob's path as well.
EXPECT_EQ(
writeClient->GetUrl(),
writeClient->getUrl(),
"https://efg.blob.core.windows.net/abc/file/test.txt");
}

Expand All @@ -108,7 +108,7 @@ TEST(AbfsConfigTest, sasToken) {
// GetUrl retrieves the value from the internal blob client, which represents
// the blob's path as well.
EXPECT_EQ(
writeClient->GetUrl(),
writeClient->getUrl(),
"http://bar.blob.core.windows.net/abc/file?sas=test");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
#include <random>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"
#include "velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h"
#include "velox/connectors/hive/storage_adapters/abfs/tests/MockDataLakeFileClient.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/exec/tests/utils/PortUtil.h"
#include "velox/exec/tests/utils/TempFilePath.h"

Expand All @@ -44,6 +47,12 @@ class AbfsFileSystemTest : public testing::Test {

// Registers the ABFS file system and installs a write-client factory that
// hands out MockDataLakeFileClient instances.
// NOTE(review): this uses the SetUpTestCase spelling while the fixture's
// teardown is named TearDownTestSuite -- confirm the pairing is intended.
static void SetUpTestCase() {
  registerAbfsFileSystem();
  auto makeMockClient = []() {
    return std::make_unique<MockDataLakeFileClient>();
  };
  AbfsConfig::setUpTestWriteClient(makeMockClient);
}

// Removes the write-client factory installed through
// AbfsConfig::setUpTestWriteClient().
static void TearDownTestSuite() {
AbfsConfig::tearDownTestWriteClient();
}

void SetUp() override {
Expand Down Expand Up @@ -264,3 +273,21 @@ TEST_F(AbfsFileSystemTest, credNotFOund) {
abfs_->openFileForRead(abfsFile),
"Config fs.azure.account.key.test1.dfs.core.windows.net not found");
}

// Verifies that FileSink::create() produces a WriteFileSink whose underlying
// write file is an AbfsWriteFile, for both the abfs:// and abfss:// schemes.
TEST_F(AbfsFileSystemTest, registerAbfsFileSink) {
  // The account host matches the "fs.azure.account.key.<account>" entry
  // configured below.
  // NOTE(review): the container name before '@' was lost to e-mail
  // obfuscation in the page this was taken from; "test" is assumed -- confirm
  // against the repository.
  static const std::vector<std::string> paths = {
      "abfs://test@test.dfs.core.windows.net/test",
      "abfss://test@test.dfs.core.windows.net/test"};
  std::unordered_map<std::string, std::string> config(
      {{"fs.azure.account.key.test.dfs.core.windows.net", "NDU2"}});
  auto hiveConfig =
      std::make_shared<const config::ConfigBase>(std::move(config));
  for (const auto& path : paths) {
    auto sink = dwio::common::FileSink::create(
        path, {.connectorProperties = hiveConfig});
    ASSERT_TRUE(sink != nullptr);
    auto writeFileSink = dynamic_cast<dwio::common::WriteFileSink*>(sink.get());
    // Fail the test instead of dereferencing a null pointer when the sink is
    // not a WriteFileSink.
    ASSERT_TRUE(writeFileSink != nullptr);
    auto writeFile =
        dynamic_cast<AbfsWriteFile*>(writeFileSink->toWriteFile().get());
    ASSERT_TRUE(writeFile != nullptr);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class MockDataLakeFileClient : public AzureDataLakeFileClient {

void close() override;

// Always reports the fixed string "testUrl".
std::string getUrl() const override {
  return std::string("testUrl");
}

// for testing purpose to verify the written content if correct.
std::string readContent() {
std::ifstream inputFile(filePath_);
Expand Down
Loading