diff --git a/src/fontra_rcjk/client_async.py b/src/fontra_rcjk/client_async.py index f0d09df..998155c 100644 --- a/src/fontra_rcjk/client_async.py +++ b/src/fontra_rcjk/client_async.py @@ -1,10 +1,46 @@ import asyncio +import logging +from collections import defaultdict +from contextlib import asynccontextmanager import aiohttp from .client import Client as RCJKClient from .client import HTTPError +logger = logging.getLogger(__name__) + +MAX_CONCURRENT_CALLS = 140 # MySQL's default max connections is 151 + + +class ConcurrentCallLimiter: + def __init__(self): + self.num_calls_in_progress = 0 + self.event_queue = [] + + @asynccontextmanager + async def limit(self): + if self.num_calls_in_progress >= MAX_CONCURRENT_CALLS: + if not self.event_queue: + logger.info("limiting concurrent API calls") + event = asyncio.Event() + self.event_queue.append(event) + await event.wait() + + self.num_calls_in_progress += 1 + try: + yield + finally: + self.num_calls_in_progress -= 1 + if self.num_calls_in_progress < MAX_CONCURRENT_CALLS and self.event_queue: + event = self.event_queue.pop(0) + event.set() + if not self.event_queue: + logger.info("done limiting concurrent API calls") + + +call_limiters = defaultdict(ConcurrentCallLimiter) + class RCJKClientAsync(RCJKClient): def _connect(self): @@ -13,6 +49,7 @@ def _connect(self): pass async def connect(self): + self._call_limiter = call_limiters[self._host] self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) session = await self._session.__aenter__() assert session is self._session @@ -52,6 +89,11 @@ async def get_project_font_uid_mapping(self): return project_font_uid_mapping async def _api_call(self, view_name, params=None): + async with self._call_limiter.limit(): + result = await self._api_call_unlimited(view_name, params) + return result + + async def _api_call_unlimited(self, view_name, params=None): url, data, headers = self._prepare_request(view_name, params) async with self._session.post(url, data=data, headers=headers) as response: if response.status == 401: