diff --git a/README.md b/README.md index 1821c10..66197c7 100644 --- a/README.md +++ b/README.md @@ -16,8 +16,8 @@ For running the controller: - Python 3.10 - `kubectl` - A `KUBECONFIG` that provides access to the namespace where the builds are deployed, - with permissions to create and delete service, deployment, ingress, secret and - configmap resources. + with permissions to create and delete Service, Job, Deployment, Ingress, Secret and + ConfigMap resources. ## Developing @@ -35,8 +35,9 @@ Contributions welcome. Prototype (min required to do load testing): +- github token for github api requests +- set requests otherwise requests is same as limits ? - plug it on a bunch of OCA and shopinvader repos to test load -- handle init failures, add failed status - configuring many repos in a .env file may be difficult, switch to a toml file ? MVP: @@ -62,3 +63,57 @@ More: - create builds for all supported repos on startup (goes with sticky branches) - never undeploy last build of sticky branches - make build images configurable (see `build_images.py`) + + +## Kubefiles + +Kustomize template with 3 modes (deploy, initialize, cleanup). + +## Synchronous actions on builds (fast) + +- deploy + + - create deployment with 0 replicas and runboat/init-status="todo" + +- start: + + - if runboat/init-status=="ready", scale to 1 + - elif runboat/init-status in ("todo", "initializing"), do nothing + - elif runboat/init-status=="failed", set runboat/init-status="todo" + +- stop: + + - scale deployment to 0 + +- undeploy: + + - scale deployment to 0 + - set runboat/init-status to "dropping" + - start dropdb job (restart=Never, backoffLimit=6) + +## Workers + +- initializer (works on deployments with runboat/init-status="todo", ordered by + runboat/init-status-timestamp), obeying max_initializing: + + - set runboat/init-status to "initializing" + - (re)create init job which will drop and init db (restart=Never, backoffLimit=0) + +- job-watcher: + + - on successful termination of initdb job: set runboat/init-status to "ready", scale + deployment to 1 + - on failure of initdb job: set runboat/init-status to "failed" + - on success of dropdb job: delete all resources + +- deployment-watcher: + + - maintains an in-memory db of deployments + +- stopper: + + - stop old started, to reach max_running + +- undeployer: + + - undeploy old stopped, to reach max_deployed diff --git a/log-config-dev.yaml b/log-config-dev.yaml index 8075887..842005c 100644 --- a/log-config-dev.yaml +++ b/log-config-dev.yaml @@ -13,3 +13,6 @@ handlers: root: level: DEBUG handlers: [console] +loggers: + kubernetes_asyncio.client.rest: + level: INFO diff --git a/src/runboat/api.py b/src/runboat/api.py index 7e8fa25..2374bac 100644 --- a/src/runboat/api.py +++ b/src/runboat/api.py @@ -16,10 +16,10 @@ router = APIRouter() class Status(BaseModel): deployed: int max_deployed: int - running: int - max_running: int - starting: int - max_starting: int + started: int + max_started: int + initializing: int + max_initializing: int class Config: orm_mode = True @@ -153,14 +153,14 @@ async def log(name: str): async def start(name: str): """Start the deployment.""" build = _build_by_name(name) - await build.delay_start() + await build.start() @router.post("/builds/{name}/stop") async def stop(name: str): """Stop the deployment.""" build = _build_by_name(name) - await build.scale(0) + await build.stop() @router.delete("/builds/{name}", dependencies=[Depends(authenticated)]) diff --git a/src/runboat/controller.py b/src/runboat/controller.py index 9112493..db11d34 100644 --- a/src/runboat/controller.py +++ b/src/runboat/controller.py @@ -1,9 +1,10 @@ import asyncio +import contextlib import logging from . import k8s from .db import BuildsDb -from .models import Build, BuildStatus +from .models import Build, BuildInitStatus, BuildStatus from .settings import settings _logger = logging.getLogger(__name__) @@ -14,12 +15,16 @@ class Controller: It runs several background tasks: - - The 'watcher' listens to kubernetes events on deployements and maintains an - in-memory database of existing deployments and their state. It wakes up the - starter and the reaper when necessary. - - The 'starter' starts deployments that have been flagged to start, while making - sure that the maximum number of deployment starting concurrently does not exceed - the limit. + - The 'deployment_watcher' listens to kubernetes events on deployments and maintains + an in-memory database of existing deployments and their state. It wakes up the + initializer, stopper and undeployer when the state of deployments change. + - The 'job_watcher' listens to kubernetes events on jobs, to maintain the + runboat/init-status annotation on deployments, and act on such events (such as + starting when an initialization succeeded or undeploying when a cleanup + succeeded). + - The 'initializer' starts initialization jobs for deployment that have been marked + with 'runboat/init-status=todo', while making sure that the maximum number of + deployments initializing concurrently does not exceed the limit. - The 'stopper' stops old running deployments. - The 'undeployer' undeploys old stopped deployments. """ @@ -37,20 +42,20 @@ class Controller: self.db = BuildsDb() @property - def running(self) -> int: - return self.db.count_by_statuses((BuildStatus.started, BuildStatus.starting)) + def started(self) -> int: + return self.db.count_by_status(BuildStatus.started) @property - def max_running(self) -> int: - return settings.max_running + def max_started(self) -> int: + return settings.max_started @property - def starting(self) -> int: - return self.db.count_by_statuses((BuildStatus.starting,)) + def initializing(self) -> int: + return self.db.count_by_init_status(BuildInitStatus.started) @property - def max_starting(self) -> int: - return settings.max_starting + def max_initializing(self) -> int: + return settings.max_initializing @property def deployed(self) -> int: @@ -70,7 +75,7 @@ class Controller: git_commit=git_commit, ) if build is not None: - await build.delay_start() + await build.start() return await Build.deploy( repo=repo, @@ -83,13 +88,19 @@ class Controller: self._wakeup_event.set() self._wakeup_event.clear() - async def watcher(self) -> None: + async def _sleep(self) -> None: + # Wait on the wakeup event, but wakeup after 10 seconds if nothing happens, + # in case we have missed an event. + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for(self._wakeup_event.wait(), 10) + + async def deployment_watcher(self) -> None: self.reset() # empty the local db each time we start watching async for event_type, deployment in k8s.watch_deployments(): build_name = deployment.metadata.labels.get("runboat/build") if not build_name: continue - _logger.debug(f"{event_type} deployment {build_name}") + _logger.debug(f"k8s {event_type} {deployment.kind} {build_name}") if event_type in ("ADDED", "MODIFIED"): self.db.add(Build.from_deployment(deployment)) elif event_type == "DELETED": @@ -98,58 +109,89 @@ class Controller: _logger.error(f"Unexpected k8s event type {event_type}.") self._wakeup() - async def starter(self) -> None: + async def job_watcher(self) -> None: + async for event_type, job in k8s.watch_jobs(): + build_name = job.metadata.labels.get("runboat/build") + if not build_name: + continue + job_kind = job.metadata.labels.get("runboat/job-kind") + if job_kind not in ("initialize", "cleanup"): + continue + _logger.debug(f"k8s {event_type} {job.kind} {job_kind} {build_name}") + if event_type in ("ADDED", "MODIFIED"): + build = self.db.get(build_name) + if build is None: + # Not found in db, look in k8s api, in case the controller + # is starting and the db has not been populated yet. + build = await Build.from_name(build_name) + if build is None: + continue + if job_kind == "initialize": + if job.status.succeeded: + await build.on_initialize_succeeded() + elif job.status.failed: + await build.on_initialize_failed() + else: + await build.on_initialize_started() + if job_kind == "cleanup": + if job.status.succeeded: + await build.on_cleanup_succeeded() + elif job.status.failed: + await build.on_cleanup_failed() + else: + await build.on_cleanup_started() + elif event_type == "DELETED": + pass + else: + _logger.error(f"Unexpected k8s event type {event_type}.") + + async def initializer(self) -> None: while True: - await self._wakeup_event.wait() - while True: - can_start = min( - self.max_running - self.running + 1, - self.max_starting - self.starting, - ) - if can_start <= 0: - break # no capacity for now, back to sleep - to_start = self.db.to_start(limit=can_start) - if not to_start: - break # nothing startable, back to sleep - _logger.info(f"Starting {len(to_start)} builds of up to {can_start}.") - for build in to_start: - await build.scale(1) - if len(to_start) < can_start: - break # back to sleep + await self._sleep() + can_initialize = self.max_initializing - self.initializing + if can_initialize <= 0: + continue # no capacity for now, back to sleep + to_initialize = self.db.to_initialize(limit=can_initialize) + if not to_initialize: + continue # nothing startable, back to sleep + _logger.info( + f"{self.initializing}/{self.max_initializing} builds are initializing. " + f"Initializing {len(to_initialize)} more." + ) + for build in to_initialize: + await build.initialize() async def stopper(self) -> None: while True: - await self._wakeup_event.wait() - while True: - can_stop = self.running - self.max_running - if can_stop <= 0: - break # no need to stop for now, back to sleep - to_stop = self.db.oldest_started(limit=can_stop) - if not to_stop: - break # nothing stoppable, back to sleep - _logger.info(f"Stopping {len(to_stop)} builds of up to {can_stop}.") - for build in to_stop: - await build.scale(0) - if len(to_stop) < can_stop: - break # back to sleep + await self._sleep() + can_stop = self.started - self.max_started + if can_stop <= 0: + continue # no need to stop for now, back to sleep + to_stop = self.db.oldest_started(limit=can_stop) + if not to_stop: + continue # nothing stoppable, back to sleep + _logger.info( + f"{self.started}/{self.max_started} builds started. " + f"Stopping {len(to_stop)}." + ) + for build in to_stop: + await build.stop() async def undeployer(self) -> None: while True: - await self._wakeup_event.wait() - while True: - can_undeploy = self.deployed - self.max_deployed - if can_undeploy <= 0: - break # no need to undeploy for now, back to sleep - to_undeploy = self.db.oldest_stopped(limit=can_undeploy) - if not to_undeploy: - break # nothing undeployable, back to sleep - _logger.info( - f"Undeploying {len(to_undeploy)} builds of up to {can_undeploy}." - ) - for build in to_undeploy: - await build.undeploy() - if len(to_undeploy) < can_undeploy: - break # back to sleep + await self._sleep() + can_undeploy = self.deployed - self.max_deployed + if can_undeploy <= 0: + continue # no need to undeploy for now, back to sleep + to_undeploy = self.db.oldest_stopped(limit=can_undeploy) + if not to_undeploy: + continue # nothing undeployable, back to sleep + _logger.info( + f"{self.deployed}/{self.max_deployed} builds deployed. " + f"Undeploying {len(to_undeploy)}." + ) + for build in to_undeploy: + await build.undeploy() async def start(self) -> None: _logger.info("Starting controller tasks.") @@ -167,7 +209,13 @@ class Controller: ) await asyncio.sleep(delay) - for f in (self.watcher, self.starter, self.stopper, self.undeployer): + for f in ( + self.deployment_watcher, + self.job_watcher, + self.initializer, + self.stopper, + self.undeployer, + ): self._tasks.append(asyncio.create_task(walking_dead(f))) async def stop(self) -> None: diff --git a/src/runboat/db.py b/src/runboat/db.py index 36c690f..63db7cb 100644 --- a/src/runboat/db.py +++ b/src/runboat/db.py @@ -1,6 +1,6 @@ import sqlite3 -from .models import BranchOrPull, Build, BuildStatus, BuildTodo +from .models import BranchOrPull, Build, BuildInitStatus, BuildStatus class BuildsDb: @@ -36,12 +36,14 @@ class BuildsDb: " git_commit TEXT NOT NULL, " " image TEXT NOT NULL," " status TEXT NOT NULL, " - " todo TEXT, " + " init_status TEXT NOT NULL, " " last_scaled TEXT, " " created TEXT NOT NULL" ")" ) - self._con.execute("CREATE INDEX idx_todo ON builds(todo, last_scaled)") + self._con.execute( + "CREATE INDEX idx_init_status ON builds(init_status, created)" + ) self._con.execute("CREATE INDEX idx_status ON builds(status, last_scaled)") self._con.execute("CREATE INDEX idx_repo ON builds(repo)") @@ -83,7 +85,7 @@ class BuildsDb: " git_commit," " image," " status," - " todo, " + " init_status, " " last_scaled, " " created" ") " @@ -97,27 +99,30 @@ class BuildsDb: build.git_commit, build.image, build.status, - build.todo, + build.init_status, build.last_scaled, build.created.isoformat(), ), ) - def count_by_statuses(self, statuses: tuple[BuildStatus, ...]) -> int: - q = ",".join(["?"] * len(statuses)) + def count_by_status(self, status: BuildStatus) -> int: return self._con.execute( - f"SELECT COUNT(name) FROM builds WHERE status IN ({q})", statuses + "SELECT COUNT(name) FROM builds WHERE status=?", (status,) + ).fetchone()[0] + + def count_by_init_status(self, init_status: BuildInitStatus) -> int: + return self._con.execute( + "SELECT COUNT(name) FROM builds WHERE init_status=?", (init_status,) ).fetchone()[0] def count_all(self) -> int: return self._con.execute("SELECT COUNT(name) FROM builds").fetchone()[0] - def to_start(self, limit: int) -> list[Build]: - """Return the list of builds to start, ordered by todo timestamp.""" - # TODO ordering is not correct as setting todo does not set last_scaled + def to_initialize(self, limit: int) -> list[Build]: + """Return the list of builds to initialize, ordered by creation timestamp.""" rows = self._con.execute( - "SELECT * FROM builds WHERE todo=? ORDER BY last_scaled LIMIT ?", - (BuildTodo.start, limit), + "SELECT * FROM builds WHERE init_status=? ORDER BY created LIMIT ?", + (BuildInitStatus.todo, limit), ).fetchall() return [self._build_from_row(row) for row in rows] @@ -132,8 +137,8 @@ class BuildsDb: def oldest_stopped(self, limit: int) -> list[Build]: """Return a list of oldest stopped builds.""" rows = self._con.execute( - "SELECT * FROM builds WHERE status=? ORDER BY last_scaled LIMIT ?", - (BuildStatus.stopped, limit), + "SELECT * FROM builds WHERE status IN (?, ?) ORDER BY last_scaled LIMIT ?", + (BuildStatus.stopped, BuildStatus.failed, limit), ).fetchall() return [self._build_from_row(row) for row in rows] diff --git a/src/runboat/github.py b/src/runboat/github.py index 56d8fc9..dfa2cd9 100644 --- a/src/runboat/github.py +++ b/src/runboat/github.py @@ -8,6 +8,7 @@ from .exceptions import NotFoundOnGithub def _github_get(url: str) -> Any: full_url = f"https://api.github.com{url}" + # TODO github token headers = { "Accept": "application/vnd.github.v3+json", } diff --git a/src/runboat/k8s.py b/src/runboat/k8s.py index 3cd0100..164cb1c 100644 --- a/src/runboat/k8s.py +++ b/src/runboat/k8s.py @@ -3,6 +3,7 @@ import shutil import subprocess import tempfile from contextlib import contextmanager +from enum import Enum from importlib import resources from pathlib import Path from typing import Any, AsyncGenerator, Generator, Optional @@ -11,6 +12,7 @@ from jinja2 import Template from kubernetes_asyncio import client, config, watch from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio.client.models.v1_deployment import V1Deployment +from kubernetes_asyncio.client.models.v1_job import V1Job from pydantic import BaseModel from .settings import settings @@ -26,6 +28,17 @@ async def load_kube_config() -> None: await config.load_kube_config() +async def read_deployment(name: str) -> Optional[V1Deployment]: + async with ApiClient() as api: + appsv1 = client.AppsV1Api(api) + ret = await appsv1.list_namespaced_deployment( + namespace=settings.build_namespace, label_selector=f"runboat/build={name}" + ) + for item in ret.items: + return item # return first + return None # None found + + async def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None: async with ApiClient() as api: appsv1 = client.AppsV1Api(api) @@ -47,8 +60,26 @@ async def watch_deployments() -> AsyncGenerator[tuple[str, V1Deployment], None]: yield event["type"], event["object"] +async def watch_jobs() -> AsyncGenerator[tuple[str, V1Job], None]: + w = watch.Watch() + # use the context manager to close http sessions automatically + async with ApiClient() as api: + appsv1 = client.BatchV1Api(api) + async for event in w.stream( + appsv1.list_namespaced_job, namespace=settings.build_namespace + ): + yield event["type"], event["object"] + + +class DeploymentMode(str, Enum): + deploy = "deploy" + initialize = "initialize" + cleanup = "cleanup" + + class DeploymentVars(BaseModel): namespace: str + mode: str build_name: str repo: str target_branch: str @@ -66,6 +97,7 @@ class DeploymentVars(BaseModel): def make_deployment_vars( + mode: DeploymentMode, build_name: str, slug: str, repo: str, @@ -76,6 +108,7 @@ def make_deployment_vars( ) -> DeploymentVars: image_name, image_tag = _split_image_name_tag(image) return DeploymentVars( + mode=mode, namespace=settings.build_namespace, build_name=build_name, repo=repo, @@ -135,45 +168,29 @@ async def deploy(deployment_vars: DeploymentVars) -> None: ) -async def dropdb(build_name: str) -> None: - await _kubectl( - [ - "-n", - settings.build_namespace, - "run", - f"dropdb-{build_name}", - "--restart=Never", - "--rm", - "-i", - "--tty", - "--image", - "postgres", - "--env", - f"PGHOST={settings.build_pghost}", - "--env", - f"PGPORT={settings.build_pgport}", - "--env", - f"PGUSER={settings.build_pguser}", - "--env", - f"PGPASSWORD={settings.build_pgpassword}", - "--", - "dropdb", - "--if-exists", - "--force", # pg 13+ - build_name, - ] - ) - - -async def undeploy(build_name: str) -> None: +async def delete_resources(build_name: str) -> None: await _kubectl( [ "-n", settings.build_namespace, "delete", - "service,deployment,ingress,secret,configmap", + "configmap,deployment,ingress,job,secret,service", "-l", f"runboat/build={build_name}", "--wait=false", ] ) + + +async def delete_job(build_name: str, job_kind: str) -> None: + await _kubectl( + [ + "-n", + settings.build_namespace, + "delete", + "job", + "-l", + f"runboat/build={build_name},runboat/job-kind={job_kind}", + "--wait=false", + ] + ) diff --git a/src/runboat/kubefiles/cleanup.yaml b/src/runboat/kubefiles/cleanup.yaml new file mode 100644 index 0000000..b64b5ac --- /dev/null +++ b/src/runboat/kubefiles/cleanup.yaml @@ -0,0 +1,27 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: cleanup + labels: + runboat/job-kind: cleanup +spec: + template: + spec: + containers: + - name: cleanup + image: postgres + volumeMounts: + - name: runboat-scripts + mountPath: /runboat + envFrom: + - secretRef: + name: odoosecretenv + - configMapRef: + name: odooenv + args: ["bash", "/runboat/runboat-cleanup.sh"] + volumes: + - name: runboat-scripts + configMap: + name: runboat-scripts + restartPolicy: Never + backoffLimit: 6 diff --git a/src/runboat/kubefiles/deployment.yaml b/src/runboat/kubefiles/deployment.yaml index 8671437..1540b3a 100644 --- a/src/runboat/kubefiles/deployment.yaml +++ b/src/runboat/kubefiles/deployment.yaml @@ -3,11 +3,9 @@ kind: Deployment metadata: name: odoo annotations: - runboat/todo: "start" # ask controller to start when there is capacity + runboat/init-status: "todo" # ask controller to start when there is capacity spec: replicas: 0 # deploy idle (with 0 replica) - strategy: - type: Recreate selector: matchLabels: app: odoo @@ -16,24 +14,7 @@ spec: labels: app: odoo spec: - # TODO restartPolicy: Never - # TODO terminationGracePeriodSeconds: 5 - initContainers: - - name: odoo-init - image: odoo - volumeMounts: - - name: runboat-scripts - mountPath: /runboat - envFrom: - - secretRef: - name: odoosecretenv - - configMapRef: - name: odooenv - args: ["bash", "/runboat/runboat-init.sh"] - resources: - limits: - cpu: 1000m - memory: 1Gi + terminationGracePeriodSeconds: 0 # no need to shutdown gracefully containers: - name: odoo image: odoo @@ -53,8 +34,11 @@ spec: args: ["bash", "/runboat/runboat-start.sh"] resources: limits: - cpu: 1000m + cpu: 800m memory: 1Gi + requests: + cpu: 50m + memory: 100Mi volumes: - name: runboat-scripts configMap: diff --git a/src/runboat/kubefiles/initialize.yaml b/src/runboat/kubefiles/initialize.yaml new file mode 100644 index 0000000..17b4b34 --- /dev/null +++ b/src/runboat/kubefiles/initialize.yaml @@ -0,0 +1,34 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: initalize + labels: + runboat/job-kind: initialize +spec: + template: + spec: + containers: + - name: initalize + image: odoo + volumeMounts: + - name: runboat-scripts + mountPath: /runboat + envFrom: + - secretRef: + name: odoosecretenv + - configMapRef: + name: odooenv + args: ["bash", "/runboat/runboat-initialize.sh"] + resources: + limits: + cpu: 1000m + memory: 1Gi + requests: + cpu: 1000m + memory: 1Gi + volumes: + - name: runboat-scripts + configMap: + name: runboat-scripts + restartPolicy: Never + backoffLimit: 0 diff --git a/src/runboat/kubefiles/kustomization.yaml.jinja b/src/runboat/kubefiles/kustomization.yaml.jinja index e75a9a5..58e4e8c 100644 --- a/src/runboat/kubefiles/kustomization.yaml.jinja +++ b/src/runboat/kubefiles/kustomization.yaml.jinja @@ -1,11 +1,17 @@ resources: + {% if mode == 'deploy' -%} - deployment.yaml - service.yaml - ingress.yaml + {% elif mode == "initialize" -%} + - initialize.yaml + {% elif mode == "cleanup" -%} + - cleanup.yaml + {% endif %} namespace: {{ namespace }} -nameSuffix: "-{{ build_name }}" +namePrefix: "{{ build_name }}-" commonLabels: runboat/build: "{{ build_name }}" @@ -41,12 +47,19 @@ configMapGenerator: - name: runboat-scripts files: - runboat-clone-and-install.sh - - runboat-init.sh + - runboat-initialize.sh + - runboat-cleanup.sh - runboat-start.sh + {% if mode == 'deploy' -%} - name: vars literals: - HOSTNAME={{ hostname }} + {% endif %} +generatorOptions: + disableNameSuffixHash: true + +{% if mode == 'deploy' -%} vars: - name: HOSTNAME objref: @@ -55,3 +68,4 @@ vars: apiVersion: v1 fieldref: fieldpath: data.HOSTNAME +{% endif %} diff --git a/src/runboat/kubefiles/runboat-cleanup.sh b/src/runboat/kubefiles/runboat-cleanup.sh new file mode 100755 index 0000000..da9b537 --- /dev/null +++ b/src/runboat/kubefiles/runboat-cleanup.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +set -ex + +dropdb --if-exists --force $PGDATABASE diff --git a/src/runboat/kubefiles/runboat-init.sh b/src/runboat/kubefiles/runboat-initialize.sh similarity index 69% rename from src/runboat/kubefiles/runboat-init.sh rename to src/runboat/kubefiles/runboat-initialize.sh index 83b9165..c1e6cc0 100755 --- a/src/runboat/kubefiles/runboat-init.sh +++ b/src/runboat/kubefiles/runboat-initialize.sh @@ -1,7 +1,7 @@ #!/bin/bash # -# Install all addons to test. +# Install all addons in the test database. # set -ex @@ -10,7 +10,8 @@ bash /runboat/runboat-clone-and-install.sh oca_wait_for_postgres -# TODO: do nothing if db exists and all addons are installed, so we can start instantly +# Drop database, in case we are reinitializing after failure. +dropdb --if-exists $PGDATABASE ADDONS=$(addons --addons-dir ${ADDONS_DIR} --include "${INCLUDE}" --exclude "${EXCLUDE}" list) diff --git a/src/runboat/models.py b/src/runboat/models.py index 51e08ef..38f476c 100644 --- a/src/runboat/models.py +++ b/src/runboat/models.py @@ -16,13 +16,19 @@ _logger = logging.getLogger(__name__) class BuildStatus(str, Enum): - stopped = "stopped" - starting = "starting" - started = "started" + stopped = "stopped" # initialization succeeded and 0 replicas + starting = "starting" # to initialize or initializing or scaling up + started = "started" # running + failed = "failed" # initialization failed + cleaning = "cleaning" # cleaning up, will be undeployed soon -class BuildTodo(str, Enum): - start = "start" +class BuildInitStatus(str, Enum): + todo = "todo" # to initialize as soon as there is capacity + started = "started" # initialization job running + succeeded = "succeeded" # initialization job succeeded + failed = "failed" # initialization job failed + cleaning = "cleaning" # cleanup job running class Build(BaseModel): @@ -34,10 +40,21 @@ class Build(BaseModel): git_commit: str image: str status: BuildStatus - todo: Optional[BuildTodo] - last_scaled: Optional[str] + init_status: BuildInitStatus + last_scaled: datetime.datetime created: datetime.datetime + def __str__(self) -> str: + return f"{self.slug} ({self.name})" + + @classmethod + async def from_name(cls, build_name: str) -> Optional["Build"]: + """Create a Build model by reading the k8s api.""" + deployment = await k8s.read_deployment(build_name) + if deployment is None: + return None + return cls.from_deployment(deployment) + @classmethod def from_deployment(cls, deployment: V1Deployment) -> "Build": return Build( @@ -48,26 +65,32 @@ class Build(BaseModel): pr=deployment.metadata.annotations["runboat/pr"] or None, git_commit=deployment.metadata.annotations["runboat/git-commit"], image=deployment.spec.template.spec.containers[0].image, + init_status=deployment.metadata.annotations["runboat/init-status"], status=cls._status_from_deployment(deployment), - todo=deployment.metadata.annotations["runboat/todo"] or None, last_scaled=deployment.metadata.annotations.get("runboat/last-scaled") - or None, + or datetime.datetime.utcnow(), created=deployment.metadata.creation_timestamp, ) @classmethod def _status_from_deployment(cls, deployment: V1Deployment) -> BuildStatus: - replicas = deployment.status.replicas - if not replicas: - status = BuildStatus.stopped - else: - if deployment.status.ready_replicas == replicas: - status = BuildStatus.started + init_status = deployment.metadata.annotations["runboat/init-status"] + if init_status in (BuildInitStatus.todo, BuildInitStatus.started): + return BuildStatus.starting + elif init_status == BuildInitStatus.cleaning: + return BuildStatus.cleaning + elif init_status == BuildInitStatus.failed: + return BuildStatus.failed + elif init_status == BuildInitStatus.succeeded: + replicas = deployment.status.replicas + if not replicas: + return BuildStatus.stopped else: - status = BuildStatus.starting - # TODO detect stopping, deploying, undeploying ? - # TODO: failed status - return status + if deployment.status.ready_replicas == replicas: + return BuildStatus.started + else: + return BuildStatus.starting + raise RuntimeError(f"Could not compute status of {deployment.metadata.name}.") @classmethod def make_slug( @@ -87,74 +110,17 @@ class Build(BaseModel): def link(self) -> str: return f"http://{self.slug}.{settings.build_domain}" - async def delay_start(self) -> None: - """Mark a build for startup. - - This is done by setting the runboat/todo annotation to 'start'. - The starter process will then start it when there is available - capacity. - """ - await k8s.patch_deployment( - self.deployment_name, - [ - { - "op": "replace", - "path": "/metadata/annotations/runboat~1todo", - "value": "start", - }, - ], - ) - - async def scale(self, replicas: int) -> None: - """Start a build. - - Set replicas, reset todo annotation, and set last-scaled - annotation. - """ - _logger.info(f"Scaling {self.slug} ({self.name}) to {replicas}.") - await k8s.patch_deployment( - self.deployment_name, - [ - { - # clear todo - "op": "replace", - "path": "/metadata/annotations/runboat~1todo", - "value": "", - }, - { - # record last scaled time for the stopper and undeployer - "op": "replace", - "path": "/metadata/annotations/runboat~1last-scaled", - "value": datetime.datetime.utcnow().isoformat() + "Z", - }, - { - # set replicas - "op": "replace", - "path": "/spec/replicas", - "value": replicas, - }, - ], - ) - - async def undeploy(self) -> None: - """Undeploy a build. - - Delete all resources, and drop the database. - """ - _logger.info(f"Undeploying {self.slug} ({self.name})") - await k8s.undeploy(self.name) - await k8s.dropdb(self.name) - @classmethod async def deploy( cls, repo: str, target_branch: str, pr: int | None, git_commit: str ) -> None: """Deploy a build, without starting it.""" - name = str(uuid.uuid4()) + name = f"b{uuid.uuid4()}" slug = cls.make_slug(repo, target_branch, pr, git_commit) - _logger.info(f"Deploying {slug} ({name})") + _logger.info(f"Deploying {slug} ({name}).") image = get_build_image(target_branch) deployment_vars = k8s.make_deployment_vars( + k8s.DeploymentMode.deploy, name, slug, repo.lower(), @@ -165,6 +131,142 @@ class Build(BaseModel): ) await k8s.deploy(deployment_vars) + async def start(self) -> None: + """Start build if init succeeded, or reinitialize if failed.""" + if self.status in (BuildStatus.started, BuildStatus.starting): + _logger.info( + f"Ignoring start command for {self} " + "that is already started or starting." + ) + return + elif self.status == BuildStatus.failed: + _logger.info(f"Marking failed {self} for reinitialization.") + await k8s.delete_job(self.name, job_kind="initialize") + await self._patch( + init_status=BuildInitStatus.todo, replicas=0, update_last_scaled=False + ) + elif self.status == BuildStatus.stopped: + _logger.info(f"Starting {self}.") + await self._patch(replicas=1, update_last_scaled=True) + + async def stop(self) -> None: + if self.status == BuildStatus.started: + _logger.info(f"Stopping {self}.") + await self._patch(replicas=0, update_last_scaled=True) + else: + _logger.info("Ignoring stop command for {self} " "that is not started.") + + async def initialize(self) -> None: + # Start initizalization job. on_init_started/on_init_succeeded/on_init_failed + # will be callsed back when it starts/succeeds/fails. + _logger.info(f"Deploying initialize job for {self}.") + deployment_vars = k8s.make_deployment_vars( + k8s.DeploymentMode.initialize, + self.name, + self.slug, + self.repo, + self.target_branch, + self.pr, + self.git_commit, + self.image, + ) + await k8s.deploy(deployment_vars) + + async def undeploy(self) -> None: + """Undeploy a build.""" + await self.stop() + # Start cleanup job. on_cleanup_XXX callbacks will follow. + _logger.info(f"Deploying cleanup job for {self}.") + deployment_vars = k8s.make_deployment_vars( + k8s.DeploymentMode.cleanup, + self.name, + self.slug, + self.repo, + self.target_branch, + self.pr, + self.git_commit, + self.image, + ) + await k8s.deploy(deployment_vars) + + async def on_initialize_started(self) -> None: + if self.init_status == BuildInitStatus.started: + return + _logger.info(f"Initialization job started for {self}.") + await self._patch( + init_status=BuildInitStatus.started, replicas=0, update_last_scaled=True + ) + + async def on_initialize_succeeded(self) -> None: + if self.init_status == BuildInitStatus.succeeded: + # Avoid restarting stopped deployments when the controller is notified of + # succeeded old initialization jobs after a controller restart. + return + _logger.info(f"Initialization job succeded for {self}, starting.") + await self._patch( + init_status=BuildInitStatus.succeeded, replicas=1, update_last_scaled=True + ) + + async def on_initialize_failed(self) -> None: + if self.init_status == BuildInitStatus.failed: + # Already marked as failed. We are probably here because the controller is + # restarting, and is notified of existing initialization jobs. + return + _logger.info(f"Initialization job failed for {self}.") + await self._patch( + init_status=BuildInitStatus.failed, replicas=0, update_last_scaled=True + ) + + async def on_cleanup_started(self) -> None: + _logger.info(f"Cleanup job started for {self}.") + await self._patch( + init_status=BuildInitStatus.cleaning, replicas=0, update_last_scaled=False + ) + + async def on_cleanup_succeeded(self) -> None: + _logger.info(f"Cleanup job succeeded for {self}, deleting resources.") + await k8s.delete_resources(self.name) + + async def on_cleanup_failed(self) -> None: + _logger.error( + f"Cleanup job failed for {self}, " f"manual intervention required." + ) + + async def _patch( + self, + init_status: BuildInitStatus | None = None, + replicas: int | None = None, + update_last_scaled: bool = True, + ) -> None: + ops = [] + if init_status is not None: + ops.extend( + [ + { + "op": "replace", + "path": "/metadata/annotations/runboat~1init-status", + "value": init_status, + }, + ], + ) + if replicas is not None: + ops.append( + { + "op": "replace", + "path": "/spec/replicas", + "value": replicas, + }, + ) + if update_last_scaled: + ops.append( + { + "op": "replace", + "path": "/metadata/annotations/runboat~1last-scaled", + "value": datetime.datetime.utcnow().isoformat() + "Z", + }, + ) + await k8s.patch_deployment(self.deployment_name, ops) + class Repo(BaseModel): name: str diff --git a/src/runboat/settings.py b/src/runboat/settings.py index 802dac6..3981b5d 100644 --- a/src/runboat/settings.py +++ b/src/runboat/settings.py @@ -5,8 +5,8 @@ class Settings(BaseSettings): admin_user: str admin_passwd: str supported_repos: set[str] - max_starting: int = 2 - max_running: int = 4 + max_initializing: int = 2 + max_started: int = 6 max_deployed: int = 10 build_namespace: str build_pghost: str