Move cleanup tasks out of the watcher loop
This commit is contained in:
parent
41a4b278b8
commit
92060b2026
2 changed files with 23 additions and 14 deletions
|
|
@ -26,6 +26,8 @@ class Controller:
|
||||||
- The 'initializer' starts initialization jobs for deployment that have been marked
|
- The 'initializer' starts initialization jobs for deployment that have been marked
|
||||||
with 'runboat/init-status=todo', while making sure that the maximum number of
|
with 'runboat/init-status=todo', while making sure that the maximum number of
|
||||||
deployments initializing concurrently does not exceed the limit.
|
deployments initializing concurrently does not exceed the limit.
|
||||||
|
- The 'cleaner' starts cleanup jobs for deployment that have been marked for
|
||||||
|
deletion.
|
||||||
- The 'stopper' stops old running deployments.
|
- The 'stopper' stops old running deployments.
|
||||||
- The 'undeployer' undeploys old stopped deployments.
|
- The 'undeployer' undeploys old stopped deployments.
|
||||||
"""
|
"""
|
||||||
|
|
@ -35,11 +37,16 @@ class Controller:
|
||||||
self._wakeup_initializer = asyncio.Event()
|
self._wakeup_initializer = asyncio.Event()
|
||||||
self._wakeup_stopper = asyncio.Event()
|
self._wakeup_stopper = asyncio.Event()
|
||||||
self._wakeup_undeployer = asyncio.Event()
|
self._wakeup_undeployer = asyncio.Event()
|
||||||
|
self._wakeup_cleaner = asyncio.Event()
|
||||||
self.db = BuildsDb()
|
self.db = BuildsDb()
|
||||||
self.db.register_listener(self)
|
self.db.register_listener(self)
|
||||||
|
|
||||||
def on_build_event(self, event: BuildEvent, build: Build) -> None:
|
def on_build_event(self, event: BuildEvent, build: Build) -> None:
|
||||||
self._wakeup()
|
self._wakeup_initializer.set()
|
||||||
|
self._wakeup_stopper.set()
|
||||||
|
self._wakeup_undeployer.set()
|
||||||
|
if event == BuildEvent.modified and build.status == BuildStatus.undeploying:
|
||||||
|
self._wakeup_cleaner.set()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def started(self) -> int:
|
def started(self) -> int:
|
||||||
|
|
@ -85,11 +92,6 @@ class Controller:
|
||||||
return
|
return
|
||||||
await Build.deploy(commit_info)
|
await Build.deploy(commit_info)
|
||||||
|
|
||||||
def _wakeup(self) -> None:
|
|
||||||
self._wakeup_initializer.set()
|
|
||||||
self._wakeup_stopper.set()
|
|
||||||
self._wakeup_undeployer.set()
|
|
||||||
|
|
||||||
async def get_build(self, build_name: str, db_only: bool = True) -> Build | None:
|
async def get_build(self, build_name: str, db_only: bool = True) -> Build | None:
|
||||||
build = self.db.get(build_name)
|
build = self.db.get(build_name)
|
||||||
if build is not None:
|
if build is not None:
|
||||||
|
|
@ -119,16 +121,8 @@ class Controller:
|
||||||
if not build_name:
|
if not build_name:
|
||||||
continue
|
continue
|
||||||
if event_type in (None, "ADDED", "MODIFIED"):
|
if event_type in (None, "ADDED", "MODIFIED"):
|
||||||
prev_build = self.db.get(build_name)
|
|
||||||
build = Build.from_deployment(deployment)
|
build = Build.from_deployment(deployment)
|
||||||
self.db.add(build)
|
self.db.add(build)
|
||||||
if build.status == BuildStatus.undeploying and (
|
|
||||||
prev_build is None or prev_build.status != BuildStatus.undeploying
|
|
||||||
):
|
|
||||||
_logger.info(
|
|
||||||
f"{build} has deletionTimestamp. Launching cleanup job."
|
|
||||||
)
|
|
||||||
await build.cleanup()
|
|
||||||
elif event_type == "DELETED":
|
elif event_type == "DELETED":
|
||||||
self.db.remove(build_name)
|
self.db.remove(build_name)
|
||||||
|
|
||||||
|
|
@ -180,6 +174,13 @@ class Controller:
|
||||||
elif event_type == "DELETED":
|
elif event_type == "DELETED":
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def cleaner(self) -> None:
|
||||||
|
while True:
|
||||||
|
await self._wakeup_cleaner.wait()
|
||||||
|
self._wakeup_cleaner.clear()
|
||||||
|
for build in self.db.to_cleanup():
|
||||||
|
await build.cleanup()
|
||||||
|
|
||||||
async def initializer(self) -> None:
|
async def initializer(self) -> None:
|
||||||
while True:
|
while True:
|
||||||
await self._wakeup_initializer.wait()
|
await self._wakeup_initializer.wait()
|
||||||
|
|
@ -250,6 +251,7 @@ class Controller:
|
||||||
for f in (
|
for f in (
|
||||||
self.deployment_watcher,
|
self.deployment_watcher,
|
||||||
self.job_watcher,
|
self.job_watcher,
|
||||||
|
self.cleaner,
|
||||||
self.initializer,
|
self.initializer,
|
||||||
self.stopper,
|
self.stopper,
|
||||||
self.undeployer,
|
self.undeployer,
|
||||||
|
|
|
||||||
|
|
@ -170,6 +170,13 @@ class BuildsDb:
|
||||||
count = self._con.execute("SELECT COUNT(name) FROM builds").fetchone()[0]
|
count = self._con.execute("SELECT COUNT(name) FROM builds").fetchone()[0]
|
||||||
return cast(int, count)
|
return cast(int, count)
|
||||||
|
|
||||||
|
def to_cleanup(self) -> list[Build]:
|
||||||
|
rows = self._con.execute(
|
||||||
|
"SELECT * FROM builds WHERE status=? ORDER BY created",
|
||||||
|
(BuildStatus.undeploying,),
|
||||||
|
).fetchall()
|
||||||
|
return [self._build_from_row(row) for row in rows]
|
||||||
|
|
||||||
def to_initialize(self, limit: int) -> list[Build]:
|
def to_initialize(self, limit: int) -> list[Build]:
|
||||||
"""Return the list of builds to initialize, ordered by creation timestamp."""
|
"""Return the list of builds to initialize, ordered by creation timestamp."""
|
||||||
rows = self._con.execute(
|
rows = self._con.execute(
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue