From 8b10c209bdaa75f89245dd3028361520a834b042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 5 Nov 2021 18:45:24 +0100 Subject: [PATCH] Better pattern for worker wakeup --- src/runboat/controller.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/runboat/controller.py b/src/runboat/controller.py index 6626f54..13973ac 100644 --- a/src/runboat/controller.py +++ b/src/runboat/controller.py @@ -1,5 +1,4 @@ import asyncio -import contextlib import logging from . import k8s @@ -31,11 +30,15 @@ class Controller: db: BuildsDb _tasks: list[asyncio.Task] - _wakeup_event: asyncio.Event + _wakeup_initializer: asyncio.Event + _wakeup_stopper: asyncio.Event + _wakeup_undeployer: asyncio.Event def __init__(self) -> None: self._tasks = [] - self._wakeup_event = asyncio.Event() + self._wakeup_initializer = asyncio.Event() + self._wakeup_stopper = asyncio.Event() + self._wakeup_undeployer = asyncio.Event() self.reset() def reset(self) -> None: @@ -93,14 +96,9 @@ class Controller: ) def _wakeup(self) -> None: - self._wakeup_event.set() - self._wakeup_event.clear() - - async def _sleep(self) -> None: - # Wait on the wakeup event, but wakeup after 10 seconds if nothing happens, - # in case we have missed an event. - with contextlib.suppress(asyncio.TimeoutError): - await asyncio.wait_for(self._wakeup_event.wait(), 10) + self._wakeup_initializer.set() + self._wakeup_stopper.set() + self._wakeup_undeployer.set() async def get_build(self, build_name: str, db_only: bool = True) -> Build | None: build = self.db.get(build_name) @@ -201,7 +199,8 @@ class Controller: async def initializer(self) -> None: while True: - await self._sleep() + await self._wakeup_initializer.wait() + self._wakeup_initializer.clear() can_initialize = self.max_initializing - self.initializing if can_initialize <= 0: continue # no capacity for now, back to sleep @@ -217,7 +216,8 @@ class Controller: async def stopper(self) -> None: while True: - await self._sleep() + await self._wakeup_stopper.wait() + self._wakeup_stopper.clear() can_stop = self.started - self.max_started if can_stop <= 0: continue # no need to stop for now, back to sleep @@ -233,7 +233,8 @@ class Controller: async def undeployer(self) -> None: while True: - await self._sleep() + await self._wakeup_undeployer.wait() + self._wakeup_undeployer.clear() can_undeploy = self.deployed - self.max_deployed if can_undeploy <= 0: continue # no need to undeploy for now, back to sleep