Skip to content

Commit

Permalink
Initial tests
Browse files Browse the repository at this point in the history
  • Loading branch information
janbuchar committed Feb 21, 2024
1 parent 403816f commit 355aaac
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 27 deletions.
25 changes: 25 additions & 0 deletions src/crawlee/_utils/measure_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

import time
from contextlib import contextmanager
from typing import Iterator


class TimerResult:
wall: float | None
cpu: float | None


@contextmanager
def measure_time() -> Iterator[TimerResult]:
result = TimerResult()
before_wall = time.monotonic()
before_cpu = time.thread_time()

try:
yield result
finally:
after_wall = time.monotonic()
after_cpu = time.thread_time()
result.wall = after_wall - before_wall
result.cpu = after_cpu - before_cpu
59 changes: 32 additions & 27 deletions src/crawlee/autoscaling/autoscaled_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,34 +80,39 @@ async def run(self: AutoscaledPool) -> None:
self._autoscale()
self._autoscale_task.start()

while not self._is_finished_function():
wait_for_workers_update = asyncio.create_task(
self._worker_tasks_updated.wait(), name='wait for worker tasks update'
)
wait_for_worker_tasks = asyncio.create_task(
asyncio.wait(self._worker_tasks, return_when=asyncio.FIRST_EXCEPTION),
name='wait for worker tasks to complete',
)

self._worker_tasks_updated.clear()

try:
await asyncio.wait(
[wait_for_workers_update, wait_for_worker_tasks],
return_when=asyncio.FIRST_COMPLETED,
try:
while not self._is_finished_function():
wait_for_workers_update = asyncio.create_task(
self._worker_tasks_updated.wait(), name='wait for worker tasks update'
)
wait_for_worker_tasks = asyncio.create_task(
asyncio.wait(self._worker_tasks, return_when=asyncio.FIRST_EXCEPTION),
name='wait for worker tasks to complete',
)
finally:
if not wait_for_worker_tasks.done():
wait_for_worker_tasks.cancel()

if not wait_for_workers_update.done():
wait_for_workers_update.cancel()

for task in self._worker_tasks:
if task.done():
exception = task.exception()
if exception is not None:
raise exception

self._worker_tasks_updated.clear()

try:
await asyncio.wait(
[wait_for_workers_update, wait_for_worker_tasks],
return_when=asyncio.FIRST_COMPLETED,
)
finally:
if not wait_for_worker_tasks.done():
wait_for_worker_tasks.cancel()

if not wait_for_workers_update.done():
wait_for_workers_update.cancel()

for task in self._worker_tasks:
if task.done():
exception = task.exception()
if exception is not None:
raise exception
finally:
await self._autoscale_task.stop()
self._desired_concurrency = 0
self._autoscale()

async def abort(self: AutoscaledPool) -> None:
"""Interrupt the autoscaled pool and all the tasks in progress."""
Expand Down
69 changes: 69 additions & 0 deletions tests/unit/autoscaling/test_autoscaled_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

import asyncio
from unittest.mock import MagicMock, Mock

import pytest

from crawlee._utils.measure_time import measure_time
from crawlee.autoscaling.autoscaled_pool import AutoscaledPool
from crawlee.autoscaling.system_status import SystemStatus


@pytest.fixture()
def system_status() -> SystemStatus | Mock:
return MagicMock(spec=SystemStatus)


@pytest.mark.asyncio()
async def test_runs_concurrently(system_status: SystemStatus | Mock) -> None:
done_count = 0

async def run() -> None:
await asyncio.sleep(0.1)
nonlocal done_count
done_count += 1

pool = AutoscaledPool(
system_status=system_status,
run_task_function=run,
is_task_ready_function=lambda: True,
is_finished_function=lambda: done_count >= 10,
min_concurrency=10,
max_concurrency=10,
)

with measure_time() as elapsed:
await pool.run()

assert elapsed.wall is not None
assert elapsed.wall < 0.3

assert done_count >= 10


@pytest.mark.asyncio()
async def test_propagates_exceptions(system_status: SystemStatus | Mock) -> None:
done_count = 0

async def run() -> None:
await asyncio.sleep(0.1)
nonlocal done_count
done_count += 1

if done_count > 5:
raise RuntimeError('Scheduled crash')

pool = AutoscaledPool(
system_status=system_status,
run_task_function=run,
is_task_ready_function=lambda: True,
is_finished_function=lambda: done_count >= 20,
min_concurrency=10,
max_concurrency=10,
)

with pytest.raises(RuntimeError):
await pool.run()

assert done_count < 20

0 comments on commit 355aaac

Please sign in to comment.