feat: add new command to launch export job
raphael0202 committed Nov 8, 2024
1 parent 282f875 commit 7a1dcc7
Showing 4 changed files with 14 additions and 16 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/container-deploy.yml
@@ -14,8 +14,6 @@ jobs:
     strategy:
       matrix:
         env:
-          # FIXME name you envs, see reuse below
-          # it's also the name of the directory for the application
          - ${{ startsWith(github.ref, 'refs/tags/v') && 'off-exports-org' || 'off-exports-net' }}
     environment: ${{ matrix.env }}
     concurrency: ${{ matrix.env }}
20 changes: 10 additions & 10 deletions openfoodfacts_exports/main.py
@@ -1,4 +1,5 @@
 import typer
+from openfoodfacts import Flavor

 app = typer.Typer()

@@ -15,13 +16,12 @@ def run_scheduler():


 @app.command()
-def run_worker(
-    queues: list[str] = typer.Argument(..., help="Names of the queues to listen to"),
-    burst: bool = typer.Option(
-        False, help="Run in burst mode (quit after all work is done)"
-    ),
-):
-    """Launch a worker."""
-    from openfoodfacts_exports.workers.main import run
-
-    run(queues=queues, burst=burst)
+def launch_export(flavor: Flavor) -> None:
+    """Launch an export job for a given flavor."""
+    from openfoodfacts.utils import get_logger
+
+    from openfoodfacts_exports.tasks import export_job
+
+    # configure root logger
+    get_logger()
+    export_job(flavor)
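
Because launch_export is registered on the Typer app with @app.command() alongside run_scheduler, Typer exposes it as a launch-export sub-command whose Flavor parameter becomes a required positional argument. The following is a minimal sketch, not part of the commit, for exercising the new command in-process with Typer's test runner; the import path is taken from the diff above rather than verified against the repository.

# Sketch only (not part of this commit): confirm that the Typer app from
# openfoodfacts_exports/main.py now exposes a "launch-export" sub-command.
from typer.testing import CliRunner

from openfoodfacts_exports.main import app

runner = CliRunner()

# Typer derives the sub-command name from the function name (underscores
# become dashes) and turns the Flavor enum parameter into a required
# positional argument, so a real run would look like: launch-export off
result = runner.invoke(app, ["launch-export", "--help"])
print(result.output)
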
6 changes: 3 additions & 3 deletions openfoodfacts_exports/scheduler.py
@@ -7,7 +7,7 @@
 from openfoodfacts.utils import get_logger
 from sentry_sdk import capture_exception

-from openfoodfacts_exports.tasks import download_dataset_job
+from openfoodfacts_exports.tasks import export_job
 from openfoodfacts_exports.utils import init_sentry
 from openfoodfacts_exports.workers.queues import high_queue

@@ -25,7 +25,7 @@ def export_datasets() -> None:
     logger.info("Downloading dataset...")

     for flavor in (Flavor.off, Flavor.obf, Flavor.opf, Flavor.opff):
-        high_queue.enqueue(download_dataset_job, flavor, job_timeout="1h", result_ttl=0)
+        high_queue.enqueue(export_job, flavor, job_timeout="1h", result_ttl=0)


 # The scheduler is responsible for scheduling periodic work such as DB dump
@@ -35,7 +35,7 @@ def run() -> None:
     scheduler = BlockingScheduler(timezone=pytz.utc)
     scheduler.add_executor(ThreadPoolExecutor(10))
     scheduler.add_jobstore(MemoryJobStore())
-    scheduler.add_job(export_datasets, "cron", hour=16, minute=19, max_instances=1)
+    scheduler.add_job(export_datasets, "cron", hour=16, minute=0, max_instances=1)
     scheduler.add_listener(exception_listener, EVENT_JOB_ERROR)
     logger.info("Starting scheduler")
     scheduler.start()
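
The two call sites touched in this commit run export_job differently: the scheduler enqueues it on the rq high_queue once a day at 16:00 UTC, while the new CLI command executes it synchronously in the current process. A minimal sketch of enqueueing the job by hand, mirroring the scheduled path and reusing only names that appear in this diff, might look like this:

# Sketch only (not part of this commit): enqueue a single export manually,
# the same way export_datasets() does on the cron run.
from openfoodfacts import Flavor

from openfoodfacts_exports.tasks import export_job
from openfoodfacts_exports.workers.queues import high_queue

# Same call shape as scheduler.py: job function, flavor argument, and rq
# options (1 h timeout, no stored result).
job = high_queue.enqueue(export_job, Flavor.off, job_timeout="1h", result_ttl=0)
print(job.id)  # picked up by any worker listening on the high queue
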
2 changes: 1 addition & 1 deletion openfoodfacts_exports/tasks.py
@@ -9,7 +9,7 @@
 from openfoodfacts_exports.workers.queues import high_queue


-def download_dataset_job(flavor: Flavor) -> None:
+def export_job(flavor: Flavor) -> None:
     """Download the JSONL dataset and launch exports through new rq jobs."""
     dataset_path = get_dataset(
         flavor=flavor, dataset_type=DatasetType.jsonl, force_download=True
