Skip to content

Commit

Permalink
Merge pull request #124 from googlefonts/limit-concurrent-api-calls
Browse files Browse the repository at this point in the history
Limit the global number of concurrent API calls to avoid overloading the MySQL server
  • Loading branch information
justvanrossum authored Nov 9, 2023
2 parents dc73649 + 9e47ffa commit 4e90546
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/fontra_rcjk/client_async.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,46 @@
import asyncio
import logging
from collections import defaultdict
from contextlib import asynccontextmanager

import aiohttp

from .client import Client as RCJKClient
from .client import HTTPError

logger = logging.getLogger(__name__)

MAX_CONCURRENT_CALLS = 140 # MySQL's default max connections is 151


class ConcurrentCallLimiter:
def __init__(self):
self.num_calls_in_progress = 0
self.event_queue = []

@asynccontextmanager
async def limit(self):
if self.num_calls_in_progress >= MAX_CONCURRENT_CALLS:
if not self.event_queue:
logger.info("limiting concurrent API calls")
event = asyncio.Event()
self.event_queue.append(event)
await event.wait()

self.num_calls_in_progress += 1
try:
yield
finally:
self.num_calls_in_progress -= 1
if self.num_calls_in_progress < MAX_CONCURRENT_CALLS and self.event_queue:
event = self.event_queue.pop(0)
event.set()
if not self.event_queue:
logger.info("done limiting concurrent API calls")


call_limiters = defaultdict(ConcurrentCallLimiter)


class RCJKClientAsync(RCJKClient):
def _connect(self):
Expand All @@ -13,6 +49,7 @@ def _connect(self):
pass

async def connect(self):
self._call_limiter = call_limiters[self._host]
self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False))
session = await self._session.__aenter__()
assert session is self._session
Expand Down Expand Up @@ -52,6 +89,11 @@ async def get_project_font_uid_mapping(self):
return project_font_uid_mapping

async def _api_call(self, view_name, params=None):
async with self._call_limiter.limit():
result = await self._api_call_unlimited(view_name, params)
return result

async def _api_call_unlimited(self, view_name, params=None):
url, data, headers = self._prepare_request(view_name, params)
async with self._session.post(url, data=data, headers=headers) as response:
if response.status == 401:
Expand Down

0 comments on commit 4e90546

Please sign in to comment.