diff --git a/.gitignore b/.gitignore
index 68bc17f..566ca4a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -158,3 +158,6 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+
+# VSCode
+.vscode
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 682053b..3b03e4a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,6 +10,9 @@ RUN apt-get update && apt-get -y upgrade && apt-get -y clean
 RUN useradd --create-home --shell /bin/sh --uid 8000 opencost
 COPY --from=builder /app /app
 COPY src/opencost_parquet_exporter.py /app/opencost_parquet_exporter.py
+COPY src/data_types.json /app/data_types.json
+COPY src/rename_cols.json /app/rename_cols.json
+COPY src/ignore_alloc_keys.json /app/ignore_alloc_keys.json
 RUN chmod 755 /app/opencost_parquet_exporter.py && chown -R opencost /app/
 USER opencost
 ENV PATH="/app/.venv/bin:$PATH"
diff --git a/README.md b/README.md
index 26b3421..dd0ed1f 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,26 @@ The script supports the following environment variables:
 * OPENCOST_PARQUET_FILE_KEY_PREFIX: This is the prefix used for the export, by default it is '/tmp'. The export is saved inside this prefix, in the following structure: year=window_start.year/month=window_start.month/day=window_start.day, e.g.: tmp/year=2024/month=1/day=15
 * OPENCOST_PARQUET_AGGREGATE: These are the dimensions used to aggregate the data. By default we use "namespace,pod,container", which are the same dimensions used by the native CSV export.
 * OPENCOST_PARQUET_STEP: This is the step for the export. By default we use 1h steps, which results in 24 steps per day and makes it easier to match the exported data to AWS CUR, since CUR is also exported on an hourly basis.
+* OPENCOST_PARQUET_RESOLUTION: Duration to use as resolution in Prometheus queries. Smaller values (i.e. higher resolutions) will provide better accuracy, but worse performance (i.e. slower query time, higher memory use). Larger values (i.e. lower resolutions) will perform better, but at the expense of lower accuracy for short-running workloads.
+* OPENCOST_PARQUET_ACCUMULATE: If `"true"`, sum the entire range of time intervals into a single set. Default value is `"false"`.
+* OPENCOST_PARQUET_INCLUDE_IDLE: Whether to return the calculated __idle__ field for the query. Default is `"false"`.
+* OPENCOST_PARQUET_IDLE_BY_NODE: If `"true"`, idle allocations are created on a per-node basis, which results in different values when shared and in more idle allocations when split. Default is `"false"`.
+* OPENCOST_PARQUET_STORAGE_BACKEND: The storage backend to use. Supports `aws` and `azure`. See below for Azure-specific variables.
+* OPENCOST_PARQUET_JSON_SEPARATOR: The OpenCost API returns nested objects. The [JSON normalization method](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.json_normalize.html) used by the exporter allows for a custom separator. Use this to specify the separator of your choice (see the sketch below).
+
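For illustration, here is a minimal sketch of how the exporter flattens nested fields with `pandas.json_normalize` and how the separator changes the resulting column names. The allocation object below is made up and heavily trimmed; it is not a real API response:

```python
import pandas as pd

# Illustrative allocation object, much smaller than a real OpenCost response.
allocation = {
    "name": "default/my-pod/my-container",
    "totalCost": 1.23,
    "properties": {"labels": {"team": "payments", "project": "billing"}},
}

# Default separator '.' -> columns such as 'properties.labels.team'
print(pd.json_normalize(allocation).columns.tolist())

# With OPENCOST_PARQUET_JSON_SEPARATOR='_' -> 'properties_labels_team'
print(pd.json_normalize(allocation, sep="_").columns.tolist())
```

Keep in mind that the keys shipped in `src/rename_cols.json` assume the default `.` separator, so choosing a custom separator may require adjusting that file as well.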
+## Azure Specific Environment Variables
+* OPENCOST_PARQUET_AZURE_STORAGE_ACCOUNT_NAME: Name of the Azure Storage Account you want to export the data to.
+* OPENCOST_PARQUET_AZURE_CONTAINER_NAME: The container within the storage account you want to save the data to. The service principal requires write permissions on the container.
+* OPENCOST_PARQUET_AZURE_TENANT: Your Azure tenant ID.
+* OPENCOST_PARQUET_AZURE_APPLICATION_ID: Client ID of the service principal.
+* OPENCOST_PARQUET_AZURE_APPLICATION_SECRET: Secret of the service principal.
+
+# Prerequisites
+## AWS IAM
+
+## Azure RBAC
+The current implementation authenticates via a [Service Principal](https://learn.microsoft.com/en-us/entra/identity-platform/app-objects-and-service-principals?tabs=browser) on the Azure Storage Account. Therefore, to use the Azure storage backend you need an existing service principal with the appropriate role assignments. Azure RBAC has built-in roles for Storage Account Blob Storage operations: the [Storage Blob Data Contributor](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/storage#storage-blob-data-contributor) role allows writing data to an Azure Storage Account container. A less permissive custom role can be built instead and is encouraged!
+
 # Usage:
diff --git a/examples/k8s_cron_job.yaml b/examples/k8s_cron_job.yaml
index 97928ff..8930c9d 100644
--- a/examples/k8s_cron_job.yaml
+++ b/examples/k8s_cron_job.yaml
@@ -54,6 +54,7 @@ spec:
             runAsUser: 1000
           terminationMessagePath: /dev/termination-log
           terminationMessagePolicy: File
+          command: ["/app/.venv/bin/python3"] # Update this if the ENTRYPOINT changes
       dnsConfig:
         options:
           - name: single-request-reopen
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 14c79dd..c46fa5c 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -7,6 +7,8 @@ pytz==2023.3.post1
 six==1.16.0
 tzdata==2023.4
 pyarrow==14.0.1
+azure-storage-blob==12.19.1
+azure-identity==1.15.0
 # The dependencies below are only used for development.
 freezegun==1.4.0
 pylint==3.0.3
diff --git a/requirements.txt b/requirements.txt
index 63cc0fc..ac83fbe 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,3 +7,5 @@ pytz==2023.3.post1
 six==1.16.0
 tzdata==2023.4
 pyarrow==14.0.1
+azure-storage-blob==12.19.1
+azure-identity==1.15.0
\ No newline at end of file
diff --git a/src/data_types.json b/src/data_types.json
new file mode 100644
index 0000000..cf701b7
--- /dev/null
+++ b/src/data_types.json
@@ -0,0 +1,39 @@
+{
+    "cpuCoreHours": "float",
+    "cpuCoreRequestAverage": "float",
+    "cpuCoreUsageAverage": "float",
+    "cpuCores": "float",
+    "cpuCost": "float",
+    "cpuCostAdjustment": "float",
+    "cpuEfficiency": "float",
+    "externalCost": "float",
+    "gpuCost": "float",
+    "gpuCostAdjustment": "float",
+    "gpuCount": "float",
+    "gpuHours": "float",
+    "loadBalancerCost": "float",
+    "loadBalancerCostAdjustment": "float",
+    "networkCost": "float",
+    "networkCostAdjustment": "float",
+    "networkCrossRegionCost": "float",
+    "networkCrossZoneCost": "float",
+    "networkInternetCost": "float",
+    "networkReceiveBytes": "float",
+    "networkTransferBytes": "float",
+    "pvByteHours": "float",
+    "pvBytes": "float",
+    "pvCost": "float",
+    "pvCostAdjustment": "float",
+    "ramByteHours": "float",
+    "ramByteRequestAverage": "float",
+    "ramByteUsageAverage": "float",
+    "ramBytes": "float",
+    "ramCost": "float",
+    "ramCostAdjustment": "float",
+    "ramEfficiency": "float",
+    "running_minutes": "float",
+    "sharedCost": "float",
+    "totalCost": "float",
+    "totalEfficiency": "float"
+}
+
\ No newline at end of file
diff --git a/src/ignore_alloc_keys.json b/src/ignore_alloc_keys.json
new file mode 100644
index 0000000..9821387
--- /dev/null
+++ b/src/ignore_alloc_keys.json
@@ -0,0 +1,3 @@
+{
+    "keys": ["pvs", "lbAllocations"]
+}
\ No newline at end of
file diff --git a/src/opencost_parquet_exporter.py b/src/opencost_parquet_exporter.py index d118783..9310a55 100644 --- a/src/opencost_parquet_exporter.py +++ b/src/opencost_parquet_exporter.py @@ -1,14 +1,33 @@ -"""OpenCost parquet exporter. +# pylint: disable=W0511 -This module exports data from OpenCost API to parquet format, making it suitable -for further analysis or storage in data warehouses. """ +This module provides an implementation of the OpenCost storage exporter. +""" + import sys from datetime import datetime, timedelta import os +import json import pandas as pd import requests -import botocore.exceptions as boto_exceptions +from storage_factory import get_storage + + +def load_config_file(file_path: str): + """ + Loads and returns the a JSON file specified by the file path. + + Parameters: + file_path (str): The path to the JSON configuration file. + + Returns: + dict: A dictionary containing the loaded JSON file. + """ + with open(file_path, 'r', encoding="utf-8") as file: + config = json.load(file) + return config + +# pylint: disable=R0912,R0913,R0914,R0915 def get_config( @@ -19,7 +38,13 @@ def get_config( s3_bucket=None, file_key_prefix=None, aggregate_by=None, - step=None): + step=None, + resolution=None, + accumulate=None, + storage_backend=None, + include_idle=None, + idle_by_node=None, +): """ Get configuration for the parquet exporter based on either provided parameters or environment variables. @@ -47,11 +72,26 @@ def get_config( or 'namespace,pod,container' if not set. - step (str): Granularity for the data aggregation, defaults to the 'OPENCOST_PARQUET_STEP' environment variable, - or '1h' if not set. + or is not used in query if not set. + - resolution (str): Granularity for the PromQL queries in opencost, + defaults to the 'OPENCOST_PARQUET_RESOLUTION' environment variable, + or is not used in query if not set. + - accumulate (str): Whether or not to accumulate aggregated cost, + defaults to the 'OPENCOST_PARQUET_ACCUMULATE' environment variable, + or is not used in query if not set. + - storage_backend (str): Backend of the storage service (aws or azure), + defaults to the 'OPENCOST_PARQUET_STORAGE_BACKEND' ENV var, + or 'aws' if not set. + - include_idle (str): Whether to return the calculated __idle__ field for the query, + defaults to the 'OPENCOST_PARQUET_INCLUDE_IDLE' environment + variable, or 'false' if not set. + - idle_by_node (str): If true, idle allocations are created on a per node basis, + defaults to the 'OPENCOST_PARQUET_IDLE_BY_NODE' environment + variable, or 'false' if not set. Returns: - dict: Configuration dictionary with keys for 'url', 'params', 's3_bucket', - 'file_key_prefix', 'data_types', 'ignored_alloc_keys', and 'rename_columns_config'. + 'file_key_prefix' and 'window_start'. """ config = {} @@ -69,16 +109,43 @@ def get_config( if s3_bucket is None: s3_bucket = os.environ.get('OPENCOST_PARQUET_S3_BUCKET', None) if file_key_prefix is None: - file_key_prefix = os.environ.get('OPENCOST_PARQUET_FILE_KEY_PREFIX', '/tmp/') + # TODO: Discuss: Format guideline? 
+ file_key_prefix = os.environ.get( + 'OPENCOST_PARQUET_FILE_KEY_PREFIX', '/tmp/') if aggregate_by is None: - aggregate_by = os.environ.get('OPENCOST_PARQUET_AGGREGATE', 'namespace,pod,container') + aggregate_by = os.environ.get( + 'OPENCOST_PARQUET_AGGREGATE', 'namespace,pod,container') if step is None: step = os.environ.get('OPENCOST_PARQUET_STEP', '1h') + if resolution is None: + resolution = os.environ.get('OPENCOST_PARQUET_RESOLUTION', None) + if accumulate is None: + accumulate = os.environ.get('OPENCOST_PARQUET_ACCUMULATE', None) + if idle_by_node is None: + idle_by_node = os.environ.get('OPENCOST_PARQUET_IDLE_BY_NODE', 'false') + if include_idle is None: + include_idle = os.environ.get('OPENCOST_PARQUET_INCLUDE_IDLE', 'false') + if storage_backend is None: + storage_backend = os.environ.get( + 'OPENCOST_PARQUET_STORAGE_BACKEND', 'aws') # For backward compatibility if s3_bucket is not None: config['s3_bucket'] = s3_bucket + config['storage_backend'] = storage_backend config['url'] = f"http://{hostname}:{port}/allocation/compute" config['file_key_prefix'] = file_key_prefix + + # Azure-specific configuration + if config['storage_backend'] == 'azure': + config.update({ + # pylint: disable=C0301 + 'azure_storage_account_name': os.environ.get('OPENCOST_PARQUET_AZURE_STORAGE_ACCOUNT_NAME'), + 'azure_container_name': os.environ.get('OPENCOST_PARQUET_AZURE_CONTAINER_NAME'), + 'azure_tenant': os.environ.get('OPENCOST_PARQUET_AZURE_TENANT'), + 'azure_application_id': os.environ.get('OPENCOST_PARQUET_AZURE_APPLICATION_ID'), + 'azure_application_secret': os.environ.get('OPENCOST_PARQUET_AZURE_APPLICATION_SECRET'), + }) + # If window is not specified assume we want yesterday data. if window_start is None or window_end is None: yesterday = datetime.strftime( @@ -86,72 +153,26 @@ def get_config( window_start = yesterday+'T00:00:00Z' window_end = yesterday+'T23:59:59Z' window = f"{window_start},{window_end}" - config['params'] = ( + config['window_start'] = window_start + config['params'] = [ ("window", window), - ("aggregate", aggregate_by), - ("includeIdle", "false"), - ("idleByNode", "false"), + ("includeIdle", include_idle), + ("idleByNode", idle_by_node), ("includeProportionalAssetResourceCosts", "false"), - ("format", "json"), - ("step", step) - ) - # This is required to ensure consistency without this - # we could have type change from int to float over time - # And this will result in an HIVE PARTITION SCHEMA MISMATCH - config['data_types'] = { - 'cpuCoreHours': 'float', - 'cpuCoreRequestAverage': 'float', - 'cpuCoreUsageAverage': 'float', - 'cpuCores': 'float', - 'cpuCost': 'float', - 'cpuCostAdjustment': 'float', - 'cpuEfficiency': 'float', - 'externalCost': 'float', - 'gpuCost': 'float', - 'gpuCostAdjustment': 'float', - 'gpuCount': 'float', - 'gpuHours': 'float', - 'loadBalancerCost': 'float', - 'loadBalancerCostAdjustment': 'float', - 'networkCost': 'float', - 'networkCostAdjustment': 'float', - 'networkCrossRegionCost': 'float', - 'networkCrossZoneCost': 'float', - 'networkInternetCost': 'float', - 'networkReceiveBytes': 'float', - 'networkTransferBytes': 'float', - 'pvByteHours': 'float', - 'pvBytes': 'float', - 'pvCost': 'float', - 'pvCostAdjustment': 'float', - 'ramByteHours': 'float', - 'ramByteRequestAverage': 'float', - 'ramByteUsageAverage': 'float', - 'ramBytes': 'float', - 'ramCost': 'float', - 'ramCostAdjustment': 'float', - 'ramEfficiency': 'float', - 'running_minutes': 'float', - 'sharedCost': 'float', - 'totalCost': 'float', - 'totalEfficiency': 'float' - } - 
config['ignored_alloc_keys'] = ['pvs', 'lbAllocations'] - config['rename_columns_config'] = { - 'start': 'running_start_time', - 'end': 'running_end_time', - 'minutes': 'running_minutes', - 'properties.labels.node_type': 'label.node_type', - 'properties.labels.product': 'label.product', - 'properties.labels.project': 'label.project', - 'properties.labels.role': 'label.role', - 'properties.labels.team': 'label.team', - 'properties.namespaceLabels.product': 'namespaceLabels.product', - 'properties.namespaceLabels.project': 'namespaceLabels.project', - 'properties.namespaceLabels.role': 'namespaceLabels.role', - 'properties.namespaceLabels.team': 'namespaceLabels.team' - } - config['window_start'] = window_start + ("format", "json") + ] + + # Conditionally append query parameters + if step is not None: + config['params'].append(("step", step)) + if aggregate_by is not None: + config['params'].append(("aggregate", aggregate_by)) + if resolution is not None: + config['params'].append(("resolution", resolution)) + if accumulate is not None: + config['params'].append(("accumulate", accumulate)) + config['params'] = tuple(config['params']) + return config @@ -173,7 +194,7 @@ def request_data(config): params=params, # 15 seconds connect timeout # No read timeout, in case it takes a long - timeout=(15,None) + timeout=(15, None) ) response.raise_for_status() if 'application/json' in response.headers['content-type']: @@ -187,13 +208,14 @@ def request_data(config): return None -def process_result(result, config): +def process_result(result, ignored_alloc_keys, rename_cols, data_types): """ Process raw results from the OpenCost API data request. - Parameters: - result (dict): Raw response data from the OpenCost API. - - config (dict): Configuration dictionary with data types and other processing options. + - ignored_alloc_keys (dict): Allocation keys to ignore + - rename_cols (dict): Key-value pairs for coloumns to rename + - data_types (dict): Data types for properties of OpenCost response Returns: - DataFrame or None: Processed data as a Pandas DataFrame, or None if an error occurs. @@ -204,14 +226,17 @@ def process_result(result, config): split.pop('__unmounted__/__unmounted__/__unmounted__', None) for split in result: for alloc_name in split.keys(): - for ignored_key in config['ignored_alloc_keys']: + for ignored_key in ignored_alloc_keys: split[alloc_name].pop(ignored_key, None) try: - frames = [pd.json_normalize(split.values()) for split in result] + frames = [ + pd.json_normalize( + split.values(), + sep=os.environ.get('OPENCOST_PARQUET_JSON_SEPARATOR', '.')) + for split in result] processed_data = pd.concat(frames) - processed_data.rename( - columns=config['rename_columns_config'], inplace=True) - processed_data = processed_data.astype(config['data_types']) + processed_data.rename(columns=rename_cols, inplace=True) + processed_data = processed_data.astype(data_types) except pd.errors.EmptyDataError as err: print(f"No data: {err}") return None @@ -243,63 +268,53 @@ def save_result(processed_result, config): Returns: - uri : String with the path where the data was saved. 
""" - file_name = 'k8s_opencost.parquet' - window = datetime.strptime(config['window_start'], "%Y-%m-%dT%H:%M:%SZ") - parquet_prefix = f"{config['file_key_prefix']}/year={window.year}"\ - f"/month={window.month}/day={window.day}" - try: - if config.get('s3_bucket', None): - uri = f"s3://{config['s3_bucket']}/{parquet_prefix}/{file_name}" - else: - uri = f"file://{parquet_prefix}/{file_name}" - path = '/'+parquet_prefix - os.makedirs(path, 0o750, exist_ok=True) - processed_result.to_parquet(uri) - return uri - except pd.errors.EmptyDataError as ede: - print(f"Error: No data to save, the DataFrame is empty.{ede}") - except KeyError as ke: - print(f"Missing configuration key: {ke}") - except ValueError as ve: - print(f"Error parsing date format: {ve}") - except FileNotFoundError as fnfe: - print(f"File or directory not found: {fnfe}") - except PermissionError as pe: - print(f"Permission error: {pe}") - except boto_exceptions.NoCredentialsError: - print("Error: No AWS credentials found to access S3") - except boto_exceptions.PartialCredentialsError: - print("Error: Incomplete AWS credentials provided for accessing S3") - except boto_exceptions.ClientError as ce: - print(f"AWS Client Error: {ce}") - return None + # TODO: Handle save to local file system. Make it default maybe? + storage = get_storage(storage_backend=config['storage_backend']) + uri = storage.save_data(data=processed_result, config=config) + if uri: + print(f"Data successfully saved at: {uri}") + else: + print("Failed to save data.") + sys.exit(1) + +# pylint: disable=C0116 + def main(): - """ - Main function to execute the workflow of fetching, processing, and saving data - for yesterday. - """ + # TODO: Error handling when load fails print("Starting run") + print("Load data types") + data_types = load_config_file( + file_path='./src/data_types.json') # TODO: Make path ENV var + print("Load renaming coloumns") + rename_cols = load_config_file( + file_path='./src/rename_cols.json') # TODO: Make path ENV var + print("Load allocation keys to ignore") + ignore_alloc_keys = load_config_file( + file_path='./src/ignore_alloc_keys.json') # TODO: Make path ENV var + + print("Build config") config = get_config() - print(config) print("Retrieving data from opencost api") - result = request_data(config) + result = request_data(config=config) if result is None: print("Result is None. 
Aborting execution") sys.exit(1) print("Opencost data retrieved successfully") + print("Processing the data") - processed_data = process_result(result, config) + processed_data = process_result( + result=result, + ignored_alloc_keys=ignore_alloc_keys, + rename_cols=rename_cols, + data_types=data_types) if processed_data is None: print("Processed data is None, aborting execution.") sys.exit(1) print("Data processed successfully") + print("Saving data") - saved_path = save_result(processed_data, config) - if saved_path is None: - print("Error while saving the data.") - sys.exit(1) - print(f"Success saving data at: {saved_path}") + save_result(processed_data, config) if __name__ == "__main__": diff --git a/src/rename_cols.json b/src/rename_cols.json new file mode 100644 index 0000000..86331df --- /dev/null +++ b/src/rename_cols.json @@ -0,0 +1,14 @@ +{ + "start": "running_start_time", + "end": "running_end_time", + "minutes": "running_minutes", + "properties.labels.node_type": "label.node_type", + "properties.labels.product": "label.product", + "properties.labels.project": "label.project", + "properties.labels.role": "label.role", + "properties.labels.team": "label.team", + "properties.namespaceLabels.product": "namespaceLabels.product", + "properties.namespaceLabels.project": "namespaceLabels.project", + "properties.namespaceLabels.role": "namespaceLabels.role", + "properties.namespaceLabels.team": "namespaceLabels.team" +} \ No newline at end of file diff --git a/src/storage/__init__.py b/src/storage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/storage/aws_s3_storage.py b/src/storage/aws_s3_storage.py new file mode 100644 index 0000000..d397034 --- /dev/null +++ b/src/storage/aws_s3_storage.py @@ -0,0 +1,67 @@ +""" +This module provides an implementation of the BaseStorage class for Amazon S3. +""" + +import os +import pandas as pd +from botocore.exceptions import ClientError, PartialCredentialsError, NoCredentialsError +from .base_storage import BaseStorage + +# pylint: disable=R0903 + + +class S3Storage(BaseStorage): + """ + A class that extends the BaseStorage abstract class to provide functionality + for saving data to Amazon S3. + + """ + + def save_data(self, data, config) -> str | None: + """ + Uploads the provided data to an Amazon S3 bucket using the specified configuration. + + Parameters: + data (DataFrame): The data to be uploaded. + Should be a file-like object (BytesIO, for example). + config (dict): Configuration information including the S3 bucket name, object key + prefix,and the 'window_start' datetime that influences the object's + key structure. + + Returns: + str | None: The full S3 object path if the upload is successful, None otherwise. 
+ + """ + file_name = 'k8s_opencost.parquet' + window = pd.to_datetime(config['window_start']) + # pylint: disable=C0301 + parquet_prefix = f"{config['file_key_prefix']}/year={window.year}/month={window.month}/day={window.day}" + + try: + if config['s3_bucket']: + uri = f"s3://{config['s3_bucket']}/{parquet_prefix}/{file_name}" + else: + uri = f"file://{parquet_prefix}/{file_name}" + path = '/'+parquet_prefix + os.makedirs(path, 0o750, exist_ok=True) + + data.to_parquet(uri) + + return uri + except pd.errors.EmptyDataError as ede: + print(f"Error: No data to save, the DataFrame is empty.{ede}") + except KeyError as ke: + print(f"Missing configuration key: {ke}") + except ValueError as ve: + print(f"Error parsing date format: {ve}") + except FileNotFoundError as fnfe: + print(f"File or directory not found: {fnfe}") + except PermissionError as pe: + print(f"Permission error: {pe}") + except NoCredentialsError: + print("Error: No AWS credentials found to access S3") + except PartialCredentialsError: + print("Error: Incomplete AWS credentials provided for accessing S3") + except ClientError as ce: + print(f"AWS Client Error: {ce}") + return None diff --git a/src/storage/azure_storage.py b/src/storage/azure_storage.py new file mode 100644 index 0000000..3375e0b --- /dev/null +++ b/src/storage/azure_storage.py @@ -0,0 +1,76 @@ +# pylint: disable=W0511 + +""" +This module provides an implementation of the BaseStorage class for Azure Blob Storage +with authentication via client secret credentials. +""" + +from io import BytesIO +import logging +import sys +import pandas as pd +from azure.identity import ClientSecretCredential +from azure.storage.blob import BlobServiceClient, BlobType +from .base_storage import BaseStorage + +logger = logging.getLogger('azure.storage.blob') +logger.setLevel(logging.INFO) # TODO: Make ENV var +handler = logging.StreamHandler(stream=sys.stdout) +logger.addHandler(handler) + + +# pylint: disable=R0903 +class AzureStorage(BaseStorage): + """ + A class to handle data storage in Azure Blob Storage. + + """ + + def save_data(self, data: pd.core.frame.DataFrame, config) -> str | None: + """ + Saves a DataFrame to Azure Blob Storage. + + Parameters: + data (pd.core.frame.DataFrame): The DataFrame to be saved. + config (dict): Configuration dictionary containing necessary information for storage. + Expected keys include 'azure_tenant', 'azure_application_id', + 'azure_application_secret', 'azure_storage_account_name', + 'azure_container_name', and 'file_key_prefix'. + + Returns: + str | None: The URL of the saved blob if successful, None otherwise. + + """ + credentials = ClientSecretCredential( + config['azure_tenant'], + config['azure_application_id'], + config['azure_application_secret'] + ) + blob_service_client = BlobServiceClient( + f"https://{config['azure_storage_account_name']}.blob.core.windows.net", + logging_enable=True, + credential=credentials + ) + + # TODO: Force overwrite? As of now upload would fail since key is the same. 
+ # blob_client provides an option for this + file_name = 'k8s_opencost.parquet' + window = pd.to_datetime(config['window_start']) + parquet_prefix = f"{config['file_key_prefix']}{window.year}/{window.month}/{window.day}" + key = f"{parquet_prefix}/{file_name}" + blob_client = blob_service_client.get_blob_client( + container=config['azure_container_name'], blob=key) + parquet_file = BytesIO() + data.to_parquet(parquet_file, engine='pyarrow', index=False) + parquet_file.seek(0) + + try: + response = blob_client.upload_blob( + data=parquet_file, blob_type=BlobType.BlockBlob) + if response: + return f"{blob_client.url}" + # pylint: disable=W0718 + except Exception as e: + logger.error(e) + + return None diff --git a/src/storage/base_storage.py b/src/storage/base_storage.py new file mode 100644 index 0000000..b9eba94 --- /dev/null +++ b/src/storage/base_storage.py @@ -0,0 +1,30 @@ +""" +This module defines an abstract base class for storage mechanisms. +It provides a standardized interface for saving data, allowing for implementation +of various storage backends such as file systems, databases, or cloud storage solutions. +""" + +from abc import ABC, abstractmethod + + +# pylint: disable=R0903 +class BaseStorage(ABC): + """ + An abstract base class that represents a generic storage mechanism. + + This class is designed to be subclassed by specific storage implementations, + providing a consistent interface for saving data across various storage backends. + """ + + @abstractmethod + def save_data(self, data, config): + """ + Abstract method to save data using the provided configuration. + + This method must be implemented by subclasses to handle the actual + data storage logic according to the specific backend's requirements. + + Parameters: + data: The data to be saved. + config: Configuration settings for the storage operation. + """ diff --git a/src/storage_factory.py b/src/storage_factory.py new file mode 100644 index 0000000..a7eaa27 --- /dev/null +++ b/src/storage_factory.py @@ -0,0 +1,31 @@ +""" +This module provides a factory function for creating storage objects based on +the specified backend. +""" + +from storage.aws_s3_storage import S3Storage +from storage.azure_storage import AzureStorage + + +def get_storage(storage_backend): + """ + Factory function to create and return a storage object based on the given backend. + + This function abstracts the creation of storage objectss. It supports 'azure' for + Azure Storage and 's3' for AWS S3 Storage. + + Parameters: + storage_backend (str): The name of the storage backend. SUpported:'azure','s3'. + + Returns: + An instance of the specified storage backend class. + + Raises: + ValueError: If the specified storage backend is not supported. 
+ """ + if storage_backend == 'azure': + return AzureStorage() + if storage_backend == 's3': + return S3Storage() + + raise ValueError("Unsupported storage backend") diff --git a/src/test_opencost_parquet_exporter.py b/src/test_opencost_parquet_exporter.py index d0bc53c..b1d6294 100644 --- a/src/test_opencost_parquet_exporter.py +++ b/src/test_opencost_parquet_exporter.py @@ -1,14 +1,17 @@ """ Test cases for opencost-parquet-exporter.""" import unittest -from unittest.mock import patch, MagicMock +from unittest.mock import patch, MagicMock, mock_open +import json import os import requests from freezegun import freeze_time -from opencost_parquet_exporter import get_config, request_data +from opencost_parquet_exporter import get_config, request_data, load_config_file + class TestGetConfig(unittest.TestCase): """Test cases for get_config method""" - def test_get_config_with_env_vars(self): + + def test_get_aws_config_with_env_vars(self): """Test get_config returns correct configurations based on environment variables.""" with patch.dict(os.environ, { 'OPENCOST_PARQUET_SVC_HOSTNAME': 'testhost', @@ -18,12 +21,53 @@ def test_get_config_with_env_vars(self): 'OPENCOST_PARQUET_S3_BUCKET': 's3://test-bucket', 'OPENCOST_PARQUET_FILE_KEY_PREFIX': 'test-prefix/', 'OPENCOST_PARQUET_AGGREGATE': 'namespace', - 'OPENCOST_PARQUET_STEP': '1m'}, clear=True): + 'OPENCOST_PARQUET_STEP': '1m', + 'OPENCOST_PARQUET_STORAGE_BACKEND': 'aws'}, clear=True): config = get_config() - print(config) - self.assertEqual(config['url'], 'http://testhost:8080/allocation/compute') - self.assertEqual(config['params'][0][1], '2020-01-01T00:00:00Z,2020-01-01T23:59:59Z') + + self.assertEqual( + config['url'], 'http://testhost:8080/allocation/compute') + self.assertEqual(config['params'][0][1], + '2020-01-01T00:00:00Z,2020-01-01T23:59:59Z') self.assertEqual(config['s3_bucket'], 's3://test-bucket') + self.assertEqual(config['storage_backend'], 'aws') + self.assertEqual(config['params'][1][1], 'false') + self.assertEqual(config['params'][2][1], 'false') + + def test_get_azure_config_with_env_vars(self): + """Test get_config returns correct configurations based on environment variables.""" + with patch.dict(os.environ, { + 'OPENCOST_PARQUET_SVC_HOSTNAME': 'testhost', + 'OPENCOST_PARQUET_SVC_PORT': '8080', + 'OPENCOST_PARQUET_WINDOW_START': '2020-01-01T00:00:00Z', + 'OPENCOST_PARQUET_WINDOW_END': '2020-01-01T23:59:59Z', + 'OPENCOST_PARQUET_S3_BUCKET': 's3://test-bucket', + 'OPENCOST_PARQUET_FILE_KEY_PREFIX': 'test-prefix/', + 'OPENCOST_PARQUET_AGGREGATE': 'namespace', + 'OPENCOST_PARQUET_STEP': '1m', + 'OPENCOST_PARQUET_STORAGE_BACKEND': 'azure', + 'OPENCOST_PARQUET_AZURE_STORAGE_ACCOUNT_NAME': 'testaccount', + 'OPENCOST_PARQUET_AZURE_CONTAINER_NAME': 'testcontainer', + 'OPENCOST_PARQUET_AZURE_TENANT': 'testtenant', + 'OPENCOST_PARQUET_AZURE_APPLICATION_ID': 'testid', + 'OPENCOST_PARQUET_AZURE_APPLICATION_SECRET': 'testsecret', + 'OPENCOST_PARQUET_IDLE_BY_NODE': 'true', + 'OPENCOST_PARQUET_INCLUDE_IDLE': 'true'}, clear=True): + config = get_config() + + self.assertEqual( + config['url'], 'http://testhost:8080/allocation/compute') + self.assertEqual(config['params'][0][1], + '2020-01-01T00:00:00Z,2020-01-01T23:59:59Z') + self.assertEqual(config['storage_backend'], 'azure') + self.assertEqual( + config['azure_storage_account_name'], 'testaccount') + self.assertEqual(config['azure_container_name'], 'testcontainer') + self.assertEqual(config['azure_tenant'], 'testtenant') + self.assertEqual(config['azure_application_id'], 'testid') + 
self.assertEqual(config['azure_application_secret'], 'testsecret') + self.assertEqual(config['params'][1][1], 'true') + self.assertEqual(config['params'][2][1], 'true') @freeze_time("2024-01-31") def test_get_config_defaults_last_day_of_month(self): @@ -35,7 +79,8 @@ def test_get_config_defaults_last_day_of_month(self): window = f"{window_start},{window_end}" config = get_config() - self.assertEqual(config['url'], 'http://localhost:9003/allocation/compute') + self.assertEqual( + config['url'], 'http://localhost:9003/allocation/compute') self.assertTrue(config['file_key_prefix'], '/tmp/') self.assertNotIn('s3_bucket', config) self.assertEqual(config['params'][0][1], window) @@ -50,7 +95,8 @@ def test_get_config_defaults_first_day_of_month(self): window = f"{window_start},{window_end}" config = get_config() - self.assertEqual(config['url'], 'http://localhost:9003/allocation/compute') + self.assertEqual( + config['url'], 'http://localhost:9003/allocation/compute') self.assertTrue(config['file_key_prefix'], '/tmp/') self.assertNotIn('s3_bucket', config) self.assertEqual(config['params'][0][1], window) @@ -68,10 +114,12 @@ def test_get_config_no_window_start(self): window = f"{window_start},{window_end}" config = get_config() - self.assertEqual(config['url'], 'http://localhost:9003/allocation/compute') + self.assertEqual( + config['url'], 'http://localhost:9003/allocation/compute') self.assertTrue(config['file_key_prefix'], '/tmp/') self.assertNotIn('s3_bucket', config) self.assertEqual(config['params'][0][1], window) + @freeze_time('2024-12-20') def test_get_config_no_window_end(self): """Test get_config returns correct defaults when window end @@ -85,11 +133,13 @@ def test_get_config_no_window_end(self): window = f"{window_start},{window_end}" config = get_config() - self.assertEqual(config['url'], 'http://localhost:9003/allocation/compute') + self.assertEqual( + config['url'], 'http://localhost:9003/allocation/compute') self.assertTrue(config['file_key_prefix'], '/tmp/') self.assertNotIn('s3_bucket', config) self.assertEqual(config['params'][0][1], window) + class TestRequestData(unittest.TestCase): """ Test request_data method """ @patch('opencost_parquet_exporter.requests.get') @@ -143,5 +193,64 @@ def test_request_data_failure(self, mock_get): self.assertIsNone(data) +class TestLoadConfigMaps(unittest.TestCase): + """Test cases for load_config_file method""" + + def setUp(self): + # Setup a temporary directory and example config data for testing + self.test_dir = 'test_configs' + os.makedirs(self.test_dir, exist_ok=True) + self.valid_json_path = os.path.join(self.test_dir, 'valid_config.json') + self.invalid_json_path = os.path.join( + self.test_dir, 'invalid_config.json') + self.empty_json_path = os.path.join(self.test_dir, 'empty.json') + + # Create a valid JSON file + with open(self.valid_json_path, 'w', encoding='utf-8') as f: + json.dump({"key": "value"}, f) + + # Create an invalid JSON file + with open(self.invalid_json_path, 'w', encoding='utf-8') as f: + f.write('{"key": "value",}') + + # Create an empty JSON file + with open(self.empty_json_path, 'w', encoding='utf-8') as f: + pass + + def tearDown(self): + # Remove the directory after tests + for file in os.listdir(self.test_dir): + os.remove(os.path.join(self.test_dir, file)) + os.rmdir(self.test_dir) + + def test_successful_load(self): + """ Test loading a valid JSON file """ + result = load_config_file(self.valid_json_path) + self.assertEqual(result, {"key": "value"}) + + def test_file_not_found(self): + """ Test the response 
when the JSON file does not exist """ + with self.assertRaises(FileNotFoundError): + load_config_file('nonexistent.json') + + def test_permission_error(self): + """ Test the response to inadequate file permissions """ + # Simulate permission error by patching os.open + with patch('builtins.open', mock_open()) as mocked_file: + mocked_file.side_effect = PermissionError("Permission denied") + with self.assertRaises(PermissionError): + load_config_file(self.valid_json_path) + + def test_invalid_json_format(self): + """ Test how it handles a file with invalid JSON """ + with self.assertRaises(json.JSONDecodeError): + load_config_file(self.invalid_json_path) + + def test_empty_file(self): + """ Test the function's response to an empty JSON file """ + with self.assertRaises(json.JSONDecodeError): + load_config_file(self.empty_json_path) + + if __name__ == '__main__': unittest.main()
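One possible follow-up to the `# TODO: Handle save to local file system` note in `save_result`: a minimal, hypothetical sketch of a local-filesystem backend built on the `BaseStorage` interface introduced in this diff. The class name, file location, and factory wiring are assumptions and are not part of this change set:

```python
"""Hypothetical src/storage/local_storage.py -- illustrates the BaseStorage
interface from this diff; not part of the change set."""

import os
import pandas as pd
from .base_storage import BaseStorage


# pylint: disable=R0903
class LocalStorage(BaseStorage):
    """Writes the parquet file under file_key_prefix on the local filesystem."""

    def save_data(self, data: pd.DataFrame, config) -> str | None:
        file_name = 'k8s_opencost.parquet'
        window = pd.to_datetime(config['window_start'])
        parquet_prefix = (f"{config['file_key_prefix']}/year={window.year}"
                          f"/month={window.month}/day={window.day}")
        try:
            # Create the partitioned directory layout, then write the parquet file.
            os.makedirs(parquet_prefix, 0o750, exist_ok=True)
            path = os.path.join(parquet_prefix, file_name)
            data.to_parquet(path)
            return f"file://{path}"
        except (OSError, ValueError, KeyError) as err:
            print(f"Failed to save parquet locally: {err}")
            return None
```

To make such a backend selectable, `get_storage()` would also need an extra branch (e.g. for a `local` backend value) returning `LocalStorage()`; that wiring is intentionally omitted here.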