From 03e6fa4795827c9491fad9f31f81a65ac52c5084 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sun, 14 Nov 2021 19:23:09 +0100 Subject: [PATCH] Use observer pattern to listen on BuildsDb Prepare for websocket notifications. --- src/runboat/controller.py | 21 +++++++++------------ src/runboat/db.py | 35 +++++++++++++++++++++++++++-------- tests/test_db.py | 21 ++++++++++++++++----- 3 files changed, 52 insertions(+), 25 deletions(-) diff --git a/src/runboat/controller.py b/src/runboat/controller.py index cc3b8a5..d0d5797 100644 --- a/src/runboat/controller.py +++ b/src/runboat/controller.py @@ -3,7 +3,7 @@ import logging from typing import Any, Awaitable, Callable from . import k8s -from .db import BuildsDb +from .db import BuildEvent, BuildsDb from .models import Build, BuildInitStatus, BuildStatus from .settings import settings @@ -40,10 +40,11 @@ class Controller: self._wakeup_initializer = asyncio.Event() self._wakeup_stopper = asyncio.Event() self._wakeup_undeployer = asyncio.Event() - self.reset() - - def reset(self) -> None: self.db = BuildsDb() + self.db.register_listener(self) + + def build_updated(self, build: Build, event: BuildEvent) -> None: + self._wakeup() @property def started(self) -> int: @@ -111,13 +112,12 @@ class Controller: ) build = await Build.from_name(build_name) if build is not None: - if self.db.add(build): - self._wakeup() + self.db.add(build) return build return None async def deployment_watcher(self) -> None: - self.reset() # empty the local db each time we start watching + self.db.reset() # empty the local db each time we start watching async for event_type, deployment in k8s.watch_deployments(): _logger.debug( "Event %s %s %s dr=%s/rr=%s", @@ -130,11 +130,10 @@ class Controller: build_name = deployment.metadata.labels.get("runboat/build") if not build_name: continue - should_wakeup = False if event_type in (None, "ADDED", "MODIFIED"): prev_build = self.db.get(build_name) build = Build.from_deployment(deployment) - should_wakeup = self.db.add(build) + self.db.add(build) if build.status == BuildStatus.undeploying and ( prev_build is None or prev_build.status != BuildStatus.undeploying ): @@ -143,9 +142,7 @@ class Controller: ) await build.cleanup() elif event_type == "DELETED": - should_wakeup = self.db.remove(build_name) - if should_wakeup: - self._wakeup() + self.db.remove(build_name) async def job_watcher(self) -> None: async for event_type, job in k8s.watch_jobs(): diff --git a/src/runboat/db.py b/src/runboat/db.py index 09092c7..69569ea 100644 --- a/src/runboat/db.py +++ b/src/runboat/db.py @@ -1,12 +1,24 @@ import logging import sqlite3 -from typing import cast +from enum import Enum +from typing import Protocol, cast +from weakref import WeakSet from .models import Build, BuildInitStatus, BuildStatus _logger = logging.getLogger(__name__) +class BuildEvent(Enum): + modified = 0 + removed = 1 + + +class BuildListener(Protocol): + def build_updated(self, build: Build, event: BuildEvent) -> None: + ... + + class BuildsDb: """An in-memory database of builds. @@ -18,8 +30,12 @@ class BuildsDb: _con: sqlite3.Connection def __init__(self) -> None: + self._listeners: WeakSet[BuildListener] = WeakSet() self.reset() + def register_listener(self, listener: BuildListener) -> None: + self._listeners.add(listener) + @classmethod def _build_from_row(cls, row: sqlite3.Row) -> Build: return Build(**{k: row[k] for k in row.keys()}) @@ -70,18 +86,20 @@ class BuildsDb: return None return self._build_from_row(row) - def remove(self, name: str) -> bool: - if self.get(name) is None: - return False # no change + def remove(self, name: str) -> None: + build = self.get(name) + if build is None: + return # already removed with self._con: self._con.execute("DELETE FROM builds WHERE name=?", (name,)) _logger.info("Noticed removal of %s", name) - return True + for listener in self._listeners: + listener.build_updated(build, BuildEvent.removed) - def add(self, build: Build) -> bool: + def add(self, build: Build) -> None: prev_build = self.get(build.name) if prev_build == build: - return False # no change + return # no change with self._con: self._con.execute( "INSERT OR REPLACE INTO builds " @@ -128,7 +146,8 @@ class BuildsDb: build.desired_replicas, build.last_scaled, ) - return True + for listener in self._listeners: + listener.build_updated(build, BuildEvent.modified) def count_by_status(self, status: BuildStatus) -> int: count = self._con.execute( diff --git a/tests/test_db.py b/tests/test_db.py index 4d3cba7..c6b2f83 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,4 +1,5 @@ import datetime +from unittest.mock import MagicMock from runboat.db import BuildsDb from runboat.models import Build, BuildInitStatus, BuildStatus @@ -31,17 +32,27 @@ def _make_build( def test_add() -> None: db = BuildsDb() - assert db.add(_make_build()) # new - assert not db.add(_make_build()) # no change - assert db.add(_make_build(status=BuildStatus.failed)) + listener = MagicMock() + db.register_listener(listener) + db.add(_make_build()) # new + listener.build_updated.assert_called() + listener.reset_mock() + db.add(_make_build()) # no change + listener.build_updated.assert_not_called() + db.add(_make_build(status=BuildStatus.failed)) + listener.build_updated.assert_called() def test_remove() -> None: db = BuildsDb() - assert not db.remove("not-a-build") + listener = MagicMock() + db.register_listener(listener) + db.remove("not-a-build") + listener.build_updated.assert_not_called() build = _make_build() db.add(build) - assert db.remove(build.name) + db.remove(build.name) + listener.build_updated.assert_called() def test_get_for_commit() -> None: