From 7a1dcc769e11851749c9bff49ff58d7058999964 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Bournhonesque?=
Date: Fri, 8 Nov 2024 16:16:19 +0100
Subject: [PATCH] feat: add new command to launch export job

---
 .github/workflows/container-deploy.yml |  2 --
 openfoodfacts_exports/main.py          | 20 ++++++++++----------
 openfoodfacts_exports/scheduler.py     |  6 +++---
 openfoodfacts_exports/tasks.py         |  2 +-
 4 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/container-deploy.yml b/.github/workflows/container-deploy.yml
index 765e3ea..a8716ec 100644
--- a/.github/workflows/container-deploy.yml
+++ b/.github/workflows/container-deploy.yml
@@ -14,8 +14,6 @@ jobs:
     strategy:
       matrix:
         env:
-          # FIXME name you envs, see reuse below
-          # it's also the name of the directory for the application
           - ${{ startsWith(github.ref, 'refs/tags/v') && 'off-exports-org' || 'off-exports-net' }}
     environment: ${{ matrix.env }}
     concurrency: ${{ matrix.env }}
diff --git a/openfoodfacts_exports/main.py b/openfoodfacts_exports/main.py
index 3af18af..55fdf1f 100644
--- a/openfoodfacts_exports/main.py
+++ b/openfoodfacts_exports/main.py
@@ -1,4 +1,5 @@
 import typer
+from openfoodfacts import Flavor
 
 app = typer.Typer()
 
@@ -15,13 +16,12 @@ def run_scheduler():
 
 
 @app.command()
-def run_worker(
-    queues: list[str] = typer.Argument(..., help="Names of the queues to listen to"),
-    burst: bool = typer.Option(
-        False, help="Run in burst mode (quit after all work is done)"
-    ),
-):
-    """Launch a worker."""
-    from openfoodfacts_exports.workers.main import run
-
-    run(queues=queues, burst=burst)
+def launch_export(flavor: Flavor) -> None:
+    """Launch an export job for a given flavor."""
+    from openfoodfacts.utils import get_logger
+
+    from openfoodfacts_exports.tasks import export_job
+
+    # configure root logger
+    get_logger()
+    export_job(flavor)
diff --git a/openfoodfacts_exports/scheduler.py b/openfoodfacts_exports/scheduler.py
index b4ad2b1..32a4e09 100644
--- a/openfoodfacts_exports/scheduler.py
+++ b/openfoodfacts_exports/scheduler.py
@@ -7,7 +7,7 @@
 from openfoodfacts.utils import get_logger
 from sentry_sdk import capture_exception
 
-from openfoodfacts_exports.tasks import download_dataset_job
+from openfoodfacts_exports.tasks import export_job
 from openfoodfacts_exports.utils import init_sentry
 from openfoodfacts_exports.workers.queues import high_queue
 
@@ -25,7 +25,7 @@ def export_datasets() -> None:
     logger.info("Downloading dataset...")
 
     for flavor in (Flavor.off, Flavor.obf, Flavor.opf, Flavor.opff):
-        high_queue.enqueue(download_dataset_job, flavor, job_timeout="1h", result_ttl=0)
+        high_queue.enqueue(export_job, flavor, job_timeout="1h", result_ttl=0)
 
 
 # The scheduler is responsible for scheduling periodic work such as DB dump
@@ -35,7 +35,7 @@ def run() -> None:
     scheduler = BlockingScheduler(timezone=pytz.utc)
     scheduler.add_executor(ThreadPoolExecutor(10))
     scheduler.add_jobstore(MemoryJobStore())
-    scheduler.add_job(export_datasets, "cron", hour=16, minute=19, max_instances=1)
+    scheduler.add_job(export_datasets, "cron", hour=16, minute=0, max_instances=1)
     scheduler.add_listener(exception_listener, EVENT_JOB_ERROR)
     logger.info("Starting scheduler")
     scheduler.start()
diff --git a/openfoodfacts_exports/tasks.py b/openfoodfacts_exports/tasks.py
index fb08f30..1b35076 100644
--- a/openfoodfacts_exports/tasks.py
+++ b/openfoodfacts_exports/tasks.py
@@ -9,7 +9,7 @@
 from openfoodfacts_exports.workers.queues import high_queue
 
 
-def download_dataset_job(flavor: Flavor) -> None:
+def export_job(flavor: Flavor) -> None:
     """Download the JSONL dataset and launch exports through new rq jobs."""
     dataset_path = get_dataset(
         flavor=flavor, dataset_type=DatasetType.jsonl, force_download=True