Skip to content

Commit

Permalink
Improving celery tasks handling
Browse files Browse the repository at this point in the history
  • Loading branch information
marifergun committed Mar 29, 2024
1 parent f45042c commit 5326131
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 33 deletions.
4 changes: 2 additions & 2 deletions cosapweb/api/helpers/file_helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import os

from django.conf import settings
import warnings


def wait_file_update_complete(file_path: str, timeout: int = 600) -> bool:
def wait_file_update_complete(file_path: str, timeout: int = 1000) -> bool:
"""
Waits for file to be updated.
"""
Expand Down
20 changes: 16 additions & 4 deletions cosapweb/api/helpers/project_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .file_helpers import wait_file_update_complete
from .user_helpers import get_user_dir
from .variant_helpers import create_snv
import warnings


def set_project_status(project_id, status):
Expand All @@ -27,9 +28,20 @@ def set_project_status(project_id, status):
status=status,
)

def set_project_stderr(project_id, stderr):
    """
    Store `stderr` in the `stderr` field of the project with the given id.

    The write is a queryset-level update on the rows matching `project_id`.
    """
    matching_projects = Project.objects.filter(id=project_id)
    matching_projects.update(stderr=stderr)

def get_project_dir(project_id):
project = Project.objects.get(id=project_id)
try:
project = Project.objects.get(id=project_id)
except Project.DoesNotExist:
raise Exception("Project does not exist.")
return os.path.join(get_user_dir(project.user), f"{project.id}_{project.name}")


Expand Down Expand Up @@ -148,10 +160,10 @@ def project_files_ready(project_id):
try:
project_files = ProjectFiles.objects.get(project=project)
except ProjectFiles.DoesNotExist:
warnings.warn("The project does not have any files.")
return False

for file in project_files.files.all():
return wait_file_update_complete(file.file.path)

return all([wait_file_update_complete(file.file.path) for file in project_files.files.all()])


def update_project_summary(project_id, qc_results=None, msi_score=None):
Expand Down
23 changes: 17 additions & 6 deletions cosapweb/api/helpers/task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from ..celery_handlers.tasks import (cosap_annotation_task, cosap_dna_task,
cosap_parse_project_data_task)
from ..constants import (CosapDnaTaskInputs, FileExtensions, ProjectTypes,
Sampletypes)
Sampletypes, ProjectStatus)
from ..models import Project, ProjectFiles
from .project_helpers import (get_project_algorithms, get_project_dir,
get_project_files, get_project_type)
get_project_files, get_project_type, set_project_status, set_project_stderr)


def submit_cosap_dna_task(project_id: int):
Expand All @@ -19,10 +19,19 @@ def submit_cosap_dna_task(project_id: int):
project_id, file_type=FileExtensions.BED.value[0]
).first()

normal_pairs = (
match_read_pairs([file for file in normal_files])[0] if normal_files else None
)
tumor_pairs = match_read_pairs([file for file in tumor_files])

try:
normal_pairs = (
match_read_pairs([file for file in normal_files])[0] if normal_files else None
)
except Exception as e:
set_project_stderr(project_id, e)

try:
tumor_pairs = match_read_pairs([file for file in tumor_files])
except Exception as e:
set_project_stderr(project_id, e)


algorithms = get_project_algorithms(project_id)
workdir = get_project_dir(project_id)
Expand All @@ -47,6 +56,8 @@ def submit_cosap_dna_task(project_id: int):

results = cosap_dna_task(project_id, **dna_task_input)

# Set project status to running
set_project_status(project_id, ProjectStatus.RUNNING.value)
return results


Expand Down
18 changes: 18 additions & 0 deletions cosapweb/api/management/commands/retry_failed_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from django.core.management.base import BaseCommand
from ...models import Project
from ...constants import ProjectStatus


class Command(BaseCommand):
    """Management command that moves every failed project back to pending."""

    help = "Retry failed jobs."

    def handle(self, *args, **kwargs):
        """
        Set each FAILED project to PENDING, save it, and report progress.

        Each project is saved individually rather than through a bulk update.
        NOTE(review): presumably the actual resubmission is done by the
        Project post_save receivers reacting to the status change - confirm.
        """
        report = self.stdout.write
        report("Retrying failed jobs...")

        for failed_project in Project.objects.filter(
            status=ProjectStatus.FAILED.value
        ):
            failed_project.status = ProjectStatus.PENDING.value
            failed_project.save()
            report(f"Retrying job for project {failed_project.id}...")

        report("Done.")
6 changes: 4 additions & 2 deletions cosapweb/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class CustomUser(AbstractUser):

class Project(models.Model):

SOMATIC = "SM"
GERMLINE = "GM"
SOMATIC = "SOMATIC"
GERMLINE = "GERMLINE"
PROJECT_TYPE_CHOICES = [(SOMATIC, "somatic"), (GERMLINE, "germline")]

COMPLETED = "COMPLETED"
Expand Down Expand Up @@ -84,6 +84,7 @@ class Project(models.Model):
stderr = models.TextField(null=True, blank=True)
algorithms = models.JSONField(default=dict)
is_demo = models.BooleanField(default=False)
is_draft = models.BooleanField(default=False)

def __str__(self):
return f"{self.id} - {self.name}"
Expand Down Expand Up @@ -208,6 +209,7 @@ class File(models.Model):
)
file = models.FileField(upload_to=user_directory_path)
is_demo = models.BooleanField(default=False)
is_draft = models.BooleanField(default=False)

def __str__(self):
return f"{self.id}-{self.name}"
Expand Down
36 changes: 21 additions & 15 deletions cosapweb/api/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import PurePosixPath

