Better pattern for worker wakeup

This commit is contained in:
Stéphane Bidoul 2021-11-05 18:45:24 +01:00
parent 903a05fc0e
commit 8b10c209bd
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92

View file

@ -1,5 +1,4 @@
import asyncio import asyncio
import contextlib
import logging import logging
from . import k8s from . import k8s
@ -31,11 +30,15 @@ class Controller:
db: BuildsDb db: BuildsDb
_tasks: list[asyncio.Task] _tasks: list[asyncio.Task]
_wakeup_event: asyncio.Event _wakeup_initializer: asyncio.Event
_wakeup_stopper: asyncio.Event
_wakeup_undeployer: asyncio.Event
def __init__(self) -> None: def __init__(self) -> None:
self._tasks = [] self._tasks = []
self._wakeup_event = asyncio.Event() self._wakeup_initializer = asyncio.Event()
self._wakeup_stopper = asyncio.Event()
self._wakeup_undeployer = asyncio.Event()
self.reset() self.reset()
def reset(self) -> None: def reset(self) -> None:
@ -93,14 +96,9 @@ class Controller:
) )
def _wakeup(self) -> None: def _wakeup(self) -> None:
self._wakeup_event.set() self._wakeup_initializer.set()
self._wakeup_event.clear() self._wakeup_stopper.set()
self._wakeup_undeployer.set()
async def _sleep(self) -> None:
# Wait on the wakeup event, but wakeup after 10 seconds if nothing happens,
# in case we have missed an event.
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._wakeup_event.wait(), 10)
async def get_build(self, build_name: str, db_only: bool = True) -> Build | None: async def get_build(self, build_name: str, db_only: bool = True) -> Build | None:
build = self.db.get(build_name) build = self.db.get(build_name)
@ -201,7 +199,8 @@ class Controller:
async def initializer(self) -> None: async def initializer(self) -> None:
while True: while True:
await self._sleep() await self._wakeup_initializer.wait()
self._wakeup_initializer.clear()
can_initialize = self.max_initializing - self.initializing can_initialize = self.max_initializing - self.initializing
if can_initialize <= 0: if can_initialize <= 0:
continue # no capacity for now, back to sleep continue # no capacity for now, back to sleep
@ -217,7 +216,8 @@ class Controller:
async def stopper(self) -> None: async def stopper(self) -> None:
while True: while True:
await self._sleep() await self._wakeup_stopper.wait()
self._wakeup_stopper.clear()
can_stop = self.started - self.max_started can_stop = self.started - self.max_started
if can_stop <= 0: if can_stop <= 0:
continue # no need to stop for now, back to sleep continue # no need to stop for now, back to sleep
@ -233,7 +233,8 @@ class Controller:
async def undeployer(self) -> None: async def undeployer(self) -> None:
while True: while True:
await self._sleep() await self._wakeup_undeployer.wait()
self._wakeup_undeployer.clear()
can_undeploy = self.deployed - self.max_deployed can_undeploy = self.deployed - self.max_deployed
if can_undeploy <= 0: if can_undeploy <= 0:
continue # no need to undeploy for now, back to sleep continue # no need to undeploy for now, back to sleep