Use jobs for db initialization and cleanup

It turns out that init containers do not work, as we
can't easily detect failures and access init log as they
restart constantly. So we use initialization jobs.
We take the opportunity to use cleanup jobs too, so
all knowledge of runtime content is in the kubefiles.
This commit is contained in:
Stéphane Bidoul 2021-10-31 11:04:51 +01:00
parent 1f137bbdc8
commit d6d9ac8a7f
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92
15 changed files with 523 additions and 227 deletions

View file

@ -16,8 +16,8 @@ For running the controller:
- Python 3.10
- `kubectl`
- A `KUBECONFIG` that provides access to the namespace where the builds are deployed,
with permissions to create and delete service, deployment, ingress, secret and
configmap resources.
with permissions to create and delete Service, Job, Deployment, Ingress, Secret and
ConfigMap resources.
## Developing
@ -35,8 +35,9 @@ Contributions welcome.
Prototype (min required to do load testing):
- github token for github api requests
- set requests otherwise requests is same as limits ?
- plug it on a bunch of OCA and shopinvader repos to test load
- handle init failures, add failed status
- configuring many repos in a .env file may be difficult, switch to a toml file ?
MVP:
@ -62,3 +63,57 @@ More:
- create builds for all supported repos on startup (goes with sticky branches)
- never undeploy last build of sticky branches
- make build images configurable (see `build_images.py`)
## Kubefiles
Kustomize template with 3 modes (deploy, initialize, cleanup).
## Synchronous actions on builds (fast)
- deploy
- create deployment with 0 replicas and runboat/init-status="todo"
- start:
- if runboat/init-status=="succeeded", scale to 1
- elif runboat/init-status in ("todo", "started"), do nothing
- elif runboat/init-status=="failed", set runboat/init-status="todo"
- stop:
- scale deployment to 0
- undeploy:
- scale deployment to 0
- set runboat/init-status to "cleaning"
- start dropdb job (restart=Never, backoffLimit=6)
## Workers
- initializer (works on deployments with runboat/init-status="todo", ordered by
runboat/init-status-timestamp), obeying max_initializing:
- set runboat/init-status to "started"
- (re)create init job which will drop and init db (restart=Never, backoffLimit=0)
- job-watcher:
- on successful termination of initdb job: set runboat/init-status to "succeeded", scale
deployment to 1
- on failure of initdb job: set runboat/init-status to "failed"
- on success of dropdb job: delete all resources
- deployment-watcher:
- maintains an in-memory db of deployments
- stopper:
- stop old started, to reach max_started
- undeployer:
- undeploy old stopped, to reach max_deployed

View file

@ -13,3 +13,6 @@ handlers:
root:
level: DEBUG
handlers: [console]
loggers:
kubernetes_asyncio.client.rest:
level: INFO

View file

@ -16,10 +16,10 @@ router = APIRouter()
class Status(BaseModel):
deployed: int
max_deployed: int
running: int
max_running: int
starting: int
max_starting: int
started: int
max_started: int
initializing: int
max_initializing: int
class Config:
orm_mode = True
@ -153,14 +153,14 @@ async def log(name: str):
async def start(name: str):
"""Start the deployment."""
build = _build_by_name(name)
await build.delay_start()
await build.start()
@router.post("/builds/{name}/stop")
async def stop(name: str):
"""Stop the deployment."""
build = _build_by_name(name)
await build.scale(0)
await build.stop()
@router.delete("/builds/{name}", dependencies=[Depends(authenticated)])

View file