from django.conf import settings
from django.db.models.signals import post_delete, post_save
from django.db.models.signals import post_delete, post_save, pre_delete
from django.dispatch import receiver
from django_drf_filepond.models import TemporaryUpload, TemporaryUploadChunked
from rest_framework.authtoken.models import Token
Expand Down Expand Up @@ -38,13 +38,13 @@ def auto_delete_file_on_delete(sender, instance, **kwargs):
os.remove(instance.file.path)


@receiver(post_delete, sender=Project)
@receiver(pre_delete, sender=Project)
def auto_delete_project_dir_on_delete(sender, instance, **kwargs):
"""
Deletes project directory from filesystem
when corresponding `Project` object is deleted.
"""
project_dir = get_project_dir(instance)
project_dir = get_project_dir(instance.id)
if os.path.isdir(project_dir):
shutil.rmtree(project_dir)

Expand Down Expand Up @@ -116,19 +116,9 @@ def save_tmp_upload(sender, instance, **kwargs):

fl.name = upload_file_name
fl.file = permanent_file_path
fl.is_draft = False
fl.save()


# Submit COSAP DNA job when project is created and parse project data when project is updated.
@receiver(post_save, sender=Project)
def submit_cosap_dna_job(sender, instance, created, **kwargs):

# Submit COSAP DNA job when project is created
if created:
if project_files_ready(instance.id):
submit_cosap_dna_task(instance.id)


@receiver(post_save, sender=Project)
def create_project_dir(sender, instance, created, **kwargs):
"""
Expand All @@ -148,6 +138,20 @@ def create_project_summary(sender, instance, created, **kwargs):
from .models import ProjectSummary
ProjectSummary.objects.create(project=instance)


@receiver(post_save, sender=Project)
def submit_cosap_dna_job(sender, instance, created, **kwargs):
    """
    Submit the COSAP DNA task for a newly created, non-draft project.

    Nothing is submitted when the project is a draft, when the save is an
    update rather than a creation, or when the project files are not ready.
    """
    # NOTE(review): a project that is first created as a draft and later
    # saved with is_draft=False arrives here with created=False, so this
    # receiver never submits it - confirm another receiver covers that path.
    if (
        not instance.is_draft
        and created
        and project_files_ready(instance.id)
    ):
        submit_cosap_dna_task(instance.id)


@receiver(post_save, sender=Project)
def submit_cosap_parse_project_data_job(sender, instance, created, **kwargs):
"""
Expand All @@ -156,4 +160,6 @@ def submit_cosap_parse_project_data_job(sender, instance, created, **kwargs):
if not created:
updated_fields = get_updated_model_fields(Project, instance, ["status"])
if "status" in updated_fields and instance.status == ProjectStatus.PARSING.value:
subbmit_cosap_parse_project_data(instance.id)
subbmit_cosap_parse_project_data(instance.id)
elif "status" in updated_fields and instance.status == ProjectStatus.PENDING.value:
submit_cosap_dna_task(instance.id)
29 changes: 25 additions & 4 deletions cosapweb/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from ..common.utils import (convert_file_relative_path_to_absolute_path,
create_chonky_filemap)
from .helpers.project_helpers import get_project_dir
from .constants import ProjectStatus

USER = get_user_model()

Expand Down Expand Up @@ -180,7 +181,8 @@ def create(self, request, *args, **kwargs):
project_type=project_type,
name=name,
algorithms=algorithms,
status="PENDING",
status=ProjectStatus.PENDING.value,
is_draft=True,
)

normal_file_ids = json.loads(request.POST.get("normal_files", "[]"))
Expand All @@ -203,7 +205,9 @@ def create(self, request, *args, **kwargs):

project_files.save()

new_project.is_draft = False
new_project.save()

return HttpResponse(status=status.HTTP_201_CREATED)

def retrieve(self, request, pk):
Expand Down Expand Up @@ -238,7 +242,12 @@ def rerun_project(self, request, pk=None):
return HttpResponse(status=status.HTTP_401_UNAUTHORIZED)

project = Project.objects.get(id=pk)
project.status = "PENDING"

# If the project is already running, skip rerunning
if project.status == ProjectStatus.RUNNING.value:
return HttpResponse(status=status.HTTP_200_OK)

project.status = ProjectStatus.PENDING.value
project.save()

return HttpResponse(status=status.HTTP_200_OK)
Expand All @@ -251,8 +260,10 @@ def delete_project(self, request, pk=None):
and not request.user.is_superuser
):
return HttpResponse(status=status.HTTP_401_UNAUTHORIZED)
project = Project.objects.get(id=pk)

project = get_object_or_404(Project, pk=pk)
project.delete()

return HttpResponse(status=status.HTTP_200_OK)


Expand All @@ -270,6 +281,16 @@ def retrieve(self, request, pk=None):
variant_dict["af"] = ProjectSNVData.objects.get(
project=project, snv=snv
).allele_frequency
variant_dict["ad"] = ProjectSNVData.objects.get(
project=project, snv=snv
).allele_depth

# Internal variant frequency is the number of projects that have the variant over the total number of projects
internal_freq = ProjectSNVData.objects.filter(
snv=snv
).count() / Project.objects.count()
variant_dict["user_case_frequency"] = f"{internal_freq:.2f}"

except Exception as e:
variant_dict["af"] = -1

Expand Down Expand Up @@ -418,7 +439,7 @@ def create(self, request, *args, **kwargs):
temp_id = response.data
sample_type = request.POST.get("sample_type")
f = File.objects.create(
user=request.user, uuid=temp_id, sample_type=sample_type
user=request.user, uuid=temp_id, sample_type=sample_type, is_draft=True
)
return response

Expand Down

0 comments on commit 5326131

Please sign in to comment.