Skip to content

Commit

Permalink
WIP: Azurite testing
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Dec 30, 2024
1 parent 2222c8c commit 2b03352
Show file tree
Hide file tree
Showing 24 changed files with 736 additions and 160 deletions.
752 changes: 643 additions & 109 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,18 @@ steps:
args: [--persistent-user-tables]
skip: "Persistence tests disabled"

- id: azurite-testdrive
label: "testdrive with Azurite blob store"
depends_on: build-aarch64
timeout_in_minutes: 180
agents:
queue: hetzner-aarch64-8cpu-16gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--azurite]


- group: Limits
key: limits-group
steps:
Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/checks/mzcompose_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def execute(self, e: Executor) -> None:
name=self.mz_service,
image=image,
external_metadata_store=True,
external_minio=True,
external_blob_store=True,
environment_extra=self.environment_extra,
system_parameter_defaults=self.system_parameter_defaults,
additional_system_parameter_defaults=self.additional_system_parameter_defaults,
Expand Down
4 changes: 2 additions & 2 deletions misc/python/materialize/data_ingest/transaction_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
Materialized(
name=self.workload.mz_service,
ports=ports,
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(
zero_downtime=True
Expand Down Expand Up @@ -129,7 +129,7 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
Materialized(
name=self.workload.mz_service,
ports=ports,
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(
zero_downtime=True
Expand Down
27 changes: 17 additions & 10 deletions misc/python/materialize/mzcompose/services/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import os

from materialize import MZ_ROOT
from materialize.mzcompose import loader
from materialize.mzcompose.service import (
Service,
ServiceHealthcheck
)
from materialize.mzcompose.service import Service, ServiceHealthcheck


def azure_blob_uri(address: str = "azurite") -> str:
return f"http://devstoreaccount1.{address}:10000/container"


class Azurite(Service):
Expand All @@ -23,18 +21,27 @@ class Azurite(Service):
def __init__(
self,
name: str = "azurite",
aliases: list[str] = ["azurite"],
aliases: list[str] = ["azurite", "devstoreaccount1.azurite"],
image: str | None = None,
command: list[str] | None = None,
in_memory: bool = False,
healthcheck: ServiceHealthcheck | None = None,
stop_grace_period: str = "120s",
):
if image is None:
image = f"mcr.microsoft.com/azure-storage/azurite:{self.DEFAULT_AZURITE_TAG}"
image = (
f"mcr.microsoft.com/azure-storage/azurite:{self.DEFAULT_AZURITE_TAG}"
)

if command is None:
command = ["azurite-blob", "--blobHost", "0.0.0.0:10000", "--blobPort", "10000"]
command = [
"azurite-blob",
"--blobHost",
"0.0.0.0",
"--blobPort",
"10000",
"--disableProductStyleUrl",
]

if in_memory:
command.append("--inMemoryPersistence")
Expand Down
17 changes: 12 additions & 5 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ServiceConfig,
ServiceDependency,
)
from materialize.mzcompose.services.azure import azure_blob_uri
from materialize.mzcompose.services.minio import minio_blob_uri
from materialize.mzcompose.services.postgres import METADATA_STORE

Expand All @@ -49,7 +50,8 @@ def __init__(
environment_id: str | None = None,
propagate_crashes: bool = True,
external_metadata_store: str | bool = False,
external_minio: str | bool = False,
external_blob_store: str | bool = False,
blob_store_is_azure: bool = False,
unsafe_mode: bool = True,
restart: str | None = None,
use_default_volumes: bool = True,
Expand Down Expand Up @@ -149,10 +151,15 @@ def __init__(
environment_id = DEFAULT_MZ_ENVIRONMENT_ID
command += [f"--environment-id={environment_id}"]

if external_minio:
depends_graph["minio"] = {"condition": "service_healthy"}
address = "minio" if external_minio == True else external_minio
persist_blob_url = minio_blob_uri(address)
if external_blob_store:
blob_store = "azurite" if blob_store_is_azure else "minio"
depends_graph[blob_store] = {"condition": "service_healthy"}
address = blob_store if external_blob_store == True else external_blob_store
persist_blob_url = (
azure_blob_uri(address)
if blob_store_is_azure
else minio_blob_uri(address)
)

if persist_blob_url:
command.append(f"--persist-blob-url={persist_blob_url}")
Expand Down
4 changes: 2 additions & 2 deletions misc/python/materialize/mzcompose/services/testdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(
aws_secret_access_key: str | None = "minioadmin",
no_consistency_checks: bool = False,
external_metadata_store: bool = False,
external_minio: bool = False,
external_blob_store: bool = False,
fivetran_destination: bool = False,
fivetran_destination_url: str = "http://fivetran-destination:6874",
fivetran_destination_files_path: str = "/share/tmp",
Expand Down Expand Up @@ -157,7 +157,7 @@ def __init__(
f"--fivetran-destination-files-path={fivetran_destination_files_path}"
)

if external_minio:
if external_blob_store:
depends_graph["minio"] = {"condition": "service_healthy"}
persist_blob_url = "s3://minioadmin:minioadmin@persist/persist?endpoint=http://minio:9000/&region=minio"
entrypoint.append(f"--persist-blob-url={persist_blob_url}")
Expand Down
4 changes: 2 additions & 2 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@ def run(self, exe: Executor) -> bool:
with self.composition.override(
Materialized(
restart="on-failure",
external_minio="toxiproxy",
external_blob_store="toxiproxy",
external_metadata_store="toxiproxy",
ports=["6975:6875", "6976:6876", "6977:6877"],
sanity_restart=self.sanity_restart,
Expand Down Expand Up @@ -1655,7 +1655,7 @@ def run(self, exe: Executor) -> bool:
with self.composition.override(
Materialized(
name=mz_service,
external_minio="toxiproxy",
external_blob_store="toxiproxy",
external_metadata_store="toxiproxy",
ports=ports,
sanity_restart=self.sanity_restart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def run(self, c: Composition, state: State) -> None:
with c.override(
Materialized(
name=state.mz_service,
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
deploy_generation=state.deploy_generation,
system_parameter_defaults=state.system_parameter_defaults,
Expand Down
6 changes: 3 additions & 3 deletions misc/python/materialize/zippy/mz_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def run(self, c: Composition, state: State) -> None:
with c.override(
Materialized(
name=state.mz_service,
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
deploy_generation=state.deploy_generation,
system_parameter_defaults=state.system_parameter_defaults,
Expand Down Expand Up @@ -158,7 +158,7 @@ def run(self, c: Composition, state: State) -> None:
with c.override(
Materialized(
name=state.mz_service,
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
deploy_generation=state.deploy_generation,
system_parameter_defaults=state.system_parameter_defaults,
Expand Down Expand Up @@ -190,7 +190,7 @@ def run(self, c: Composition, state: State) -> None:
with c.override(
Materialized(
name=state.mz_service,
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
deploy_generation=state.deploy_generation,
system_parameter_defaults=state.system_parameter_defaults,
Expand Down
18 changes: 13 additions & 5 deletions src/persist/src/abs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::anyhow;
use async_trait::async_trait;
use azure_core::StatusCode;
use azure_identity::create_default_credential;
use azure_storage::{prelude::*, EMULATOR_ACCOUNT};
use azure_storage::{prelude::*, CloudLocation, EMULATOR_ACCOUNT};
use azure_storage_blobs::prelude::*;
use bytes::Bytes;
use futures_util::StreamExt;
Expand Down Expand Up @@ -63,9 +63,15 @@ impl ABSBlobConfig {
) -> Result<Self, Error> {
let client = if account == EMULATOR_ACCOUNT {
info!("Connecting to Azure emulator");
ClientBuilder::emulator()
.blob_service_client()
.container_client(container)
ClientBuilder::with_location(
CloudLocation::Emulator {
address: url.domain().expect("domain for Azure emulator").to_string(),
port: url.port().expect("port for Azure emulator"),
},
StorageCredentials::emulator(),
)
.blob_service_client()
.container_client(container)
} else {
let sas_credentials = match url.query() {
Some(query) => Some(StorageCredentials::sas_token(query)),
Expand Down Expand Up @@ -167,7 +173,9 @@ impl ABSBlob {
// TODO: we could move this logic into the test harness.
// it's currently here because it's surprisingly annoying to
// create the container out-of-band
let _ = config.client.create().await;
if let Err(e) = config.client.create().await {
warn!("Failed to create container: {e}");
}
}

let ret = ABSBlob {
Expand Down
2 changes: 1 addition & 1 deletion test/backup-restore-postgres/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
Minio(setup_materialize=True),
Mc(),
Materialized(
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
sanity_restart=False,
metadata_store="postgres-metadata",
Expand Down
2 changes: 1 addition & 1 deletion test/backup-restore/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
Minio(setup_materialize=True),
Mc(),
Materialized(
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
sanity_restart=False,
metadata_store="cockroach",
Expand Down
2 changes: 1 addition & 1 deletion test/cluster/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -3218,7 +3218,7 @@ def workflow_test_incident_70(c: Composition) -> None:
with c.override(
Materialized(
external_metadata_store=True,
external_minio=True,
external_blob_store=True,
sanity_restart=False,
),
Minio(setup_materialize=True),
Expand Down
4 changes: 2 additions & 2 deletions test/data-ingest/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
# Fixed port so that we keep the same port after restarting Mz in disruptions
Materialized(
ports=["16875:6875"],
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
additional_system_parameter_defaults={"enable_table_keys": "true"},
Expand All @@ -65,7 +65,7 @@
Materialized(
name="materialized2",
ports=["26875:6875"],
external_minio=True,
external_blob_store=True,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
additional_system_parameter_defaults={"enable_table_keys": "true"},
Expand Down
2 changes: 1 addition & 1 deletion test/feature-benchmark/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def create_mz_service(
additional_system_parameter_defaults=additional_system_parameter_defaults,
external_metadata_store=True,
metadata_store="cockroach",
external_minio=True,
external_blob_store=True,
sanity_restart=False,
)

Expand Down
2 changes: 1 addition & 1 deletion test/mzcompose_examples/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def workflow_mz_with_options(c: Composition) -> None:


def workflow_minio(c: Composition) -> None:
mz = Materialized(external_minio=True)
mz = Materialized(external_blob_store=True)

with c.override(mz):
c.up("materialized")
2 changes: 1 addition & 1 deletion test/parallel-benchmark/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ def run_once(
default_size=args.size,
soft_assertions=False,
external_metadata_store=True,
external_minio=True,
external_blob_store=True,
sanity_restart=False,
additional_system_parameter_defaults=ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
| {"max_connections": "100000"},
Expand Down
2 changes: 1 addition & 1 deletion test/parallel-workload/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:

with c.override(
Materialized(
external_minio="toxiproxy",
external_blob_store="toxiproxy",
external_metadata_store="toxiproxy",
ports=["6975:6875", "6976:6876", "6977:6877"],
sanity_restart=sanity_restart,
Expand Down
2 changes: 1 addition & 1 deletion test/platform-checks/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def create_mzs(
Materialized(
name=mz_name,
external_metadata_store=True,
external_minio=True,
external_blob_store=True,
sanity_restart=False,
volumes_extra=["secrets:/share/secrets"],
metadata_store="cockroach",
Expand Down
8 changes: 4 additions & 4 deletions test/testdrive-old-kafka-src-syntax/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
Postgres(),
MySql(),
Minio(setup_materialize=True, additional_directories=["copytos3"]),
Materialized(external_minio=True),
Materialized(external_blob_store=True),
FivetranDestination(volumes_extra=["tmp:/share/tmp"]),
Testdrive(external_minio=True),
Testdrive(external_blob_store=True),
]


Expand Down Expand Up @@ -133,7 +133,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:

materialized = Materialized(
default_size=args.default_size,
external_minio=True,
external_blob_store=True,
additional_system_parameter_defaults=additional_system_parameter_defaults,
)

Expand All @@ -144,7 +144,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
validate_catalog_store=True,
default_timeout=args.default_timeout,
volumes_extra=["mzdata:/mzdata"],
external_minio=True,
external_blob_store=True,
fivetran_destination=True,
fivetran_destination_files_path="/share/tmp",
entrypoint_extra=[
Expand Down
Loading

0 comments on commit 2b03352

Please sign in to comment.