@ -1,9 +1,10 @@
import asyncio
import contextlib
import logging
from . import k8s
from .db import BuildsDb
from .models import Build, BuildStatus
from .models import Build, BuildInitStatus, BuildStatus
from .settings import settings
_logger = logging.getLogger(__name__)
@ -14,12 +15,16 @@ class Controller:
It runs several background tasks:
- The 'watcher' listens to kubernetes events on deployements and maintains an
in-memory database of existing deployments and their state. It wakes up the
starter and the reaper when necessary.
- The 'starter' starts deployments that have been flagged to start, while making
sure that the maximum number of deployment starting concurrently does not exceed
the limit.
- The 'deployment_watcher' listens to kubernetes events on deployments and maintains
an in-memory database of existing deployments and their state. It wakes up the
initializer, stopper and undeployer when the state of deployments change.
- The 'job_watcher' listens to kubernetes events on jobs, to maintain the
runboat/init-status annotation on deployments, and act on such events (such as
starting when an initialization succeeded or undeploying when a cleanup
succeeded).
- The 'initializer' starts initialization jobs for deployment that have been marked
with 'runboat/init-status=todo', while making sure that the maximum number of
deployments initializing concurrently does not exceed the limit.
- The 'stopper' stops old running deployments.
- The 'undeployer' undeploys old stopped deployments.
"""
@ -37,20 +42,20 @@ class Controller:
self.db = BuildsDb()
@property
def running(self) -> int:
return self.db.count_by_statuses((BuildStatus.started, BuildStatus.starting))
def started(self) -> int:
return self.db.count_by_status(BuildStatus.started)
@property
def max_running(self) -> int:
return settings.max_running
def max_started(self) -> int:
return settings.max_started
@property
def starting(self) -> int:
return self.db.count_by_statuses((BuildStatus.starting,))
def initializing(self) -> int:
return self.db.count_by_init_status(BuildInitStatus.started)
@property
def max_starting(self) -> int:
return settings.max_starting
def max_initializing(self) -> int:
return settings.max_initializing
@property
def deployed(self) -> int:
@ -70,7 +75,7 @@ class Controller:
git_commit=git_commit,
)
if build is not None:
await build.delay_start()
await build.start()
return
await Build.deploy(
repo=repo,
@ -83,13 +88,19 @@ class Controller:
self._wakeup_event.set()
self._wakeup_event.clear()
async def watcher(self) -> None:
async def _sleep(self) -> None:
# Wait on the wakeup event, but wakeup after 10 seconds if nothing happens,
# in case we have missed an event.
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._wakeup_event.wait(), 10)
async def deployment_watcher(self) -> None:
self.reset() # empty the local db each time we start watching
async for event_type, deployment in k8s.watch_deployments():
build_name = deployment.metadata.labels.get("runboat/build")
if not build_name:
continue
_logger.debug(f"{event_type} deployment {build_name}")
_logger.debug(f"k8s {event_type} {deployment.kind} {build_name}")
if event_type in ("ADDED", "MODIFIED"):
self.db.add(Build.from_deployment(deployment))
elif event_type == "DELETED":
@ -98,58 +109,89 @@ class Controller:
_logger.error(f"Unexpected k8s event type {event_type}.")
self._wakeup()
async def starter(self) -> None:
async def job_watcher(self) -> None:
    """Watch runboat jobs and notify the matching build of their progress.

    Only jobs carrying a ``runboat/build`` label and a ``runboat/job-kind``
    label equal to ``initialize`` or ``cleanup`` are considered. For every
    ADDED or MODIFIED event, the ``on_<kind>_succeeded``, ``on_<kind>_failed``
    or ``on_<kind>_started`` callback of the build is awaited, depending on
    the job status. DELETED events are ignored.
    """
    async for event_type, job in k8s.watch_jobs():
        # Other jobs in the namespace may have no labels at all, in which
        # case ``labels`` is None; they are not ours, so skip them.
        labels = job.metadata.labels or {}
        build_name = labels.get("runboat/build")
        if not build_name:
            continue
        job_kind = labels.get("runboat/job-kind")
        if job_kind not in ("initialize", "cleanup"):
            continue
        _logger.debug(f"k8s {event_type} {job.kind} {job_kind} {build_name}")
        if event_type == "DELETED":
            continue
        if event_type not in ("ADDED", "MODIFIED"):
            _logger.error(f"Unexpected k8s event type {event_type}.")
            continue
        build = self.db.get(build_name)
        if build is None:
            # Not found in db, look in k8s api, in case the controller
            # is starting and the db has not been populated yet.
            build = await Build.from_name(build_name)
            if build is None:
                continue
        # ``status.failed`` is a count of failed pods: it becomes non-zero as
        # soon as one attempt fails, even when the job is still going to
        # retry (the cleanup job has backoffLimit > 0). Only the "Failed"
        # condition tells that the job has given up.
        job_failed = any(
            condition.type == "Failed" and condition.status == "True"
            for condition in job.status.conditions or []
        )
        if job_kind == "initialize":
            if job.status.succeeded:
                await build.on_initialize_succeeded()
            elif job_failed:
                await build.on_initialize_failed()
            else:
                await build.on_initialize_started()
        else:  # job_kind == "cleanup"
            if job.status.succeeded:
                await build.on_cleanup_succeeded()
            elif job_failed:
                await build.on_cleanup_failed()
            else:
                await build.on_cleanup_started()
async def initializer(self) -> None:
while True:
await self._wakeup_event.wait()
while True:
can_start = min(
self.max_running - self.running + 1,
self.max_starting - self.starting,
)
if can_start <= 0:
break # no capacity for now, back to sleep
to_start = self.db.to_start(limit=can_start)
if not to_start:
break # nothing startable, back to sleep
_logger.info(f"Starting {len(to_start)} builds of up to {can_start}.")
for build in to_start:
await build.scale(1)
if len(to_start) < can_start:
break # back to sleep
await self._sleep()
can_initialize = self.max_initializing - self.initializing
if can_initialize <= 0:
continue # no capacity for now, back to sleep
to_initialize = self.db.to_initialize(limit=can_initialize)
if not to_initialize:
continue # nothing startable, back to sleep
_logger.info(
f"{self.initializing}/{self.max_initializing} builds are initializing. "
f"Initializing {len(to_initialize)} more."
)
for build in to_initialize:
await build.initialize()
async def stopper(self) -> None:
while True:
await self._wakeup_event.wait()
while True:
can_stop = self.running - self.max_running
if can_stop <= 0:
break # no need to stop for now, back to sleep
to_stop = self.db.oldest_started(limit=can_stop)
if not to_stop:
break # nothing stoppable, back to sleep
_logger.info(f"Stopping {len(to_stop)} builds of up to {can_stop}.")
for build in to_stop:
await build.scale(0)
if len(to_stop) < can_stop:
break # back to sleep
await self._sleep()
can_stop = self.started - self.max_started
if can_stop <= 0:
continue # no need to stop for now, back to sleep
to_stop = self.db.oldest_started(limit=can_stop)
if not to_stop:
continue # nothing stoppable, back to sleep
_logger.info(
f"{self.started}/{self.max_started} builds started. "
f"Stopping {len(to_stop)}."
)
for build in to_stop:
await build.stop()
async def undeployer(self) -> None:
while True:
await self._wakeup_event.wait()
while True:
can_undeploy = self.deployed - self.max_deployed
if can_undeploy <= 0:
break # no need to undeploy for now, back to sleep
to_undeploy = self.db.oldest_stopped(limit=can_undeploy)
if not to_undeploy:
break # nothing undeployable, back to sleep
_logger.info(
f"Undeploying {len(to_undeploy)} builds of up to {can_undeploy}."
)
for build in to_undeploy:
await build.undeploy()
if len(to_undeploy) < can_undeploy:
break # back to sleep
await self._sleep()
can_undeploy = self.deployed - self.max_deployed
if can_undeploy <= 0:
continue # no need to undeploy for now, back to sleep
to_undeploy = self.db.oldest_stopped(limit=can_undeploy)
if not to_undeploy:
continue # nothing undeployable, back to sleep
_logger.info(
f"{self.deployed}/{self.max_deployed} builds deployed. "
f"Undeploying {len(to_undeploy)}."
)
for build in to_undeploy:
await build.undeploy()
async def start(self) -> None:
_logger.info("Starting controller tasks.")
@ -167,7 +209,13 @@ class Controller:
)
await asyncio.sleep(delay)
for f in (self.watcher, self.starter, self.stopper, self.undeployer):
for f in (
self.deployment_watcher,
self.job_watcher,
self.initializer,
self.stopper,
self.undeployer,
):
self._tasks.append(asyncio.create_task(walking_dead(f)))
async def stop(self) -> None:

View file

@ -1,6 +1,6 @@
import sqlite3
from .models import BranchOrPull, Build, BuildStatus, BuildTodo
from .models import BranchOrPull, Build, BuildInitStatus, BuildStatus
class BuildsDb:
@ -36,12 +36,14 @@ class BuildsDb:
" git_commit TEXT NOT NULL, "
" image TEXT NOT NULL,"
" status TEXT NOT NULL, "
" todo TEXT, "
" init_status TEXT NOT NULL, "
" last_scaled TEXT, "
" created TEXT NOT NULL"
")"
)
self._con.execute("CREATE INDEX idx_todo ON builds(todo, last_scaled)")
self._con.execute(
"CREATE INDEX idx_init_status ON builds(init_status, created)"
)
self._con.execute("CREATE INDEX idx_status ON builds(status, last_scaled)")
self._con.execute("CREATE INDEX idx_repo ON builds(repo)")
@ -83,7 +85,7 @@ class BuildsDb:
" git_commit,"
" image,"
" status,"
" todo, "
" init_status, "
" last_scaled, "
" created"
") "
@ -97,27 +99,30 @@ class BuildsDb:
build.git_commit,
build.image,
build.status,
build.todo,
build.init_status,
build.last_scaled,
build.created.isoformat(),
),
)
def count_by_statuses(self, statuses: tuple[BuildStatus, ...]) -> int:
q = ",".join(["?"] * len(statuses))
def count_by_status(self, status: BuildStatus) -> int:
return self._con.execute(
f"SELECT COUNT(name) FROM builds WHERE status IN ({q})", statuses
"SELECT COUNT(name) FROM builds WHERE status=?", (status,)
).fetchone()[0]
def count_by_init_status(self, init_status: BuildInitStatus) -> int:
    """Return the number of builds whose init_status equals ``init_status``."""
    query = "SELECT COUNT(name) FROM builds WHERE init_status=?"
    (count,) = self._con.execute(query, (init_status,)).fetchone()
    return count
def count_all(self) -> int:
    """Return the total number of builds in the database."""
    (count,) = self._con.execute("SELECT COUNT(name) FROM builds").fetchone()
    return count
def to_start(self, limit: int) -> list[Build]:
"""Return the list of builds to start, ordered by todo timestamp."""
# TODO ordering is not correct as setting todo does not set last_scaled
def to_initialize(self, limit: int) -> list[Build]:
"""Return the list of builds to initialize, ordered by creation timestamp."""
rows = self._con.execute(
"SELECT * FROM builds WHERE todo=? ORDER BY last_scaled LIMIT ?",
(BuildTodo.start, limit),
"SELECT * FROM builds WHERE init_status=? ORDER BY created LIMIT ?",
(BuildInitStatus.todo, limit),
).fetchall()
return [self._build_from_row(row) for row in rows]
@ -132,8 +137,8 @@ class BuildsDb:
def oldest_stopped(self, limit: int) -> list[Build]:
"""Return a list of oldest stopped builds."""
rows = self._con.execute(
"SELECT * FROM builds WHERE status=? ORDER BY last_scaled LIMIT ?",
(BuildStatus.stopped, limit),
"SELECT * FROM builds WHERE status IN (?, ?) ORDER BY last_scaled LIMIT ?",
(BuildStatus.stopped, BuildStatus.failed, limit),
).fetchall()
return [self._build_from_row(row) for row in rows]

View file

@ -8,6 +8,7 @@ from .exceptions import NotFoundOnGithub
def _github_get(url: str) -> Any:
full_url = f"https://api.github.com{url}"
# TODO github token
headers = {
"Accept": "application/vnd.github.v3+json",
}

View file

@ -3,6 +3,7 @@ import shutil
import subprocess
import tempfile
from contextlib import contextmanager
from enum import Enum
from importlib import resources
from pathlib import Path
from typing import Any, AsyncGenerator, Generator, Optional
@ -11,6 +12,7 @@ from jinja2 import Template
from kubernetes_asyncio import client, config, watch
from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio.client.models.v1_deployment import V1Deployment
from kubernetes_asyncio.client.models.v1_job import V1Job
from pydantic import BaseModel
from .settings import settings
@ -26,6 +28,17 @@ async def load_kube_config() -> None:
await config.load_kube_config()
async def read_deployment(name: str) -> Optional[V1Deployment]:
    """Return the first deployment labelled ``runboat/build=<name>``, if any.

    The lookup is done in the namespace configured in
    ``settings.build_namespace``. None is returned when no deployment matches.
    """
    selector = f"runboat/build={name}"
    async with ApiClient() as api:
        found = await client.AppsV1Api(api).list_namespaced_deployment(
            namespace=settings.build_namespace, label_selector=selector
        )
        return next(iter(found.items), None)
async def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None:
async with ApiClient() as api:
appsv1 = client.AppsV1Api(api)
@ -47,8 +60,26 @@ async def watch_deployments() -> AsyncGenerator[tuple[str, V1Deployment], None]:
yield event["type"], event["object"]
async def watch_jobs() -> AsyncGenerator[tuple[str, V1Job], None]:
    """Yield ``(event type, job)`` for each job event in the build namespace."""
    job_watch = watch.Watch()
    # The context manager closes the http sessions automatically.
    async with ApiClient() as api:
        batchv1 = client.BatchV1Api(api)
        events = job_watch.stream(
            batchv1.list_namespaced_job, namespace=settings.build_namespace
        )
        async for event in events:
            yield event["type"], event["object"]
class DeploymentMode(str, Enum):
    """Mode passed to the kubefiles template to select what gets rendered."""

    # Render the deployment, service and ingress of the build.
    deploy = "deploy"
    # Render the initialization job.
    initialize = "initialize"
    # Render the cleanup job.
    cleanup = "cleanup"
class DeploymentVars(BaseModel):
namespace: str
mode: str
build_name: str
repo: str
target_branch: str
@ -66,6 +97,7 @@ class DeploymentVars(BaseModel):
def make_deployment_vars(
mode: DeploymentMode,
build_name: str,
slug: str,
repo: str,
@ -76,6 +108,7 @@ def make_deployment_vars(
) -> DeploymentVars:
image_name, image_tag = _split_image_name_tag(image)
return DeploymentVars(
mode=mode,
namespace=settings.build_namespace,
build_name=build_name,
repo=repo,
@ -135,45 +168,29 @@ async def deploy(deployment_vars: DeploymentVars) -> None:
)
async def dropdb(build_name: str) -> None:
await _kubectl(
[
"-n",
settings.build_namespace,
"run",
f"dropdb-{build_name}",
"--restart=Never",
"--rm",
"-i",
"--tty",
"--image",
"postgres",
"--env",
f"PGHOST={settings.build_pghost}",
"--env",
f"PGPORT={settings.build_pgport}",
"--env",
f"PGUSER={settings.build_pguser}",
"--env",
f"PGPASSWORD={settings.build_pgpassword}",
"--",
"dropdb",
"--if-exists",
"--force", # pg 13+
build_name,
]
)
async def undeploy(build_name: str) -> None:
async def delete_resources(build_name: str) -> None:
await _kubectl(
[
"-n",
settings.build_namespace,
"delete",
"service,deployment,ingress,secret,configmap",
"configmap,deployment,ingress,job,secret,service",
"-l",
f"runboat/build={build_name}",
"--wait=false",
]
)
async def delete_job(build_name: str, job_kind: str) -> None:
    """Delete the jobs of kind ``job_kind`` that belong to build ``build_name``.

    Jobs are selected by their ``runboat/build`` and ``runboat/job-kind``
    labels; kubectl is told not to wait for the deletion to complete.
    """
    selector = f"runboat/build={build_name},runboat/job-kind={job_kind}"
    kubectl_args = [
        "-n",
        settings.build_namespace,
        "delete",
        "job",
        "-l",
        selector,
        "--wait=false",
    ]
    await _kubectl(kubectl_args)

View file

@ -0,0 +1,27 @@
apiVersion: batch/v1
kind: Job
metadata:
name: cleanup
labels:
runboat/job-kind: cleanup
spec:
template:
spec:
containers:
- name: cleanup
image: postgres
volumeMounts:
- name: runboat-scripts
mountPath: /runboat
envFrom:
- secretRef:
name: odoosecretenv
- configMapRef:
name: odooenv
args: ["bash", "/runboat/runboat-cleanup.sh"]
volumes:
- name: runboat-scripts
configMap:
name: runboat-scripts
restartPolicy: Never
backoffLimit: 6

View file

@ -3,11 +3,9 @@ kind: Deployment
metadata:
name: odoo
annotations:
runboat/todo: "start" # ask controller to start when there is capacity
runboat/init-status: "todo" # ask controller to start when there is capacity
spec:
replicas: 0 # deploy idle (with 0 replica)
strategy:
type: Recreate
selector:
matchLabels:
app: odoo
@ -16,24 +14,7 @@ spec:
labels:
app: odoo
spec:
# TODO restartPolicy: Never
# TODO terminationGracePeriodSeconds: 5
initContainers:
- name: odoo-init
image: odoo
volumeMounts:
- name: runboat-scripts
mountPath: /runboat
envFrom:
- secretRef:
name: odoosecretenv
- configMapRef:
name: odooenv
args: ["bash", "/runboat/runboat-init.sh"]
resources:
limits:
cpu: 1000m
memory: 1Gi
terminationGracePeriodSeconds: 0 # no need to shutdown gracefully
containers:
- name: odoo
image: odoo
@ -53,8 +34,11 @@ spec:
args: ["bash", "/runboat/runboat-start.sh"]
resources:
limits:
cpu: 1000m
cpu: 800m
memory: 1Gi
requests:
cpu: 50m
memory: 100Mi
volumes:
- name: runboat-scripts
configMap:

View file

@ -0,0 +1,34 @@
apiVersion: batch/v1
kind: Job
metadata:
name: initalize
labels:
runboat/job-kind: initialize
spec:
template:
spec:
containers:
- name: initalize
image: odoo
volumeMounts:
- name: runboat-scripts
mountPath: /runboat
envFrom:
- secretRef:
name: odoosecretenv
- configMapRef:
name: odooenv
args: ["bash", "/runboat/runboat-initialize.sh"]
resources:
limits:
cpu: 1000m
memory: 1Gi
requests:
cpu: 1000m
memory: 1Gi
volumes:
- name: runboat-scripts
configMap:
name: runboat-scripts
restartPolicy: Never
backoffLimit: 0

View file

@ -1,11 +1,17 @@
resources:
{% if mode == 'deploy' -%}
- deployment.yaml
- service.yaml
- ingress.yaml
{% elif mode == "initialize" -%}
- initialize.yaml
{% elif mode == "cleanup" -%}
- cleanup.yaml
{% endif %}
namespace: {{ namespace }}
nameSuffix: "-{{ build_name }}"
namePrefix: "{{ build_name }}-"
commonLabels:
runboat/build: "{{ build_name }}"
@ -41,12 +47,19 @@ configMapGenerator:
- name: runboat-scripts
files:
- runboat-clone-and-install.sh
- runboat-init.sh
- runboat-initialize.sh
- runboat-cleanup.sh
- runboat-start.sh
{% if mode == 'deploy' -%}
- name: vars
literals:
- HOSTNAME={{ hostname }}
{% endif %}
generatorOptions:
disableNameSuffixHash: true
{% if mode == 'deploy' -%}
vars:
- name: HOSTNAME
objref:
@ -55,3 +68,4 @@ vars:
apiVersion: v1
fieldref:
fieldpath: data.HOSTNAME
{% endif %}

View file

@ -0,0 +1,5 @@
#!/bin/bash
set -ex
dropdb --if-exists --force $PGDATABASE

View file

@ -1,7 +1,7 @@
#!/bin/bash
#
# Install all addons to test.
# Install all addons in the test database.
#
set -ex
@ -10,7 +10,8 @@ bash /runboat/runboat-clone-and-install.sh
oca_wait_for_postgres
# TODO: do nothing if db exists and all addons are installed, so we can start instantly
# Drop database, in case we are reinitializing after failure.
dropdb --if-exists $PGDATABASE
ADDONS=$(addons --addons-dir ${ADDONS_DIR} --include "${INCLUDE}" --exclude "${EXCLUDE}" list)

View file

@ -16,13 +16,19 @@ _logger = logging.getLogger(__name__)
class BuildStatus(str, Enum):
stopped = "stopped"
starting = "starting"
started = "started"
stopped = "stopped" # initialization succeeded and 0 replicas
starting = "starting" # to initialize or initializing or scaling up
started = "started" # running
failed = "failed" # initialization failed
cleaning = "cleaning" # cleaning up, will be undeployed soon
class BuildTodo(str, Enum):
start = "start"
class BuildInitStatus(str, Enum):
    """Value of the ``runboat/init-status`` annotation of a deployment."""

    # To initialize as soon as there is capacity.
    todo = "todo"
    # Initialization job running.
    started = "started"
    # Initialization job succeeded.
    succeeded = "succeeded"
    # Initialization job failed.
    failed = "failed"
    # Cleanup job running.
    cleaning = "cleaning"
class Build(BaseModel):
@ -34,10 +40,21 @@ class Build(BaseModel):
git_commit: str
image: str
status: BuildStatus
todo: Optional[BuildTodo]
last_scaled: Optional[str]
init_status: BuildInitStatus
last_scaled: datetime.datetime
created: datetime.datetime
def __str__(self) -> str:
return f"{self.slug} ({self.name})"
@classmethod
async def from_name(cls, build_name: str) -> Optional["Build"]:
    """Build a model from the deployment found in k8s for ``build_name``.

    Returns None when the k8s api has no deployment for that build.
    """
    deployment = await k8s.read_deployment(build_name)
    return None if deployment is None else cls.from_deployment(deployment)
@classmethod
def from_deployment(cls, deployment: V1Deployment) -> "Build":
return Build(
@ -48,26 +65,32 @@ class Build(BaseModel):
pr=deployment.metadata.annotations["runboat/pr"] or None,
git_commit=deployment.metadata.annotations["runboat/git-commit"],
image=deployment.spec.template.spec.containers[0].image,
init_status=deployment.metadata.annotations["runboat/init-status"],
status=cls._status_from_deployment(deployment),
todo=deployment.metadata.annotations["runboat/todo"] or None,
last_scaled=deployment.metadata.annotations.get("runboat/last-scaled")
or None,
or datetime.datetime.utcnow(),
created=deployment.metadata.creation_timestamp,
)
@classmethod
def _status_from_deployment(cls, deployment: V1Deployment) -> BuildStatus:
replicas = deployment.status.replicas
if not replicas:
status = BuildStatus.stopped
else:
if deployment.status.ready_replicas == replicas:
status = BuildStatus.started
init_status = deployment.metadata.annotations["runboat/init-status"]
if init_status in (BuildInitStatus.todo, BuildInitStatus.started):
return BuildStatus.starting
elif init_status == BuildInitStatus.cleaning:
return BuildStatus.cleaning
elif init_status == BuildInitStatus.failed:
return BuildStatus.failed
elif init_status == BuildInitStatus.succeeded:
replicas = deployment.status.replicas
if not replicas:
return BuildStatus.stopped
else:
status = BuildStatus.starting
# TODO detect stopping, deploying, undeploying ?
# TODO: failed status
return status
if deployment.status.ready_replicas == replicas:
return BuildStatus.started
else:
return BuildStatus.starting
raise RuntimeError(f"Could not compute status of {deployment.metadata.name}.")
@classmethod
def make_slug(
@ -87,74 +110,17 @@ class Build(BaseModel):
def link(self) -> str:
return f"http://{self.slug}.{settings.build_domain}"
async def delay_start(self) -> None:
"""Mark a build for startup.
This is done by setting the runboat/todo annotation to 'start'.
The starter process will then start it when there is available
capacity.
"""
await k8s.patch_deployment(
self.deployment_name,
[
{
"op": "replace",
"path": "/metadata/annotations/runboat~1todo",
"value": "start",
},
],
)
async def scale(self, replicas: int) -> None:
"""Start a build.
Set replicas, reset todo annotation, and set last-scaled
annotation.
"""
_logger.info(f"Scaling {self.slug} ({self.name}) to {replicas}.")
await k8s.patch_deployment(
self.deployment_name,
[
{
# clear todo
"op": "replace",
"path": "/metadata/annotations/runboat~1todo",
"value": "",
},
{
# record last scaled time for the stopper and undeployer
"op": "replace",
"path": "/metadata/annotations/runboat~1last-scaled",
"value": datetime.datetime.utcnow().isoformat() + "Z",
},
{
# set replicas
"op": "replace",
"path": "/spec/replicas",
"value": replicas,
},
],
)
async def undeploy(self) -> None:
"""Undeploy a build.
Delete all resources, and drop the database.
"""
_logger.info(f"Undeploying {self.slug} ({self.name})")
await k8s.undeploy(self.name)
await k8s.dropdb(self.name)
@classmethod
async def deploy(
cls, repo: str, target_branch: str, pr: int | None, git_commit: str
) -> None:
"""Deploy a build, without starting it."""
name = str(uuid.uuid4())
name = f"b{uuid.uuid4()}"
slug = cls.make_slug(repo, target_branch, pr, git_commit)
_logger.info(f"Deploying {slug} ({name})")
_logger.info(f"Deploying {slug} ({name}).")
image = get_build_image(target_branch)
deployment_vars = k8s.make_deployment_vars(
k8s.DeploymentMode.deploy,
name,
slug,
repo.lower(),
@ -165,6 +131,142 @@ class Build(BaseModel):
)
await k8s.deploy(deployment_vars)
async def start(self) -> None:
    """Start the build, or schedule its reinitialization if it failed.

    - started or starting: nothing to do;
    - failed: delete the initialize job and set init-status back to "todo";
    - stopped: scale to 1 replica and record the scaling time.

    Any other status is left untouched.
    """
    current = self.status
    if current in (BuildStatus.started, BuildStatus.starting):
        _logger.info(
            f"Ignoring start command for {self} that is already started or starting."
        )
        return
    if current == BuildStatus.failed:
        _logger.info(f"Marking failed {self} for reinitialization.")
        # Remove the failed job first, then flag the deployment as "todo".
        await k8s.delete_job(self.name, job_kind="initialize")
        await self._patch(
            init_status=BuildInitStatus.todo,
            replicas=0,
            update_last_scaled=False,
        )
        return
    if current == BuildStatus.stopped:
        _logger.info(f"Starting {self}.")
        await self._patch(replicas=1, update_last_scaled=True)
async def stop(self) -> None:
    """Scale a started build down to 0 replicas.

    Builds that are not in the started status are left untouched; the
    request is only logged.
    """
    if self.status == BuildStatus.started:
        _logger.info(f"Stopping {self}.")
        await self._patch(replicas=0, update_last_scaled=True)
    else:
        # This must be an f-string, otherwise "{self}" is logged verbatim.
        _logger.info(f"Ignoring stop command for {self} that is not started.")
async def initialize(self) -> None:
    """Deploy the initialization job of this build.

    The kubefiles are rendered in "initialize" mode and applied. This method
    does not wait for the job; the job watcher later calls
    on_initialize_started / on_initialize_succeeded / on_initialize_failed.
    """
    _logger.info(f"Deploying initialize job for {self}.")
    await k8s.deploy(
        k8s.make_deployment_vars(
            k8s.DeploymentMode.initialize,
            self.name,
            self.slug,
            self.repo,
            self.target_branch,
            self.pr,
            self.git_commit,
            self.image,
        )
    )
async def undeploy(self) -> None:
    """Undeploy a build.

    The build is stopped first, then the kubefiles are rendered in "cleanup"
    mode and applied to start the cleanup job. The on_cleanup_XXX callbacks
    follow as the job progresses.
    """
    await self.stop()
    _logger.info(f"Deploying cleanup job for {self}.")
    await k8s.deploy(
        k8s.make_deployment_vars(
            k8s.DeploymentMode.cleanup,
            self.name,
            self.slug,
            self.repo,
            self.target_branch,
            self.pr,
            self.git_commit,
            self.image,
        )
    )
async def on_initialize_started(self) -> None:
    """Record that the initialization job is running.

    Does nothing when init-status is already "started"; otherwise sets it,
    forces 0 replicas and refreshes the last-scaled annotation.
    """
    if self.init_status != BuildInitStatus.started:
        _logger.info(f"Initialization job started for {self}.")
        await self._patch(
            init_status=BuildInitStatus.started,
            replicas=0,
            update_last_scaled=True,
        )
async def on_initialize_succeeded(self) -> None:
    """Record that the initialization job succeeded, and start the build.

    Does nothing when init-status is already "succeeded": this avoids
    restarting stopped deployments when the controller is notified of old
    succeeded initialization jobs after a controller restart.
    """
    if self.init_status != BuildInitStatus.succeeded:
        _logger.info(f"Initialization job succeded for {self}, starting.")
        await self._patch(
            init_status=BuildInitStatus.succeeded,
            replicas=1,
            update_last_scaled=True,
        )
async def on_initialize_failed(self) -> None:
    """Record that the initialization job failed.

    Does nothing when init-status is already "failed", which probably means
    the controller is restarting and is being notified of existing
    initialization jobs.
    """
    if self.init_status != BuildInitStatus.failed:
        _logger.info(f"Initialization job failed for {self}.")
        await self._patch(
            init_status=BuildInitStatus.failed,
            replicas=0,
            update_last_scaled=True,
        )
async def on_cleanup_started(self) -> None:
    """Record that the cleanup job is running.

    Sets init-status to "cleaning" and forces 0 replicas, without touching
    the last-scaled annotation.
    """
    if self.init_status == BuildInitStatus.cleaning:
        # Already recorded. This callback runs for every event of a running
        # cleanup job, so skip the redundant patch and log, as the
        # on_initialize_* callbacks do.
        return
    _logger.info(f"Cleanup job started for {self}.")
    await self._patch(
        init_status=BuildInitStatus.cleaning, replicas=0, update_last_scaled=False
    )
async def on_cleanup_succeeded(self) -> None:
    """Delete the k8s resources of the build once its cleanup job succeeded."""
    build_name = self.name
    _logger.info(f"Cleanup job succeeded for {self}, deleting resources.")
    await k8s.delete_resources(build_name)
async def on_cleanup_failed(self) -> None:
    """Log an error when the cleanup job failed; nothing else is changed."""
    message = f"Cleanup job failed for {self}, manual intervention required."
    _logger.error(message)
async def _patch(
    self,
    init_status: BuildInitStatus | None = None,
    replicas: int | None = None,
    update_last_scaled: bool = True,
) -> None:
    """Apply a JSON patch to the deployment of this build.

    :param init_status: new value of the ``runboat/init-status`` annotation,
        or None to leave it unchanged.
    :param replicas: new value of ``spec.replicas``, or None to leave it
        unchanged.
    :param update_last_scaled: when true, set the ``runboat/last-scaled``
        annotation to the current UTC time.
    """
    ops = []
    if init_status is not None:
        ops.append(
            {
                "op": "replace",
                "path": "/metadata/annotations/runboat~1init-status",
                "value": init_status,
            }
        )
    if replicas is not None:
        ops.append(
            {
                "op": "replace",
                "path": "/spec/replicas",
                "value": replicas,
            }
        )
    if update_last_scaled:
        ops.append(
            {
                # "add" rather than "replace": the annotation may not exist
                # yet (from_deployment treats it as optional), and "replace"
                # fails on a missing target whereas "add" creates the member
                # or replaces its value.
                "op": "add",
                "path": "/metadata/annotations/runboat~1last-scaled",
                "value": datetime.datetime.utcnow().isoformat() + "Z",
            }
        )
    await k8s.patch_deployment(self.deployment_name, ops)
class Repo(BaseModel):
name: str

View file

@ -5,8 +5,8 @@ class Settings(BaseSettings):
admin_user: str
admin_passwd: str
supported_repos: set[str]
max_starting: int = 2
max_running: int = 4
max_initializing: int = 2
max_started: int = 6
max_deployed: int = 10
build_namespace: str
build_pghost: str