diff --git a/README.md b/README.md index eb0f058..34c65a7 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,6 @@ MVP: - better error handling in API (return 400 on user errors) - basic tests - build and publish runboat container image -- test what happens when the watcher looses connection to k8s - look at other TODO in code to see if anything important remains - basic UI (single page with a combo box to select repo and show builds by branch/pr, with start/stop buttons) @@ -56,6 +55,7 @@ MVP: More: - shiny UI +- websocket stream of build changes, for a dynamic UI - handle PR close (delete all builds for PR) - handle branch delete (delete all builds for branch) - create builds for all supported repos on startup (goes with sticky branches) diff --git a/log-config-dev.yaml b/log-config-dev.yaml index 842005c..6d73b13 100644 --- a/log-config-dev.yaml +++ b/log-config-dev.yaml @@ -3,13 +3,14 @@ disable_existing_loggers: false formatters: rich: datefmt: "[%X]" - format: "%(name)25s %(message)s" + # format: "%(name)25s %(message)s" handlers: console: class: rich.logging.RichHandler level: NOTSET formatter: rich rich_tracebacks: true + show_path: false root: level: DEBUG handlers: [console] diff --git a/src/runboat/controller.py b/src/runboat/controller.py index db11d34..6085f71 100644 --- a/src/runboat/controller.py +++ b/src/runboat/controller.py @@ -100,14 +100,15 @@ class Controller: build_name = deployment.metadata.labels.get("runboat/build") if not build_name: continue - _logger.debug(f"k8s {event_type} {deployment.kind} {build_name}") + should_wakeup = False if event_type in ("ADDED", "MODIFIED"): - self.db.add(Build.from_deployment(deployment)) + should_wakeup = self.db.add(Build.from_deployment(deployment)) elif event_type == "DELETED": - self.db.remove(build_name) + should_wakeup = self.db.remove(build_name) else: _logger.error(f"Unexpected k8s event type {event_type}.") - self._wakeup() + if should_wakeup: + self._wakeup() async def job_watcher(self) -> None: async for event_type, job in k8s.watch_jobs(): @@ -117,7 +118,6 @@ class Controller: job_kind = job.metadata.labels.get("runboat/job-kind") if job_kind not in ("initialize", "cleanup"): continue - _logger.debug(f"k8s {event_type} {job.kind} {job_kind} {build_name}") if event_type in ("ADDED", "MODIFIED"): build = self.db.get(build_name) if build is None: @@ -125,21 +125,27 @@ class Controller: # is starting and the db has not been populated yet. build = await Build.from_name(build_name) if build is None: + _logger.warning( + f"Received job event for {build_name} " + f"but the corresponding deployment is gone. " + f"Deleting all build resources." + ) + await k8s.delete_resources(build_name) continue if job_kind == "initialize": - if job.status.succeeded: + if job.status.active: + await build.on_initialize_started() + elif job.status.succeeded: await build.on_initialize_succeeded() elif job.status.failed: await build.on_initialize_failed() - else: - await build.on_initialize_started() if job_kind == "cleanup": - if job.status.succeeded: + if job.status.active: + await build.on_cleanup_started() + elif job.status.succeeded: await build.on_cleanup_succeeded() elif job.status.failed: await build.on_cleanup_failed() - else: - await build.on_cleanup_started() elif event_type == "DELETED": pass else: @@ -155,8 +161,8 @@ class Controller: if not to_initialize: continue # nothing startable, back to sleep _logger.info( - f"{self.initializing}/{self.max_initializing} builds are initializing. " - f"Initializing {len(to_initialize)} more." + f"{self.initializing} builds of max {self.max_initializing} " + f"are initializing. Initializing {len(to_initialize)} more." ) for build in to_initialize: await build.initialize() @@ -171,7 +177,7 @@ class Controller: if not to_stop: continue # nothing stoppable, back to sleep _logger.info( - f"{self.started}/{self.max_started} builds started. " + f"{self.started} builds of max {self.max_started} are started. " f"Stopping {len(to_stop)}." ) for build in to_stop: @@ -187,7 +193,7 @@ class Controller: if not to_undeploy: continue # nothing undeployable, back to sleep _logger.info( - f"{self.deployed}/{self.max_deployed} builds deployed. " + f"{self.deployed} builds of max {self.max_deployed} are deployed. " f"Undeploying {len(to_undeploy)}." ) for build in to_undeploy: diff --git a/src/runboat/db.py b/src/runboat/db.py index 63db7cb..0b5ed92 100644 --- a/src/runboat/db.py +++ b/src/runboat/db.py @@ -1,17 +1,17 @@ +import logging import sqlite3 from .models import BranchOrPull, Build, BuildInitStatus, BuildStatus +_logger = logging.getLogger(__name__) + class BuildsDb: """An in-memory database of builds. - It is maintained up-to-date by the controller that receives events from the cluster. - We use sqlite3 to facilitate queries and sorting, such as counting by status, or - finding oldest builds. - - Querying it on each event from k8s is probably not the most efficient, but this - should do for a start, and there are plenty of ways to optimize. + It is maintained up-to-date by the controller that receives events + from the cluster. We use sqlite3 to facilitate queries and sorting, + such as counting by status, or finding oldest builds. """ _con: sqlite3.Connection @@ -35,6 +35,7 @@ class BuildsDb: " pr INTEGER, " " git_commit TEXT NOT NULL, " " image TEXT NOT NULL," + " desired_replicas INTEGER NOT NULL," " status TEXT NOT NULL, " " init_status TEXT NOT NULL, " " last_scaled TEXT, " @@ -69,10 +70,17 @@ class BuildsDb: return self._build_from_row(row) def remove(self, name: str) -> None: + if self.get(name) is None: + return False # no change with self._con: self._con.execute("DELETE FROM builds WHERE name=?", (name,)) + _logger.info("Noticed removal of %s", name) + return True def add(self, build: Build) -> None: + prev_build = self.get(build.name) + if prev_build == build: + return False # no change with self._con: self._con.execute( "INSERT OR REPLACE INTO builds " @@ -84,12 +92,13 @@ class BuildsDb: " pr," " git_commit," " image," + " desired_replicas," " status," " init_status, " " last_scaled, " " created" ") " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ( build.name, build.deployment_name, @@ -98,12 +107,27 @@ class BuildsDb: build.pr, build.git_commit, build.image, + build.desired_replicas, build.status, build.init_status, - build.last_scaled, + build.last_scaled.isoformat(), build.created.isoformat(), ), ) + if prev_build is None: + action = "addition" + else: + action = "update" + _logger.info( + "Noticed %s of %s (%s/%s/desired_replicas=%s/last_scaled=%s)", + action, + build, + build.status, + build.init_status, + build.desired_replicas, + build.last_scaled, + ) + return True def count_by_status(self, status: BuildStatus) -> int: return self._con.execute( diff --git a/src/runboat/github.py b/src/runboat/github.py index 717fc9c..8398206 100644 --- a/src/runboat/github.py +++ b/src/runboat/github.py @@ -15,7 +15,7 @@ def _github_get(url: str) -> Any: } if settings.github_token: headers["Authorization"] = f"token {settings.github_token}" - response = requests.get(full_url, headers) + response = requests.get(full_url, headers=headers) if response.status_code == 404: raise NotFoundOnGithub(f"GitHub URL not found: {full_url}.") response.raise_for_status() diff --git a/src/runboat/k8s.py b/src/runboat/k8s.py index 164cb1c..26d64f2 100644 --- a/src/runboat/k8s.py +++ b/src/runboat/k8s.py @@ -143,7 +143,9 @@ def _render_kubefiles(deployment_vars: DeploymentVars) -> Generator[Path, None, async def _kubectl(args: list[str]) -> None: - proc = await asyncio.create_subprocess_exec("kubectl", *args) + proc = await asyncio.create_subprocess_exec( + "kubectl", *args, stdout=subprocess.DEVNULL + ) return_code = await proc.wait() if return_code != 0: raise subprocess.CalledProcessError(return_code, ["kubectl"] + args) @@ -159,13 +161,7 @@ async def deploy(deployment_vars: DeploymentVars) -> None: str(tmp_path), ] ) - await _kubectl( - [ - "apply", - "-k", - str(tmp_path), - ] - ) + await _kubectl(["apply", "-k", str(tmp_path), "--wait=false"]) async def delete_resources(build_name: str) -> None: diff --git a/src/runboat/kubefiles/deployment.yaml b/src/runboat/kubefiles/deployment.yaml index 1540b3a..917b7be 100644 --- a/src/runboat/kubefiles/deployment.yaml +++ b/src/runboat/kubefiles/deployment.yaml @@ -3,9 +3,9 @@ kind: Deployment metadata: name: odoo annotations: - runboat/init-status: "todo" # ask controller to start when there is capacity + runboat/init-status: "todo" # ask controller to initialize when there is capacity spec: - replicas: 0 # deploy idle (with 0 replica) + replicas: 0 # deploy idle selector: matchLabels: app: odoo diff --git a/src/runboat/kubefiles/runboat-clone-and-install.sh b/src/runboat/kubefiles/runboat-clone-and-install.sh index bce65f9..3cd7ff3 100755 --- a/src/runboat/kubefiles/runboat-clone-and-install.sh +++ b/src/runboat/kubefiles/runboat-clone-and-install.sh @@ -4,7 +4,7 @@ set -ex # # Clone an addons repository at git reference in $ADDONS_DIR. -# Run oca_install_addons and oca_init_db on it. +# Run oca_install_addons on it. # git clone --filter=blob:none $RUNBOAT_GIT_REPO $ADDONS_DIR diff --git a/src/runboat/models.py b/src/runboat/models.py index 38f476c..466894a 100644 --- a/src/runboat/models.py +++ b/src/runboat/models.py @@ -24,7 +24,7 @@ class BuildStatus(str, Enum): class BuildInitStatus(str, Enum): - todo = "todo" # to initialize as soon as there is capacity + todo = "todo" # to initialize and start as soon as there is capacity started = "started" # initialization job running succeeded = "succeeded" # initialization job succeeded failed = "failed" # initialization job failed @@ -41,12 +41,26 @@ class Build(BaseModel): image: str status: BuildStatus init_status: BuildInitStatus + desired_replicas: int last_scaled: datetime.datetime created: datetime.datetime def __str__(self) -> str: return f"{self.slug} ({self.name})" + def __eq__(self, other: "Build") -> bool: + if other is None: + return False + if self.name != other.name: + return False + # Ignore fields that are immutable by design. + return ( + self.status == other.status + and self.init_status == other.init_status + and self.desired_replicas == other.desired_replicas + and self.last_scaled == other.last_scaled + ) + @classmethod async def from_name(cls, build_name: str) -> Optional["Build"]: """Create a Build model by reading the k8s api.""" @@ -67,8 +81,9 @@ class Build(BaseModel): image=deployment.spec.template.spec.containers[0].image, init_status=deployment.metadata.annotations["runboat/init-status"], status=cls._status_from_deployment(deployment), + desired_replicas=deployment.spec.replicas or 0, last_scaled=deployment.metadata.annotations.get("runboat/last-scaled") - or datetime.datetime.utcnow(), + or deployment.metadata.creation_timestamp, created=deployment.metadata.creation_timestamp, ) @@ -82,7 +97,7 @@ class Build(BaseModel): elif init_status == BuildInitStatus.failed: return BuildStatus.failed elif init_status == BuildInitStatus.succeeded: - replicas = deployment.status.replicas + replicas = deployment.spec.replicas if not replicas: return BuildStatus.stopped else: @@ -142,19 +157,17 @@ class Build(BaseModel): elif self.status == BuildStatus.failed: _logger.info(f"Marking failed {self} for reinitialization.") await k8s.delete_job(self.name, job_kind="initialize") - await self._patch( - init_status=BuildInitStatus.todo, replicas=0, update_last_scaled=False - ) + await self._patch(init_status=BuildInitStatus.todo, desired_replicas=0) elif self.status == BuildStatus.stopped: - _logger.info(f"Starting {self}.") - await self._patch(replicas=1, update_last_scaled=True) + _logger.info(f"Starting {self} that was last scaled on {self.last_scaled}.") + await self._patch(desired_replicas=1) async def stop(self) -> None: if self.status == BuildStatus.started: - _logger.info(f"Stopping {self}.") - await self._patch(replicas=0, update_last_scaled=True) + _logger.info(f"Stopping {self} that was last scaled on {self.last_scaled}.") + await self._patch(desired_replicas=0) else: - _logger.info("Ignoring stop command for {self} " "that is not started.") + _logger.info(f"Ignoring stop command for {self} that is not started.") async def initialize(self) -> None: # Start initizalization job. on_init_started/on_init_succeeded/on_init_failed @@ -193,9 +206,7 @@ class Build(BaseModel): if self.init_status == BuildInitStatus.started: return _logger.info(f"Initialization job started for {self}.") - await self._patch( - init_status=BuildInitStatus.started, replicas=0, update_last_scaled=True - ) + await self._patch(init_status=BuildInitStatus.started, desired_replicas=0) async def on_initialize_succeeded(self) -> None: if self.init_status == BuildInitStatus.succeeded: @@ -203,9 +214,7 @@ class Build(BaseModel): # succeeded old initialization jobs after a controller restart. return _logger.info(f"Initialization job succeded for {self}, starting.") - await self._patch( - init_status=BuildInitStatus.succeeded, replicas=1, update_last_scaled=True - ) + await self._patch(init_status=BuildInitStatus.succeeded, desired_replicas=1) async def on_initialize_failed(self) -> None: if self.init_status == BuildInitStatus.failed: @@ -213,15 +222,13 @@ class Build(BaseModel): # restarting, and is notified of existing initialization jobs. return _logger.info(f"Initialization job failed for {self}.") - await self._patch( - init_status=BuildInitStatus.failed, replicas=0, update_last_scaled=True - ) + await self._patch(init_status=BuildInitStatus.failed, desired_replicas=0) async def on_cleanup_started(self) -> None: + if self.init_status == BuildInitStatus.cleaning: + return _logger.info(f"Cleanup job started for {self}.") - await self._patch( - init_status=BuildInitStatus.cleaning, replicas=0, update_last_scaled=False - ) + await self._patch(init_status=BuildInitStatus.cleaning, desired_replicas=0) async def on_cleanup_succeeded(self) -> None: _logger.info(f"Cleanup job succeeded for {self}, deleting resources.") @@ -235,11 +242,10 @@ class Build(BaseModel): async def _patch( self, init_status: BuildInitStatus | None = None, - replicas: int | None = None, - update_last_scaled: bool = True, + desired_replicas: int | None = None, ) -> None: ops = [] - if init_status is not None: + if init_status is not None and init_status != self.init_status: ops.extend( [ { @@ -249,22 +255,24 @@ class Build(BaseModel): }, ], ) - if replicas is not None: - ops.append( - { - "op": "replace", - "path": "/spec/replicas", - "value": replicas, - }, - ) - if update_last_scaled: - ops.append( + if desired_replicas is not None and desired_replicas != self.desired_replicas: + ops.extend( + [ + { + "op": "replace", + "path": "/spec/replicas", + "value": desired_replicas, + }, { "op": "replace", "path": "/metadata/annotations/runboat~1last-scaled", - "value": datetime.datetime.utcnow().isoformat() + "Z", + "value": datetime.datetime.utcnow() + .replace(microsecond=0) + .isoformat() + + "Z", }, - ) + ] + ) await k8s.patch_deployment(self.deployment_name, ops)