From d6d9ac8a7f653287b2b297a02e6945895ae0d188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sun, 31 Oct 2021 11:04:51 +0100 Subject: [PATCH] Use jobs for db initialization an cleanup It turns out that init containers do not work, as we can't easily detect failures and access init log as they restart constantly. So we use initialization jobs. We take the opportunity to use cleanup jobs too, so all knowledge of runtime content is in the kubefiles. --- README.md | 61 +++- log-config-dev.yaml | 3 + src/runboat/api.py | 12 +- src/runboat/controller.py | 176 +++++++----- src/runboat/db.py | 35 ++- src/runboat/github.py | 1 + src/runboat/k8s.py | 81 +++--- src/runboat/kubefiles/cleanup.yaml | 27 ++ src/runboat/kubefiles/deployment.yaml | 28 +- src/runboat/kubefiles/initialize.yaml | 34 +++ .../kubefiles/kustomization.yaml.jinja | 18 +- src/runboat/kubefiles/runboat-cleanup.sh | 5 + ...{runboat-init.sh => runboat-initialize.sh} | 5 +- src/runboat/models.py | 260 ++++++++++++------ src/runboat/settings.py | 4 +- 15 files changed, 523 insertions(+), 227 deletions(-) create mode 100644 src/runboat/kubefiles/cleanup.yaml create mode 100644 src/runboat/kubefiles/initialize.yaml create mode 100755 src/runboat/kubefiles/runboat-cleanup.sh rename src/runboat/kubefiles/{runboat-init.sh => runboat-initialize.sh} (69%) diff --git a/README.md b/README.md index 1821c10..66197c7 100644 --- a/README.md +++ b/README.md @@ -16,8 +16,8 @@ For running the controller: - Python 3.10 - `kubectl` - A `KUBECONFIG` that provides access to the namespace where the builds are deployed, - with permissions to create and delete service, deployment, ingress, secret and - configmap resources. + with permissions to create and delete Service, Job, Deployment, Ingress, Secret and + ConfigMap resources. ## Developing @@ -35,8 +35,9 @@ Contributions welcome. Prototype (min required to do load testing): +- github token for github api requests +- set requests otherwise requests is same as limits ? - plug it on a bunch of OCA and shopinvader repos to test load -- handle init failures, add failed status - configuring many repos in a .env file may be difficult, switch to a toml file ? MVP: @@ -62,3 +63,57 @@ More: - create builds for all supported repos on startup (goes with sticky branches) - never undeploy last build of sticky branches - make build images configurable (see `build_images.py`) + + +## Kubefiles + +Kustomize template with 3 modes (deploy, initialize, cleanup). + +## Synchronous actions on builds (fast) + +- deploy + + - create deployment with 0 replicas and runboat/init-status="todo" + +- start: + + - if runboat/init-status=="ready", scale to 1 + - elif runboat/init-status in ("todo", "initializing"), do nothing + - elif runboat/init-status=="failed", set runboat/init-status="todo" + +- stop: + + - scale deployment to 0 + +- undeploy: + + - scale deployment to 0 + - set runboat/init-status to "dropping" + - start dropdb job (restart=Never, backoffLimit=6) + +## Workers + +- initializer (works on deployments with runboat/init-status="todo", ordered by + runboat/init-status-timestamp), obeying max_initializing: + + - set runboat/init-status to "initializing" + - (re)create init job which will drop and init db (restart=Never, backoffLimit=0) + +- job-watcher: + + - on successful termination of initdb job: set runboat/init-status to "ready", scale + deployment to 1 + - on failure of initdb job: set runboat/init-status to "failed" + - on success of dropdb job: delete all resources + +- deployment-watcher: + + - maintains an in-memory db of deployments + +- stopper: + + - stop old started, to reach max_running + +- undeployer: + + - undeploy old stopped, to reach max_deployed diff --git a/log-config-dev.yaml b/log-config-dev.yaml index 8075887..842005c 100644 --- a/log-config-dev.yaml +++ b/log-config-dev.yaml @@ -13,3 +13,6 @@ handlers: root: level: DEBUG handlers: [console] +loggers: + kubernetes_asyncio.client.rest: + level: INFO diff --git a/src/runboat/api.py b/src/runboat/api.py index 7e8fa25..2374bac 100644 --- a/src/runboat/api.py +++ b/src/runboat/api.py @@ -16,10 +16,10 @@ router = APIRouter() class Status(BaseModel): deployed: int max_deployed: int - running: int - max_running: int - starting: int - max_starting: int + started: int + max_started: int + initializing: int + max_initializing: int class Config: orm_mode = True @@ -153,14 +153,14 @@ async def log(name: str): async def start(name: str): """Start the deployment.""" build = _build_by_name(name) - await build.delay_start() + await build.start() @router.post("/builds/{name}/stop") async def stop(name: str): """Stop the deployment.""" build = _build_by_name(name) - await build.scale(0) + await build.stop() @router.delete("/builds/{name}", dependencies=[Depends(authenticated)]) diff --git a/src/runboat/controller.py b/src/runboat/controller.py index 9112493..db11d34 100644 --- a/src/runboat/controller.py +++ b/src/runboat/controller.py @@ -1,9 +1,10 @@ import asyncio +import contextlib import logging from . import k8s from .db import BuildsDb -from .models import Build, BuildStatus +from .models import Build, BuildInitStatus, BuildStatus from .settings import settings _logger = logging.getLogger(__name__) @@ -14,12 +15,16 @@ class Controller: It runs several background tasks: - - The 'watcher' listens to kubernetes events on deployements and maintains an - in-memory database of existing deployments and their state. It wakes up the - starter and the reaper when necessary. - - The 'starter' starts deployments that have been flagged to start, while making - sure that the maximum number of deployment starting concurrently does not exceed - the limit. + - The 'deployment_watcher' listens to kubernetes events on deployments and maintains + an in-memory database of existing deployments and their state. It wakes up the + initializer, stopper and undeployer when the state of deployments change. + - The 'job_watcher' listens to kubernetes events on jobs, to maintain the + runboat/init-status annotation on deployments, and act on such events (such as + starting when an initialization succeeded or undeploying when a cleanup + succeeded). + - The 'initializer' starts initialization jobs for deployment that have been marked + with 'runboat/init-status=todo', while making sure that the maximum number of + deployments initializing concurrently does not exceed the limit. - The 'stopper' stops old running deployments. - The 'undeployer' undeploys old stopped deployments. """ @@ -37,20 +42,20 @@ class Controller: self.db = BuildsDb() @property - def running(self) -> int: - return self.db.count_by_statuses((BuildStatus.started, BuildStatus.starting)) + def started(self) -> int: + return self.db.count_by_status(BuildStatus.started) @property - def max_running(self) -> int: - return settings.max_running + def max_started(self) -> int: + return settings.max_started @property - def starting(self) -> int: - return self.db.count_by_statuses((BuildStatus.starting,)) + def initializing(self) -> int: + return self.db.count_by_init_status(BuildInitStatus.started) @property - def max_starting(self) -> int: - return settings.max_starting + def max_initializing(self) -> int: + return settings.max_initializing @property def deployed(self) -> int: @@ -70,7 +75,7 @@ class Controller: git_commit=git_commit, ) if build is not None: - await build.delay_start() + await build.start() return await Build.deploy( repo=repo, @@ -83,13 +88,19 @@ class Controller: self._wakeup_event.set() self._wakeup_event.clear() - async def watcher(self) -> None: + async def _sleep(self) -> None: + # Wait on the wakeup event, but wakeup after 10 seconds if nothing happens, + # in case we have missed an event. + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for(self._wakeup_event.wait(), 10) + + async def deployment_watcher(self) -> None: self.reset() # empty the local db each time we start watching async for event_type, deployment in k8s.watch_deployments(): build_name = deployment.metadata.labels.get("runboat/build") if not build_name: continue - _logger.debug(f"{event_type} deployment {build_name}") + _logger.debug(f"k8s {event_type} {deployment.kind} {build_name}") if event_type in ("ADDED", "MODIFIED"): self.db.add(Build.from_deployment(deployment)) elif event_type == "DELETED": @@ -98,58 +109,89 @@ class Controller: _logger.error(f"Unexpected k8s event type {event_type}.") self._wakeup() - async def starter(self) -> None: + async def job_watcher(self) -> None: + async for event_type, job in k8s.watch_jobs(): + build_name = job.metadata.labels.get("runboat/build") + if not build_name: + continue + job_kind = job.metadata.labels.get("runboat/job-kind") + if job_kind not in ("initialize", "cleanup"): + continue + _logger.debug(f"k8s {event_type} {job.kind} {job_kind} {build_name}") + if event_type in ("ADDED", "MODIFIED"): + build = self.db.get(build_name) + if build is None: + # Not found in db, look in k8s api, in case the controller + # is starting and the db has not been populated yet. + build = await Build.from_name(build_name) + if build is None: + continue + if job_kind == "initialize": + if job.status.succeeded: + await build.on_initialize_succeeded() + elif job.status.failed: + await build.on_initialize_failed() + else: + await build.on_initialize_started() + if job_kind == "cleanup": + if job.status.succeeded: + await build.on_cleanup_succeeded() + elif job.status.failed: + await build.on_cleanup_failed() + else: + await build.on_cleanup_started() + elif event_type == "DELETED": + pass + else: + _logger.error(f"Unexpected k8s event type {event_type}.") + + async def initializer(self) -> None: while True: - await self._wakeup_event.wait() - while True: - can_start = min( - self.max_running - self.running + 1, - self.max_starting - self.starting, - ) - if can_start <= 0: - break # no capacity for now, back to sleep - to_start = self.db.to_start(limit=can_start) - if not to_start: - break # nothing startable, back to sleep - _logger.info(f"Starting {len(to_start)} builds of up to {can_start}.") - for build in to_start: - await build.scale(1) - if len(to_start) < can_start: - break # back to sleep + await self._sleep() + can_initialize = self.max_initializing - self.initializing + if can_initialize <= 0: + continue # no capacity for now, back to sleep + to_initialize = self.db.to_initialize(limit=can_initialize) + if not to_initialize: + continue # nothing startable, back to sleep + _logger.info( + f"{self.initializing}/{self.max_initializing} builds are initializing. " + f"Initializing {len(to_initialize)} more." + ) + for build in to_initialize: + await build.initialize() async def stopper(self) -> None: while True: - await self._wakeup_event.wait() - while True: - can_stop = self.running - self.max_running - if can_stop <= 0: - break # no need to stop for now, back to sleep - to_stop = self.db.oldest_started(limit=can_stop) - if not to_stop: - break # nothing stoppable, back to sleep - _logger.info(f"Stopping {len(to_stop)} builds of up to {can_stop}.") - for build in to_stop: - await build.scale(0) - if len(to_stop) < can_stop: - break # back to sleep + await self._sleep() + can_stop = self.started - self.max_started + if can_stop <= 0: + continue # no need to stop for now, back to sleep + to_stop = self.db.oldest_started(limit=can_stop) + if not to_stop: + continue # nothing stoppable, back to sleep + _logger.info( + f"{self.started}/{self.max_started} builds started. " + f"Stopping {len(to_stop)}." + ) + for build in to_stop: + await build.stop() async def undeployer(self) -> None: while True: - await self._wakeup_event.wait() - while True: - can_undeploy = self.deployed - self.max_deployed - if can_undeploy <= 0: - break # no need to undeploy for now, back to sleep - to_undeploy = self.db.oldest_stopped(limit=can_undeploy) - if not to_undeploy: - break # nothing undeployable, back to sleep - _logger.info( - f"Undeploying {len(to_undeploy)} builds of up to {can_undeploy}." - ) - for build in to_undeploy: - await build.undeploy() - if len(to_undeploy) < can_undeploy: - break # back to sleep + await self._sleep() + can_undeploy = self.deployed - self.max_deployed + if can_undeploy <= 0: + continue # no need to undeploy for now, back to sleep + to_undeploy = self.db.oldest_stopped(limit=can_undeploy) + if not to_undeploy: + continue # nothing undeployable, back to sleep + _logger.info( + f"{self.deployed}/{self.max_deployed} builds deployed. " + f"Undeploying {len(to_undeploy)}." + ) + for build in to_undeploy: + await build.undeploy() async def start(self) -> None: _logger.info("Starting controller tasks.") @@ -167,7 +209,13 @@ class Controller: ) await asyncio.sleep(delay) - for f in (self.watcher, self.starter, self.stopper, self.undeployer): + for f in ( + self.deployment_watcher, + self.job_watcher, + self.initializer, + self.stopper, + self.undeployer, + ): self._tasks.append(asyncio.create_task(walking_dead(f))) async def stop(self) -> None: diff --git a/src/runboat/db.py b/src/runboat/db.py index 36c690f..63db7cb 100644 --- a/src/runboat/db.py +++ b/src/runboat/db.py @@ -1,6 +1,6 @@ import sqlite3 -from .models import BranchOrPull, Build, BuildStatus, BuildTodo +from .models import BranchOrPull, Build, BuildInitStatus, BuildStatus class BuildsDb: @@ -36,12 +36,14 @@ class BuildsDb: " git_commit TEXT NOT NULL, " " image TEXT NOT NULL," " status TEXT NOT NULL, " - " todo TEXT, " + " init_status TEXT NOT NULL, " " last_scaled TEXT, " " created TEXT NOT NULL" ")" ) - self._con.execute("CREATE INDEX idx_todo ON builds(todo, last_scaled)") + self._con.execute( + "CREATE INDEX idx_init_status ON builds(init_status, created)" + ) self._con.execute("CREATE INDEX idx_status ON builds(status, last_scaled)") self._con.execute("CREATE INDEX idx_repo ON builds(repo)") @@ -83,7 +85,7 @@ class BuildsDb: " git_commit," " image," " status," - " todo, " + " init_status, " " last_scaled, " " created" ") " @@ -97,27 +99,30 @@ class BuildsDb: build.git_commit, build.image, build.status, - build.todo, + build.init_status, build.last_scaled, build.created.isoformat(), ), ) - def count_by_statuses(self, statuses: tuple[BuildStatus, ...]) -> int: - q = ",".join(["?"] * len(statuses)) + def count_by_status(self, status: BuildStatus) -> int: return self._con.execute( - f"SELECT COUNT(name) FROM builds WHERE status IN ({q})", statuses + "SELECT COUNT(name) FROM builds WHERE status=?", (status,) + ).fetchone()[0] + + def count_by_init_status(self, init_status: BuildInitStatus) -> int: + return self._con.execute( + "SELECT COUNT(name) FROM builds WHERE init_status=?", (init_status,) ).fetchone()[0] def count_all(self) -> int: return self._con.execute("SELECT COUNT(name) FROM builds").fetchone()[0] - def to_start(self, limit: int) -> list[Build]: - """Return the list of builds to start, ordered by todo timestamp.""" - # TODO ordering is not correct as setting todo does not set last_scaled + def to_initialize(self, limit: int) -> list[Build]: + """Return the list of builds to initialize, ordered by creation timestamp.""" rows = self._con.execute( - "SELECT * FROM builds WHERE todo=? ORDER BY last_scaled LIMIT ?", - (BuildTodo.start, limit), + "SELECT * FROM builds WHERE init_status=? ORDER BY created LIMIT ?", + (BuildInitStatus.todo, limit), ).fetchall() return [self._build_from_row(row) for row in rows] @@ -132,8 +137,8 @@ class BuildsDb: def oldest_stopped(self, limit: int) -> list[Build]: """Return a list of oldest stopped builds.""" rows = self._con.execute( - "SELECT * FROM builds WHERE status=? ORDER BY last_scaled LIMIT ?", - (BuildStatus.stopped, limit), + "SELECT * FROM builds WHERE status IN (?, ?) ORDER BY last_scaled LIMIT ?", + (BuildStatus.stopped, BuildStatus.failed, limit), ).fetchall() return [self._build_from_row(row) for row in rows] diff --git a/src/runboat/github.py b/src/runboat/github.py index 56d8fc9..dfa2cd9 100644 --- a/src/runboat/github.py +++ b/src/runboat/github.py @@ -8,6 +8,7 @@ from .exceptions import NotFoundOnGithub def _github_get(url: str) -> Any: full_url = f"https://api.github.com{url}" + # TODO github token headers = { "Accept": "application/vnd.github.v3+json", } diff --git a/src/runboat/k8s.py b/src/runboat/k8s.py index 3cd0100..164cb1c 100644 --- a/src/runboat/k8s.py +++ b/src/runboat/k8s.py @@ -3,6 +3,7 @@ import shutil import subprocess import tempfile from contextlib import contextmanager +from enum import Enum from importlib import resources from pathlib import Path from typing import Any, AsyncGenerator, Generator, Optional @@ -11,6 +12,7 @@ from jinja2 import Template from kubernetes_asyncio import client, config, watch from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio.client.models.v1_deployment import V1Deployment +from kubernetes_asyncio.client.models.v1_job import V1Job from pydantic import BaseModel from .settings import settings @@ -26,6 +28,17 @@ async def load_kube_config() -> None: await config.load_kube_config() +async def read_deployment(name: str) -> Optional[V1Deployment]: + async with ApiClient() as api: + appsv1 = client.AppsV1Api(api) + ret = await appsv1.list_namespaced_deployment( + namespace=settings.build_namespace, label_selector=f"runboat/build={name}" + ) + for item in ret.items: + return item # return first + return None # None found + + async def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None: async with ApiClient() as api: appsv1 = client.AppsV1Api(api) @@ -47,8 +60,26 @@ async def watch_deployments() -> AsyncGenerator[tuple[str, V1Deployment], None]: yield event["type"], event["object"] +async def watch_jobs() -> AsyncGenerator[tuple[str, V1Job], None]: + w = watch.Watch() + # use the context manager to close http sessions automatically + async with ApiClient() as api: + appsv1 = client.BatchV1Api(api) + async for event in w.stream( + appsv1.list_namespaced_job, namespace=settings.build_namespace + ): + yield event["type"], event["object"] + + +class DeploymentMode(str, Enum): + deploy = "deploy" + initialize = "initialize" + cleanup = "cleanup" + + class DeploymentVars(BaseModel): namespace: str + mode: str build_name: str repo: str target_branch: str @@ -66,6 +97,7 @@ class DeploymentVars(BaseModel): def make_deployment_vars( + mode: DeploymentMode, build_name: str, slug: str, repo: str, @@ -76,6 +108,7 @@ def make_deployment_vars( ) -> DeploymentVars: image_name, image_tag = _split_image_name_tag(image) return DeploymentVars( + mode=mode, namespace=settings.build_namespace, build_name=build_name, repo=repo, @@ -135,45 +168,29 @@ async def deploy(deployment_vars: DeploymentVars) -> None: ) -async def dropdb(build_name: str) -> None: - await _kubectl( - [ - "-n", - settings.build_namespace, - "run", - f"dropdb-{build_name}", - "--restart=Never", - "--rm", - "-i", - "--tty", - "--image", - "postgres", - "--env", - f"PGHOST={settings.build_pghost}", - "--env", - f"PGPORT={settings.build_pgport}", - "--env", - f"PGUSER={settings.build_pguser}", - "--env", - f"PGPASSWORD={settings.build_pgpassword}", - "--", - "dropdb", - "--if-exists", - "--force", # pg 13+ - build_name, - ] - ) - - -async def undeploy(build_name: str) -> None: +async def delete_resources(build_name: str) -> None: await _kubectl( [ "-n", settings.build_namespace, "delete", - "service,deployment,ingress,secret,configmap", + "configmap,deployment,ingress,job,secret,service", "-l", f"runboat/build={build_name}", "--wait=false", ] ) + + +async def delete_job(build_name: str, job_kind: str) -> None: + await _kubectl( + [ + "-n", + settings.build_namespace, + "delete", + "job", + "-l", + f"runboat/build={build_name},runboat/job-kind={job_kind}", + "--wait=false", + ] + ) diff --git a/src/runboat/kubefiles/cleanup.yaml b/src/runboat/kubefiles/cleanup.yaml new file mode 100644 index 0000000..b64b5ac --- /dev/null +++ b/src/runboat/kubefiles/cleanup.yaml @@ -0,0 +1,27 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: cleanup + labels: + runboat/job-kind: cleanup +spec: + template: + spec: + containers: + - name: cleanup + image: postgres + volumeMounts: + - name: runboat-scripts + mountPath: /runboat + envFrom: + - secretRef: + name: odoosecretenv + - configMapRef: + name: odooenv + args: ["bash", "/runboat/runboat-cleanup.sh"] + volumes: + - name: runboat-scripts + configMap: + name: runboat-scripts + restartPolicy: Never + backoffLimit: 6 diff --git a/src/runboat/kubefiles/deployment.yaml b/src/runboat/kubefiles/deployment.yaml index 8671437..1540b3a 100644 --- a/src/runboat/kubefiles/deployment.yaml +++ b/src/runboat/kubefiles/deployment.yaml @@ -3,11 +3,9 @@ kind: Deployment metadata: name: odoo annotations: - runboat/todo: "start" # ask controller to start when there is capacity + runboat/init-status: "todo" # ask controller to start when there is capacity spec: replicas: 0 # deploy idle (with 0 replica) - strategy: - type: Recreate selector: matchLabels: app: odoo @@ -16,24 +14,7 @@ spec: labels: app: odoo spec: - # TODO restartPolicy: Never - # TODO terminationGracePeriodSeconds: 5 - initContainers: - - name: odoo-init - image: odoo - volumeMounts: - - name: runboat-scripts - mountPath: /runboat - envFrom: - - secretRef: - name: odoosecretenv - - configMapRef: - name: odooenv - args: ["bash", "/runboat/runboat-init.sh"] - resources: - limits: - cpu: 1000m - memory: 1Gi + terminationGracePeriodSeconds: 0 # no need to shutdown gracefully containers: - name: odoo image: odoo @@ -53,8 +34,11 @@ spec: args: ["bash", "/runboat/runboat-start.sh"] resources: limits: - cpu: 1000m + cpu: 800m memory: 1Gi + requests: + cpu: 50m + memory: 100Mi volumes: - name: runboat-scripts configMap: diff --git a/src/runboat/kubefiles/initialize.yaml b/src/runboat/kubefiles/initialize.yaml new file mode 100644 index 0000000..17b4b34 --- /dev/null +++ b/src/runboat/kubefiles/initialize.yaml @@ -0,0 +1,34 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: initalize + labels: + runboat/job-kind: initialize +spec: + template: + spec: + containers: + - name: initalize + image: odoo + volumeMounts: + - name: runboat-scripts + mountPath: /runboat + envFrom: + - secretRef: + name: odoosecretenv + - configMapRef: + name: odooenv + args: ["bash", "/runboat/runboat-initialize.sh"] + resources: + limits: + cpu: 1000m + memory: 1Gi + requests: + cpu: 1000m + memory: 1Gi + volumes: + - name: runboat-scripts + configMap: + name: runboat-scripts + restartPolicy: Never + backoffLimit: 0 diff --git a/src/runboat/kubefiles/kustomization.yaml.jinja b/src/runboat/kubefiles/kustomization.yaml.jinja index e75a9a5..58e4e8c 100644 --- a/src/runboat/kubefiles/kustomization.yaml.jinja +++ b/src/runboat/kubefiles/kustomization.yaml.jinja @@ -1,11 +1,17 @@ resources: + {% if mode == 'deploy' -%} - deployment.yaml - service.yaml - ingress.yaml + {% elif mode == "initialize" -%} + - initialize.yaml + {% elif mode == "cleanup" -%} + - cleanup.yaml + {% endif %} namespace: {{ namespace }} -nameSuffix: "-{{ build_name }}" +namePrefix: "{{ build_name }}-" commonLabels: runboat/build: "{{ build_name }}" @@ -41,12 +47,19 @@ configMapGenerator: - name: runboat-scripts files: - runboat-clone-and-install.sh - - runboat-init.sh + - runboat-initialize.sh + - runboat-cleanup.sh - runboat-start.sh + {% if mode == 'deploy' -%} - name: vars literals: - HOSTNAME={{ hostname }} + {% endif %} +generatorOptions: + disableNameSuffixHash: true + +{% if mode == 'deploy' -%} vars: - name: HOSTNAME objref: @@ -55,3 +68,4 @@ vars: apiVersion: v1 fieldref: fieldpath: data.HOSTNAME +{% endif %} diff --git a/src/runboat/kubefiles/runboat-cleanup.sh b/src/runboat/kubefiles/runboat-cleanup.sh new file mode 100755 index 0000000..da9b537 --- /dev/null +++ b/src/runboat/kubefiles/runboat-cleanup.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +set -ex + +dropdb --if-exists --force $PGDATABASE diff --git a/src/runboat/kubefiles/runboat-init.sh b/src/runboat/kubefiles/runboat-initialize.sh similarity index 69% rename from src/runboat/kubefiles/runboat-init.sh rename to src/runboat/kubefiles/runboat-initialize.sh index 83b9165..c1e6cc0 100755 --- a/src/runboat/kubefiles/runboat-init.sh +++ b/src/runboat/kubefiles/runboat-initialize.sh @@ -1,7 +1,7 @@ #!/bin/bash # -# Install all addons to test. +# Install all addons in the test database. # set -ex @@ -10,7 +10,8 @@ bash /runboat/runboat-clone-and-install.sh oca_wait_for_postgres -# TODO: do nothing if db exists and all addons are installed, so we can start instantly +# Drop database, in case we are reinitializing after failure. +dropdb --if-exists $PGDATABASE ADDONS=$(addons --addons-dir ${ADDONS_DIR} --include "${INCLUDE}" --exclude "${EXCLUDE}" list) diff --git a/src/runboat/models.py b/src/runboat/models.py index 51e08ef..38f476c 100644 --- a/src/runboat/models.py +++ b/src/runboat/models.py @@ -16,13 +16,19 @@ _logger = logging.getLogger(__name__) class BuildStatus(str, Enum): - stopped = "stopped" - starting = "starting" - started = "started" + stopped = "stopped" # initialization succeeded and 0 replicas + starting = "starting" # to initialize or initializing or scaling up + started = "started" # running + failed = "failed" # initialization failed + cleaning = "cleaning" # cleaning up, will be undeployed soon -class BuildTodo(str, Enum): - start = "start" +class BuildInitStatus(str, Enum): + todo = "todo" # to initialize as soon as there is capacity + started = "started" # initialization job running + succeeded = "succeeded" # initialization job succeeded + failed = "failed" # initialization job failed + cleaning = "cleaning" # cleanup job running class Build(BaseModel): @@ -34,10 +40,21 @@ class Build(BaseModel): git_commit: str image: str status: BuildStatus - todo: Optional[BuildTodo] - last_scaled: Optional[str] + init_status: BuildInitStatus + last_scaled: datetime.datetime created: datetime.datetime + def __str__(self) -> str: + return f"{self.slug} ({self.name})" + + @classmethod + async def from_name(cls, build_name: str) -> Optional["Build"]: + """Create a Build model by reading the k8s api.""" + deployment = await k8s.read_deployment(build_name) + if deployment is None: + return None + return cls.from_deployment(deployment) + @classmethod def from_deployment(cls, deployment: V1Deployment) -> "Build": return Build( @@ -48,26 +65,32 @@ class Build(BaseModel): pr=deployment.metadata.annotations["runboat/pr"] or None, git_commit=deployment.metadata.annotations["runboat/git-commit"], image=deployment.spec.template.spec.containers[0].image, + init_status=deployment.metadata.annotations["runboat/init-status"], status=cls._status_from_deployment(deployment), - todo=deployment.metadata.annotations["runboat/todo"] or None, last_scaled=deployment.metadata.annotations.get("runboat/last-scaled") - or None, + or datetime.datetime.utcnow(), created=deployment.metadata.creation_timestamp, ) @classmethod def _status_from_deployment(cls, deployment: V1Deployment) -> BuildStatus: - replicas = deployment.status.replicas - if not replicas: - status = BuildStatus.stopped - else: - if deployment.status.ready_replicas == replicas: - status = BuildStatus.started + init_status = deployment.metadata.annotations["runboat/init-status"] + if init_status in (BuildInitStatus.todo, BuildInitStatus.started): + return BuildStatus.starting + elif init_status == BuildInitStatus.cleaning: + return BuildStatus.cleaning + elif init_status == BuildInitStatus.failed: + return BuildStatus.failed + elif init_status == BuildInitStatus.succeeded: + replicas = deployment.status.replicas + if not replicas: + return BuildStatus.stopped else: - status = BuildStatus.starting - # TODO detect stopping, deploying, undeploying ? - # TODO: failed status - return status + if deployment.status.ready_replicas == replicas: + return BuildStatus.started + else: + return BuildStatus.starting + raise RuntimeError(f"Could not compute status of {deployment.metadata.name}.") @classmethod def make_slug( @@ -87,74 +110,17 @@ class Build(BaseModel): def link(self) -> str: return f"http://{self.slug}.{settings.build_domain}" - async def delay_start(self) -> None: - """Mark a build for startup. - - This is done by setting the runboat/todo annotation to 'start'. - The starter process will then start it when there is available - capacity. - """ - await k8s.patch_deployment( - self.deployment_name, - [ - { - "op": "replace", - "path": "/metadata/annotations/runboat~1todo", - "value": "start", - }, - ], - ) - - async def scale(self, replicas: int) -> None: - """Start a build. - - Set replicas, reset todo annotation, and set last-scaled - annotation. - """ - _logger.info(f"Scaling {self.slug} ({self.name}) to {replicas}.") - await k8s.patch_deployment( - self.deployment_name, - [ - { - # clear todo - "op": "replace", - "path": "/metadata/annotations/runboat~1todo", - "value": "", - }, - { - # record last scaled time for the stopper and undeployer - "op": "replace", - "path": "/metadata/annotations/runboat~1last-scaled", - "value": datetime.datetime.utcnow().isoformat() + "Z", - }, - { - # set replicas - "op": "replace", - "path": "/spec/replicas", - "value": replicas, - }, - ], - ) - - async def undeploy(self) -> None: - """Undeploy a build. - - Delete all resources, and drop the database. - """ - _logger.info(f"Undeploying {self.slug} ({self.name})") - await k8s.undeploy(self.name) - await k8s.dropdb(self.name) - @classmethod async def deploy( cls, repo: str, target_branch: str, pr: int | None, git_commit: str ) -> None: """Deploy a build, without starting it.""" - name = str(uuid.uuid4()) + name = f"b{uuid.uuid4()}" slug = cls.make_slug(repo, target_branch, pr, git_commit) - _logger.info(f"Deploying {slug} ({name})") + _logger.info(f"Deploying {slug} ({name}).") image = get_build_image(target_branch) deployment_vars = k8s.make_deployment_vars( + k8s.DeploymentMode.deploy, name, slug, repo.lower(), @@ -165,6 +131,142 @@ class Build(BaseModel): ) await k8s.deploy(deployment_vars) + async def start(self) -> None: + """Start build if init succeeded, or reinitialize if failed.""" + if self.status in (BuildStatus.started, BuildStatus.starting): + _logger.info( + f"Ignoring start command for {self} " + "that is already started or starting." + ) + return + elif self.status == BuildStatus.failed: + _logger.info(f"Marking failed {self} for reinitialization.") + await k8s.delete_job(self.name, job_kind="initialize") + await self._patch( + init_status=BuildInitStatus.todo, replicas=0, update_last_scaled=False + ) + elif self.status == BuildStatus.stopped: + _logger.info(f"Starting {self}.") + await self._patch(replicas=1, update_last_scaled=True) + + async def stop(self) -> None: + if self.status == BuildStatus.started: + _logger.info(f"Stopping {self}.") + await self._patch(replicas=0, update_last_scaled=True) + else: + _logger.info("Ignoring stop command for {self} " "that is not started.") + + async def initialize(self) -> None: + # Start initizalization job. on_init_started/on_init_succeeded/on_init_failed + # will be callsed back when it starts/succeeds/fails. + _logger.info(f"Deploying initialize job for {self}.") + deployment_vars = k8s.make_deployment_vars( + k8s.DeploymentMode.initialize, + self.name, + self.slug, + self.repo, + self.target_branch, + self.pr, + self.git_commit, + self.image, + ) + await k8s.deploy(deployment_vars) + + async def undeploy(self) -> None: + """Undeploy a build.""" + await self.stop() + # Start cleanup job. on_cleanup_XXX callbacks will follow. + _logger.info(f"Deploying cleanup job for {self}.") + deployment_vars = k8s.make_deployment_vars( + k8s.DeploymentMode.cleanup, + self.name, + self.slug, + self.repo, + self.target_branch, + self.pr, + self.git_commit, + self.image, + ) + await k8s.deploy(deployment_vars) + + async def on_initialize_started(self) -> None: + if self.init_status == BuildInitStatus.started: + return + _logger.info(f"Initialization job started for {self}.") + await self._patch( + init_status=BuildInitStatus.started, replicas=0, update_last_scaled=True + ) + + async def on_initialize_succeeded(self) -> None: + if self.init_status == BuildInitStatus.succeeded: + # Avoid restarting stopped deployments when the controller is notified of + # succeeded old initialization jobs after a controller restart. + return + _logger.info(f"Initialization job succeded for {self}, starting.") + await self._patch( + init_status=BuildInitStatus.succeeded, replicas=1, update_last_scaled=True + ) + + async def on_initialize_failed(self) -> None: + if self.init_status == BuildInitStatus.failed: + # Already marked as failed. We are probably here because the controller is + # restarting, and is notified of existing initialization jobs. + return + _logger.info(f"Initialization job failed for {self}.") + await self._patch( + init_status=BuildInitStatus.failed, replicas=0, update_last_scaled=True + ) + + async def on_cleanup_started(self) -> None: + _logger.info(f"Cleanup job started for {self}.") + await self._patch( + init_status=BuildInitStatus.cleaning, replicas=0, update_last_scaled=False + ) + + async def on_cleanup_succeeded(self) -> None: + _logger.info(f"Cleanup job succeeded for {self}, deleting resources.") + await k8s.delete_resources(self.name) + + async def on_cleanup_failed(self) -> None: + _logger.error( + f"Cleanup job failed for {self}, " f"manual intervention required." + ) + + async def _patch( + self, + init_status: BuildInitStatus | None = None, + replicas: int | None = None, + update_last_scaled: bool = True, + ) -> None: + ops = [] + if init_status is not None: + ops.extend( + [ + { + "op": "replace", + "path": "/metadata/annotations/runboat~1init-status", + "value": init_status, + }, + ], + ) + if replicas is not None: + ops.append( + { + "op": "replace", + "path": "/spec/replicas", + "value": replicas, + }, + ) + if update_last_scaled: + ops.append( + { + "op": "replace", + "path": "/metadata/annotations/runboat~1last-scaled", + "value": datetime.datetime.utcnow().isoformat() + "Z", + }, + ) + await k8s.patch_deployment(self.deployment_name, ops) + class Repo(BaseModel): name: str diff --git a/src/runboat/settings.py b/src/runboat/settings.py index 802dac6..3981b5d 100644 --- a/src/runboat/settings.py +++ b/src/runboat/settings.py @@ -5,8 +5,8 @@ class Settings(BaseSettings): admin_user: str admin_passwd: str supported_repos: set[str] - max_starting: int = 2 - max_running: int = 4 + max_initializing: int = 2 + max_started: int = 6 max_deployed: int = 10 build_namespace: str build_pghost: str