WIP: Add basic Celery/FastAPI queue #47

Draft · wants to merge 1 commit into main
50 changes: 50 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,50 @@
version: '3.8'

services:

  web:
    build:
      context: .
      dockerfile: ./server/Dockerfile
    ports:
      - 8004:8000
    command: uvicorn server.main:app --host 0.0.0.0 --reload
    volumes:
      - .:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  worker:
    build:
      context: .
      dockerfile: ./server/Dockerfile
    command: celery worker --app=server.worker.celery --concurrency=10 --loglevel=info --logfile=logs/celery.log
    volumes:
      - .:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:6-alpine

  dashboard:
    build:
      context: .
      dockerfile: ./server/Dockerfile
    command: flower --app=server.worker.celery --port=5555 --broker=redis://redis:6379/0
    ports:
      - 5556:5555
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis
      - worker
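This compose file publishes the FastAPI service on host port 8004 and the Flower dashboard on host port 5556 (container port 5555); Redis stays internal to the compose network. As a quick sanity check that the worker has registered with the broker, Flower's HTTP API can be queried from the host — a minimal sketch, assuming the stack is up and using requests, which is already pinned in requirements.txt:

```python
import requests

# Flower is published on host port 5556 (container port 5555).
# Its /api/workers endpoint lists the Celery workers it can see on the broker.
workers = requests.get("http://localhost:5556/api/workers").json()
print(list(workers.keys()))  # expect something like ['celery@<container-id>']
```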
24 changes: 0 additions & 24 deletions docs/server.py

This file was deleted.

12 changes: 10 additions & 2 deletions requirements.txt
@@ -15,8 +15,16 @@ jupyter-book==0.11.1
psutil==5.8.0
kaleido==0.2.1
nbconvert==5.6
flask==2.0.0
gunicorn==20.1.0
cadCAD_tools==0.0.1.4
tqdm==4.61.0
diskcache==5.2.1
pylint==2.8.3

# Server
aiofiles==0.6.0
celery==4.4.7
fastapi==0.64.0
flower==0.9.7
redis==3.5.3
requests==2.25.1
uvicorn==0.13.4
17 changes: 17 additions & 0 deletions server/Dockerfile
@@ -0,0 +1,17 @@
# pull official base image
FROM python:3.9.5-slim-buster

# set work directory
WORKDIR /usr/src/app

# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

# install dependencies
RUN pip install --upgrade pip
COPY ./requirements.txt .
RUN pip install -r requirements.txt

# copy project
COPY . .
8 changes: 8 additions & 0 deletions server/README.md
@@ -0,0 +1,8 @@
# Server

See https://github.com/testdrivenio/fastapi-celery for the pattern this setup follows.

```bash
docker compose up --build
curl -X POST http://localhost:8004/execute -H "Content-Type: application/json" --data '{}'
```
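The POST returns a `task_id`, which can then be polled against the status endpoint added in `server/main.py`. A small sketch of that round trip in Python (requests is already pinned in requirements.txt; host and port as in docker-compose.yml):

```python
import time
import requests

BASE = "http://localhost:8004"

# Queue the experiment task, then poll until the worker finishes it
task_id = requests.post(f"{BASE}/execute", json={}).json()["task_id"]
status = requests.get(f"{BASE}/tasks/{task_id}").json()
while status["task_status"] not in ("SUCCESS", "FAILURE"):
    time.sleep(1)
    status = requests.get(f"{BASE}/tasks/{task_id}").json()
print(status["task_status"])
```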
25 changes: 25 additions & 0 deletions server/main.py
@@ -0,0 +1,25 @@
from celery.result import AsyncResult
from fastapi import Body, FastAPI
from fastapi.responses import JSONResponse

from server.worker import create_task


app = FastAPI()

@app.post("/execute", status_code=201)
def run_task(payload=Body(...)):
    # task_type = payload["type"]
    task = create_task.delay("experiment")
    return JSONResponse({"task_id": task.id})


@app.get("/tasks/{task_id}")
def get_status(task_id: str):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result,
    }
    return JSONResponse(result)
25 changes: 25 additions & 0 deletions server/worker.py
@@ -0,0 +1,25 @@
import os
import pandas as pd
from celery import Celery
from radcad import Backend

from experiments import default_experiment
from experiments.run import run


celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")


@celery.task(name="experiment")
def create_task(task_type):
    # task_type is currently unused; only the default experiment is run.
    # Run the default radCAD experiment in a single process and drop substeps
    # to keep the result payload small.
    experiment = default_experiment.experiment
    experiment.engine.backend = Backend.SINGLE_PROCESS
    experiment.engine.deepcopy = False
    experiment.engine.drop_substeps = True

    raw_result = run(experiment)
    json_response = pd.DataFrame(raw_result).to_json()

    return json_response
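For reference, the task can also be exercised without going through FastAPI by enqueuing it directly — a sketch assuming a broker is reachable at CELERY_BROKER_URL and the experiments package is importable (for example, from inside the web or worker container); the timeout value is arbitrary:

```python
from server.worker import create_task

# Enqueue the task on the broker; a running worker picks it up.
result = create_task.delay("experiment")

# Block until the worker returns the JSON-encoded DataFrame (can take a while).
print(result.get(timeout=600)[:200])
```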