Use observer pattern to listen on BuildsDb

Prepare for websocket notifications.
This commit is contained in:
Stéphane Bidoul 2021-11-14 19:23:09 +01:00
parent 020c6609b1
commit 03e6fa4795
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92
3 changed files with 52 additions and 25 deletions

View file

@ -3,7 +3,7 @@ import logging
from typing import Any, Awaitable, Callable from typing import Any, Awaitable, Callable
from . import k8s from . import k8s
from .db import BuildsDb from .db import BuildEvent, BuildsDb
from .models import Build, BuildInitStatus, BuildStatus from .models import Build, BuildInitStatus, BuildStatus
from .settings import settings from .settings import settings
@ -40,10 +40,11 @@ class Controller:
self._wakeup_initializer = asyncio.Event() self._wakeup_initializer = asyncio.Event()
self._wakeup_stopper = asyncio.Event() self._wakeup_stopper = asyncio.Event()
self._wakeup_undeployer = asyncio.Event() self._wakeup_undeployer = asyncio.Event()
self.reset()
def reset(self) -> None:
self.db = BuildsDb() self.db = BuildsDb()
self.db.register_listener(self)
def build_updated(self, build: Build, event: BuildEvent) -> None:
self._wakeup()
@property @property
def started(self) -> int: def started(self) -> int:
@ -111,13 +112,12 @@ class Controller:
) )
build = await Build.from_name(build_name) build = await Build.from_name(build_name)
if build is not None: if build is not None:
if self.db.add(build): self.db.add(build)
self._wakeup()
return build return build
return None return None
async def deployment_watcher(self) -> None: async def deployment_watcher(self) -> None:
self.reset() # empty the local db each time we start watching self.db.reset() # empty the local db each time we start watching
async for event_type, deployment in k8s.watch_deployments(): async for event_type, deployment in k8s.watch_deployments():
_logger.debug( _logger.debug(
"Event %s %s %s dr=%s/rr=%s", "Event %s %s %s dr=%s/rr=%s",
@ -130,11 +130,10 @@ class Controller:
build_name = deployment.metadata.labels.get("runboat/build") build_name = deployment.metadata.labels.get("runboat/build")
if not build_name: if not build_name:
continue continue
should_wakeup = False
if event_type in (None, "ADDED", "MODIFIED"): if event_type in (None, "ADDED", "MODIFIED"):
prev_build = self.db.get(build_name) prev_build = self.db.get(build_name)
build = Build.from_deployment(deployment) build = Build.from_deployment(deployment)
should_wakeup = self.db.add(build) self.db.add(build)
if build.status == BuildStatus.undeploying and ( if build.status == BuildStatus.undeploying and (
prev_build is None or prev_build.status != BuildStatus.undeploying prev_build is None or prev_build.status != BuildStatus.undeploying
): ):
@ -143,9 +142,7 @@ class Controller:
) )
await build.cleanup() await build.cleanup()
elif event_type == "DELETED": elif event_type == "DELETED":
should_wakeup = self.db.remove(build_name) self.db.remove(build_name)
if should_wakeup:
self._wakeup()
async def job_watcher(self) -> None: async def job_watcher(self) -> None:
async for event_type, job in k8s.watch_jobs(): async for event_type, job in k8s.watch_jobs():

View file

@ -1,12 +1,24 @@
import logging import logging
import sqlite3 import sqlite3
from typing import cast from enum import Enum
from typing import Protocol, cast
from weakref import WeakSet
from .models import Build, BuildInitStatus, BuildStatus from .models import Build, BuildInitStatus, BuildStatus
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
class BuildEvent(Enum):
modified = 0
removed = 1
class BuildListener(Protocol):
def build_updated(self, build: Build, event: BuildEvent) -> None:
...
class BuildsDb: class BuildsDb:
"""An in-memory database of builds. """An in-memory database of builds.
@ -18,8 +30,12 @@ class BuildsDb:
_con: sqlite3.Connection _con: sqlite3.Connection
def __init__(self) -> None: def __init__(self) -> None:
self._listeners: WeakSet[BuildListener] = WeakSet()
self.reset() self.reset()
def register_listener(self, listener: BuildListener) -> None:
self._listeners.add(listener)
@classmethod @classmethod
def _build_from_row(cls, row: sqlite3.Row) -> Build: def _build_from_row(cls, row: sqlite3.Row) -> Build:
return Build(**{k: row[k] for k in row.keys()}) return Build(**{k: row[k] for k in row.keys()})
@ -70,18 +86,20 @@ class BuildsDb:
return None return None
return self._build_from_row(row) return self._build_from_row(row)
def remove(self, name: str) -> bool: def remove(self, name: str) -> None:
if self.get(name) is None: build = self.get(name)
return False # no change if build is None:
return # already removed
with self._con: with self._con:
self._con.execute("DELETE FROM builds WHERE name=?", (name,)) self._con.execute("DELETE FROM builds WHERE name=?", (name,))
_logger.info("Noticed removal of %s", name) _logger.info("Noticed removal of %s", name)
return True for listener in self._listeners:
listener.build_updated(build, BuildEvent.removed)
def add(self, build: Build) -> bool: def add(self, build: Build) -> None:
prev_build = self.get(build.name) prev_build = self.get(build.name)
if prev_build == build: if prev_build == build:
return False # no change return # no change
with self._con: with self._con:
self._con.execute( self._con.execute(
"INSERT OR REPLACE INTO builds " "INSERT OR REPLACE INTO builds "
@ -128,7 +146,8 @@ class BuildsDb:
build.desired_replicas, build.desired_replicas,
build.last_scaled, build.last_scaled,
) )
return True for listener in self._listeners:
listener.build_updated(build, BuildEvent.modified)
def count_by_status(self, status: BuildStatus) -> int: def count_by_status(self, status: BuildStatus) -> int:
count = self._con.execute( count = self._con.execute(

View file

@ -1,4 +1,5 @@
import datetime import datetime
from unittest.mock import MagicMock
from runboat.db import BuildsDb from runboat.db import BuildsDb
from runboat.models import Build, BuildInitStatus, BuildStatus from runboat.models import Build, BuildInitStatus, BuildStatus
@ -31,17 +32,27 @@ def _make_build(
def test_add() -> None: def test_add() -> None:
db = BuildsDb() db = BuildsDb()
assert db.add(_make_build()) # new listener = MagicMock()
assert not db.add(_make_build()) # no change db.register_listener(listener)
assert db.add(_make_build(status=BuildStatus.failed)) db.add(_make_build()) # new
listener.build_updated.assert_called()
listener.reset_mock()
db.add(_make_build()) # no change
listener.build_updated.assert_not_called()
db.add(_make_build(status=BuildStatus.failed))
listener.build_updated.assert_called()
def test_remove() -> None: def test_remove() -> None:
db = BuildsDb() db = BuildsDb()
assert not db.remove("not-a-build") listener = MagicMock()
db.register_listener(listener)
db.remove("not-a-build")
listener.build_updated.assert_not_called()
build = _make_build() build = _make_build()
db.add(build) db.add(build)
assert db.remove(build.name) db.remove(build.name)
listener.build_updated.assert_called()
def test_get_for_commit() -> None: def test_get_for_commit() -> None: