It's taking shape
This commit is contained in:
parent
6bd0c8e7cd
commit
e33ad90745
8 changed files with 458 additions and 298 deletions
17
README.md
17
README.md
|
|
@ -2,8 +2,7 @@
|
|||
|
||||
A simple runbot lookalike on kubernetes. Main goal is replacing the OCA runbot.
|
||||
|
||||
|
||||
# Requirements
|
||||
## Requirements
|
||||
|
||||
For running the builds:
|
||||
|
||||
|
|
@ -24,24 +23,28 @@ For running the controller:
|
|||
|
||||
Prototype:
|
||||
|
||||
- webhook
|
||||
- plug it on a bunch of OCA and shopinvader repos to test load
|
||||
- handle init failures, add failed status
|
||||
- reaper
|
||||
- basic API
|
||||
|
||||
MVP:
|
||||
|
||||
- finish api
|
||||
- log api endpoints
|
||||
- build/log and build/init-log api endpoints
|
||||
- report build status to github
|
||||
- k8s init container timeout
|
||||
- error handling in API
|
||||
- basic tests
|
||||
- look at other TODO in code
|
||||
- build image
|
||||
- build and publis runboat container image
|
||||
- deployment
|
||||
- plug it on shopinvader and acsone
|
||||
- plug it on shopinvader and acsone to test on small scale
|
||||
- create builds for all supported repos on startup (goes with sticky branches)
|
||||
- advanced reaper (sticky branches)
|
||||
- test what happens when the watcher looses connection to k8s
|
||||
|
||||
More:
|
||||
|
||||
- UI
|
||||
- handle PR close (delete all builds for PR)
|
||||
- handle branch delete (delete all builds for branch)
|
||||
|
|
|
|||
|
|
@ -4,8 +4,10 @@ from fastapi import APIRouter, Depends, HTTPException, status
|
|||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from . import controller, github
|
||||
from . import github, models
|
||||
from .controller import controller
|
||||
from .deps import authenticated
|
||||
from .settings import settings
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
|
@ -30,14 +32,15 @@ class Repo(BaseModel):
|
|||
|
||||
|
||||
class Build(BaseModel):
|
||||
# created: datetime.datetime
|
||||
# TODO created: datetime.datetime
|
||||
name: str
|
||||
repo: str
|
||||
target_branch: str
|
||||
pr: Optional[int]
|
||||
commit: str
|
||||
image: str
|
||||
link: str
|
||||
status: controller.BuildStatus
|
||||
status: models.BuildStatus
|
||||
|
||||
class Config:
|
||||
orm_mode = True
|
||||
|
|
@ -50,21 +53,21 @@ class BranchOrPull(BaseModel):
|
|||
target_branch: str
|
||||
pr: Optional[int]
|
||||
link: str
|
||||
builds: List[Build]
|
||||
builds: list[Build]
|
||||
|
||||
class Config:
|
||||
orm_mode = True
|
||||
read_with_orm_mode = True
|
||||
|
||||
|
||||
@router.get("/status", response_model=Status)
|
||||
async def controller_status():
|
||||
return controller.controller
|
||||
return controller
|
||||
|
||||
|
||||
@router.get("/repos", response_model=List[Repo])
|
||||
async def repos():
|
||||
# return models.Repo.all()
|
||||
...
|
||||
return [models.Repo(name=name) for name in settings.supported_repos]
|
||||
|
||||
|
||||
@router.get(
|
||||
|
|
@ -73,8 +76,7 @@ async def repos():
|
|||
response_model_exclude_none=True,
|
||||
)
|
||||
async def branches_and_pulls(org: str, repo: str):
|
||||
# return await models.Repo.by_org_repo(org, repo).branches_and_pulls()
|
||||
...
|
||||
return controller.db.branches_and_pulls(f"{org}/{repo}")
|
||||
|
||||
|
||||
@router.post(
|
||||
|
|
@ -86,7 +88,7 @@ async def trigger_branch(org: str, repo: str, branch: str):
|
|||
"""Trigger build for a branch."""
|
||||
# TODO async github call
|
||||
branch_info = github.get_branch_info(org, repo, branch)
|
||||
controller.Build.deploy(
|
||||
await models.Build.deploy(
|
||||
repo=f"{branch_info.org}/{branch_info.repo}",
|
||||
target_branch=branch_info.name,
|
||||
pr=None,
|
||||
|
|
@ -103,7 +105,7 @@ async def trigger_pull(org: str, repo: str, pr: int):
|
|||
"""Trigger build for a pull request."""
|
||||
# TODO async github call
|
||||
pull_info = github.get_pull_info(org, repo, pr)
|
||||
await controller.Build.deploy(
|
||||
await models.Build.deploy(
|
||||
repo=f"{pull_info.org}/{pull_info.repo}",
|
||||
target_branch=pull_info.target_branch,
|
||||
pr=pull_info.number,
|
||||
|
|
@ -111,10 +113,9 @@ async def trigger_pull(org: str, repo: str, pr: int):
|
|||
)
|
||||
|
||||
|
||||
def _build_by_name(name: str) -> controller.Build:
|
||||
def _build_by_name(name: str) -> models.Build:
|
||||
try:
|
||||
# TODO do not access controller internals
|
||||
return controller.controller._builds_by_name[name]
|
||||
return controller.db.get(name)
|
||||
except KeyError:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND)
|
||||
|
||||
|
|
@ -155,7 +156,7 @@ async def start(name: str):
|
|||
async def stop(name: str):
|
||||
"""Stop the deployment."""
|
||||
build = _build_by_name(name)
|
||||
await build.stop()
|
||||
await build.scale(0)
|
||||
|
||||
|
||||
@router.delete("/builds/{name}", dependencies=[Depends(authenticated)])
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
from fastapi import FastAPI
|
||||
|
||||
from . import api, controller, k8s
|
||||
from . import api, controller, k8s, webhooks
|
||||
|
||||
app = FastAPI(title="Runboat", description="Runbot on Kubernetes ☸️")
|
||||
app.include_router(api.router)
|
||||
app.include_router(webhooks.router)
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
|
|
|
|||
|
|
@ -1,201 +1,31 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from enum import Enum
|
||||
|
||||
from kubernetes_asyncio.client.models.v1_deployment import V1Deployment
|
||||
|
||||
from runboat.build_images import get_build_image
|
||||
|
||||
from . import k8s
|
||||
from .db import BuildsDb
|
||||
from .models import Build, BuildStatus
|
||||
from .settings import settings
|
||||
from .utils import slugify
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BuildStatus(str, Enum):
|
||||
stopped = "stopped"
|
||||
starting = "starting"
|
||||
started = "started"
|
||||
|
||||
|
||||
class BuildTodo(str, Enum):
|
||||
start = "start"
|
||||
|
||||
|
||||
class Build:
|
||||
def __init__(self, deployment: V1Deployment):
|
||||
self._deployment = deployment
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._deployment.metadata.labels["runboat/build"]
|
||||
|
||||
@property
|
||||
def repo(self) -> str:
|
||||
return self._deployment.metadata.annotations["runboat/repo"]
|
||||
|
||||
@property
|
||||
def target_branch(self) -> str:
|
||||
return self._deployment.metadata.annotations["runboat/target-branch"]
|
||||
|
||||
@property
|
||||
def pr(self) -> int | None:
|
||||
return self._deployment.metadata.annotations["runboat/pr"] or None
|
||||
|
||||
@property
|
||||
def commit(self) -> str:
|
||||
return self._deployment.metadata.annotations["runboat/commit"]
|
||||
|
||||
@classmethod
|
||||
def make_slug(
|
||||
cls, repo: str, target_branch: str, pr: int | None, commit: str
|
||||
) -> str:
|
||||
slug = f"{slugify(repo)}-{slugify(target_branch)}"
|
||||
if pr:
|
||||
slug = f"{slug}-pr{slugify(pr)}"
|
||||
slug = f"{slug}-{commit[:12]}"
|
||||
return slug
|
||||
|
||||
@property
|
||||
def slug(self) -> str:
|
||||
return self.make_slug(self.repo, self.target_branch, self.pr, self.commit)
|
||||
|
||||
@property
|
||||
def link(self) -> str:
|
||||
return f"https://{self.slug}.{settings.build_domain}"
|
||||
|
||||
@property
|
||||
def status(self) -> BuildStatus:
|
||||
replicas = self._deployment.status.replicas
|
||||
if not replicas:
|
||||
status = BuildStatus.stopped
|
||||
else:
|
||||
if self._deployment.status.ready_replicas == replicas:
|
||||
status = BuildStatus.started
|
||||
else:
|
||||
status = BuildStatus.starting
|
||||
# TODO detect stopping, deploying, undeploying ?
|
||||
# TODO: failed status
|
||||
return status
|
||||
|
||||
@property
|
||||
def todo(self) -> BuildTodo | None:
|
||||
return self._deployment.metadata.annotations["runboat/todo"] or None
|
||||
|
||||
async def delay_start(self) -> None:
|
||||
"""Mark a build for startup.
|
||||
|
||||
This is done by setting the runboat/todo annotation to 'start'.
|
||||
This will in turn let the starter process it when there is
|
||||
available capacity.
|
||||
"""
|
||||
await k8s.patch_deployment(
|
||||
self._deployment.metadata.name,
|
||||
[
|
||||
{
|
||||
"op": "replace",
|
||||
"path": "/metadata/annotations/runboat~1todo",
|
||||
"value": "start",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start a build.
|
||||
|
||||
Set replicas to 1, and reset todo.
|
||||
"""
|
||||
_logger.info(f"Starting {self.slug} ({self.name})")
|
||||
await k8s.patch_deployment(
|
||||
self._deployment.metadata.name,
|
||||
[
|
||||
{
|
||||
"op": "replace",
|
||||
"path": "/metadata/annotations/runboat~1todo",
|
||||
"value": "",
|
||||
},
|
||||
{
|
||||
"op": "replace",
|
||||
"path": "/spec/replicas",
|
||||
"value": 1,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop a build.
|
||||
|
||||
Set replicas to 0, and reset todo.
|
||||
"""
|
||||
_logger.info(f"Stopping {self.slug} ({self.name})")
|
||||
await k8s.patch_deployment(
|
||||
self._deployment.metadata.name,
|
||||
[
|
||||
{
|
||||
"op": "replace",
|
||||
"path": "/metadata/annotations/runboat~1todo",
|
||||
"value": "",
|
||||
},
|
||||
{
|
||||
"op": "replace",
|
||||
"path": "/spec/replicas",
|
||||
"value": 0,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
async def undeploy(self) -> None:
|
||||
"""Undeploy a build.
|
||||
|
||||
Delete all resources, and drop the database.
|
||||
"""
|
||||
_logger.info(f"Undeploying {self.slug} ({self.name})")
|
||||
await k8s.undeploy(self.name)
|
||||
await k8s.dropdb(self.name)
|
||||
|
||||
@classmethod
|
||||
async def deploy(
|
||||
cls, repo: str, target_branch: str, pr: int | None, commit: str
|
||||
) -> None:
|
||||
"""Deploy a build, without starting it."""
|
||||
name = str(uuid.uuid4())
|
||||
slug = cls.make_slug(repo, target_branch, pr, commit)
|
||||
_logger.info("Deploying {slug} ({name})")
|
||||
image = get_build_image(target_branch)
|
||||
deployment_vars = k8s.make_deployment_vars(
|
||||
name,
|
||||
slug,
|
||||
repo.lower(),
|
||||
target_branch,
|
||||
pr,
|
||||
commit,
|
||||
image,
|
||||
)
|
||||
await k8s.deploy(deployment_vars)
|
||||
|
||||
|
||||
class Controller:
|
||||
"""The controller monitors and manages the deployments.
|
||||
|
||||
It run several background tasks:
|
||||
- The 'watcher' listens to kubernetes events on deployements and maintains an
|
||||
in-memory data structure about existing deployments and their state. It wakes up
|
||||
the starter and the reaper when necessary.
|
||||
in-memory database of existing deployments and their state. It wakes up the
|
||||
starter and the reaper when necessary.
|
||||
- The 'starter' starts deployment that have been flagged to start, while making sure
|
||||
that the maximum number of deployment starting concurrently does not exceed the
|
||||
limit.
|
||||
- The 'reaper' stops old running deployments, and deletes old stopped deployments so
|
||||
as to limit the maximum number of each.
|
||||
- The 'stopper' stops old running deployments.
|
||||
- The 'undeployer' undeploys old stopped deployments.
|
||||
"""
|
||||
|
||||
db: BuildsDb
|
||||
_tasks: list[asyncio.Task]
|
||||
_wakeup_event: asyncio.Event
|
||||
_builds_by_name: dict[str, Build]
|
||||
_starting: int
|
||||
_started: int
|
||||
_starter_queue: asyncio.Queue
|
||||
|
||||
def __init__(self):
|
||||
self._tasks = []
|
||||
|
|
@ -203,111 +33,71 @@ class Controller:
|
|||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self._builds_by_name = {}
|
||||
self._starting = 0
|
||||
self._started = 0
|
||||
self._starter_queue = asyncio.Queue()
|
||||
self.db = BuildsDb()
|
||||
|
||||
@property
|
||||
def running(self) -> int:
|
||||
return self._starting + self._started
|
||||
return self.db.count_by_statuses([BuildStatus.started, BuildStatus.starting])
|
||||
|
||||
@property
|
||||
def starting(self) -> int:
|
||||
return self._starting
|
||||
return self.db.count_by_statuses([BuildStatus.starting])
|
||||
|
||||
@property
|
||||
def deployed(self) -> int:
|
||||
return len(self._builds_by_name)
|
||||
|
||||
def _add(self, build: Build) -> None:
|
||||
self._remove(build.name)
|
||||
if build.status == BuildStatus.starting:
|
||||
self._starting += 1
|
||||
elif build.status == BuildStatus.started:
|
||||
self._started += 1
|
||||
self._builds_by_name[build.name] = build
|
||||
|
||||
def _remove(self, build_name: str) -> None:
|
||||
old_build = self._builds_by_name.get(build_name)
|
||||
if old_build is None:
|
||||
return
|
||||
if old_build.status == BuildStatus.starting:
|
||||
self._starting -= 1
|
||||
elif old_build.status == BuildStatus.started:
|
||||
self._started -= 1
|
||||
del self._builds_by_name[build_name]
|
||||
return self.db.count_all()
|
||||
|
||||
def _wakeup(self) -> None:
|
||||
self._wakeup_event.set()
|
||||
self._wakeup_event.clear()
|
||||
|
||||
def added(self, build_name: str, deployment: V1Deployment) -> None:
|
||||
new_build = Build(deployment)
|
||||
assert new_build.name == build_name
|
||||
assert new_build.name not in self._builds_by_name
|
||||
if new_build.todo == BuildTodo.start:
|
||||
self._starter_queue.put_nowait(new_build.name)
|
||||
self._add(new_build)
|
||||
self._wakeup()
|
||||
|
||||
def modified(self, build_name: str, deployment: V1Deployment) -> None:
|
||||
new_build = Build(deployment)
|
||||
assert new_build.name == build_name
|
||||
assert new_build.name in self._builds_by_name
|
||||
old_build = self._builds_by_name[new_build.name]
|
||||
if new_build.todo == BuildTodo.start and new_build.todo != old_build.todo:
|
||||
self._starter_queue.put_nowait(new_build.name)
|
||||
self._add(new_build)
|
||||
self._wakeup()
|
||||
|
||||
def deleted(self, build_name: str) -> None:
|
||||
self._remove(build_name)
|
||||
self._wakeup()
|
||||
|
||||
async def watcher(self) -> None:
|
||||
self.reset() # empty the local db each time we start watching
|
||||
async for event_type, deployment in k8s.watch_deployments():
|
||||
build_name = deployment.metadata.labels.get("runboat/build")
|
||||
if not build_name:
|
||||
continue
|
||||
_logger.debug(f"{event_type} deployment {build_name}")
|
||||
if event_type == "ADDED":
|
||||
self.added(build_name, deployment)
|
||||
elif event_type == "MODIFIED":
|
||||
self.modified(build_name, deployment)
|
||||
if event_type in ("ADDED", "MODIFIED"):
|
||||
self.db.add(Build.from_deployment(deployment))
|
||||
elif event_type == "DELETED":
|
||||
self.deleted(build_name)
|
||||
self.db.remove(build_name)
|
||||
else:
|
||||
_logger.error(f"Unexpected event type {event_type}.")
|
||||
_logger.error(f"Unexpected k8s event type {event_type}.")
|
||||
self._wakeup()
|
||||
|
||||
async def starter(self) -> None:
|
||||
while True:
|
||||
await self._wakeup_event.wait()
|
||||
while not self._starter_queue.empty():
|
||||
if self.starting >= settings.max_starting:
|
||||
# Too many starting, back to sleep.
|
||||
while True:
|
||||
can_start = max(
|
||||
settings.max_running - self.running,
|
||||
settings.max_starting - self.starting,
|
||||
)
|
||||
if can_start <= 0:
|
||||
break
|
||||
if self.running > settings.max_running:
|
||||
# Too many started, back to sleep. If ==, we are going to start one
|
||||
# more and let the reaper do it's job to get back to the maximum.
|
||||
break
|
||||
build_name = await self._starter_queue.get()
|
||||
try:
|
||||
build = self._builds_by_name.get(build_name)
|
||||
if build is None:
|
||||
continue
|
||||
await build.start()
|
||||
finally:
|
||||
# TODO in case of exception, add back to starter queue ?
|
||||
self._starter_queue.task_done()
|
||||
for build in self.db.to_start(limit=can_start):
|
||||
await build.scale(1)
|
||||
|
||||
async def reaper(self) -> None:
|
||||
async def stopper(self) -> None:
|
||||
while True:
|
||||
await self._wakeup_event.wait()
|
||||
# TODO
|
||||
# - stop old started
|
||||
# - undeploy old deployed
|
||||
# - keep sticky builds
|
||||
while True:
|
||||
can_stop = self.running - settings.max_running
|
||||
if can_stop <= 0:
|
||||
break
|
||||
for build in self.db.oldest_started(limit=can_stop):
|
||||
await build.scale(0)
|
||||
|
||||
async def undeployer(self) -> None:
|
||||
while True:
|
||||
await self._wakeup_event.wait()
|
||||
while True:
|
||||
can_undeploy = self.deployed - settings.max_deployed
|
||||
if can_undeploy <= 0:
|
||||
break
|
||||
for build in self.db.oldest_stopped(limit=can_undeploy):
|
||||
await build.undeploy()
|
||||
|
||||
async def start(self):
|
||||
_logger.info("Starting controller tasks.")
|
||||
|
|
@ -323,7 +113,7 @@ class Controller:
|
|||
)
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
for f in (self.watcher, self.starter, self.reaper):
|
||||
for f in (self.watcher, self.starter, self.stopper, self.undeployer):
|
||||
self._tasks.append(asyncio.create_task(walking_dead(f)))
|
||||
|
||||
async def stop(self):
|
||||
|
|
|
|||
147
src/runboat/db.py
Normal file
147
src/runboat/db.py
Normal file
|
|
@ -0,0 +1,147 @@
|
|||
import sqlite3
|
||||
|
||||
from .models import BranchOrPull, Build, BuildStatus, BuildTodo
|
||||
|
||||
|
||||
class BuildsDb:
|
||||
"""An in-memory database of builds.
|
||||
|
||||
It is maintained up-to-date by the controller that receives events from the cluster.
|
||||
We use sqlite3 to facilitate queries and sorting, such as counting by status, or
|
||||
finding oldest builds.
|
||||
|
||||
Querying it on each event from k8s is probably not the most efficient, but this
|
||||
should do for a start, and there are plenty of ways to optimize.
|
||||
"""
|
||||
|
||||
_con: sqlite3.Connection
|
||||
|
||||
def __init__(self):
|
||||
self.reset()
|
||||
|
||||
@classmethod
|
||||
def _build_from_row(cls, row: sqlite3.Row) -> Build:
|
||||
return Build(**{k: row[k] for k in row.keys()})
|
||||
|
||||
def reset(self) -> None:
|
||||
self._con = sqlite3.connect(":memory:")
|
||||
self._con.row_factory = sqlite3.Row
|
||||
self._con.execute(
|
||||
"CREATE TABLE builds ("
|
||||
" name TEXT NOT NULL PRIMARY KEY, "
|
||||
" deployment_name TEXT NOT NULL, "
|
||||
" repo TEXT NOT NULL, "
|
||||
" target_branch TEXT NOT NULL, "
|
||||
" pr INTEGER, "
|
||||
" 'commit' TEXT NOT NULL, "
|
||||
" image TEXT NOT NULL,"
|
||||
" status TEXT NOT NULL, "
|
||||
" todo TEXT, "
|
||||
" last_scaled TEXT, "
|
||||
" created TEXT NOT NULL"
|
||||
")"
|
||||
)
|
||||
self._con.execute("CREATE INDEX idx_todo ON builds(todo, last_scaled)")
|
||||
self._con.execute("CREATE INDEX idx_status ON builds(status, last_scaled)")
|
||||
self._con.execute("CREATE INDEX idx_repo ON builds(repo)")
|
||||
|
||||
def get(self, name: str) -> Build | None:
|
||||
row = self._con.execute("SELECT * FROM builds WHERE name=?", (name,)).fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return self._build_from_row(row)
|
||||
|
||||
def remove(self, name: str) -> None:
|
||||
with self._con:
|
||||
self._con.execute("DELETE FROM builds WHERE name=?", (name,))
|
||||
|
||||
def add(self, build: Build) -> None:
|
||||
with self._con:
|
||||
self._con.execute(
|
||||
"INSERT OR REPLACE INTO builds "
|
||||
"("
|
||||
" name,"
|
||||
" deployment_name,"
|
||||
" repo,"
|
||||
" target_branch,"
|
||||
" pr,"
|
||||
" 'commit',"
|
||||
" image,"
|
||||
" status,"
|
||||
" todo, "
|
||||
" last_scaled, "
|
||||
" created"
|
||||
") "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
build.name,
|
||||
build.deployment_name,
|
||||
build.repo,
|
||||
build.target_branch,
|
||||
build.pr,
|
||||
build.commit,
|
||||
build.image,
|
||||
build.status,
|
||||
build.todo,
|
||||
build.last_scaled,
|
||||
build.created,
|
||||
),
|
||||
)
|
||||
|
||||
def count_by_statuses(self, statuses: tuple[BuildStatus]) -> int:
|
||||
q = ",".join(["?"] * len(statuses))
|
||||
return self._con.execute(
|
||||
f"SELECT COUNT(name) FROM builds WHERE status IN ({q})", statuses
|
||||
).fetchone()[0]
|
||||
|
||||
def count_all(self) -> int:
|
||||
return self._con.execute("SELECT COUNT(name) FROM builds").fetchone()[0]
|
||||
|
||||
def to_start(self, limit: int) -> list[Build]:
|
||||
"""Return the list of builds to start, ordered by todo timestamp."""
|
||||
rows = self._con.execute(
|
||||
"SELECT * FROM builds WHERE todo=? ORDER BY last_scaled LIMIT ?",
|
||||
(BuildTodo.start, limit),
|
||||
).fetchall()
|
||||
return [self._build_from_row(row) for row in rows]
|
||||
|
||||
def oldest_started(self, limit: int) -> list[Build]:
|
||||
"""Return a list of oldest started builds."""
|
||||
rows = self._con.execute(
|
||||
"SELECT * FROM builds WHERE status=? ORDER BY last_scaled LIMIT ?",
|
||||
(BuildStatus.started, limit),
|
||||
).fetchall()
|
||||
return [self._build_from_row(row) for row in rows]
|
||||
|
||||
def oldest_stopped(self, limit: int) -> list[Build]:
|
||||
"""Return a list of oldest stopped builds."""
|
||||
rows = self._con.execute(
|
||||
"SELECT * FROM builds WHERE status=? ORDER BY last_scaled LIMIT ?",
|
||||
(BuildStatus.stopped, limit),
|
||||
).fetchall()
|
||||
return [self._build_from_row(row) for row in rows]
|
||||
|
||||
def branches_and_pulls(self, repo: str) -> list[BranchOrPull]:
|
||||
res = []
|
||||
branch_or_pull: BranchOrPull = None
|
||||
for row in self._con.execute(
|
||||
"SELECT * FROM builds WHERE repo=?"
|
||||
"ORDER BY target_branch, pr, created DESC",
|
||||
(repo,),
|
||||
).fetchall():
|
||||
build = self._build_from_row(row)
|
||||
if (
|
||||
branch_or_pull is None
|
||||
or branch_or_pull.repo != build.repo
|
||||
or branch_or_pull.target_branch != build.target_branch
|
||||
or branch_or_pull.pr != build.pr
|
||||
):
|
||||
branch_or_pull = BranchOrPull(
|
||||
repo=build.repo,
|
||||
target_branch=build.target_branch,
|
||||
pr=build.pr,
|
||||
builds=[],
|
||||
)
|
||||
res.append(branch_or_pull)
|
||||
branch_or_pull.builds.append(build)
|
||||
return res
|
||||
|
|
@ -16,10 +16,11 @@ spec:
|
|||
labels:
|
||||
app: odoo
|
||||
spec:
|
||||
# TODO restartPolicy: Never
|
||||
# TODO terminationGracePeriodSeconds: 5
|
||||
initContainers:
|
||||
- name: odoo-init
|
||||
image: odoo
|
||||
restartPolicy: Never
|
||||
volumeMounts:
|
||||
- name: runboat-scripts
|
||||
mountPath: /runboat
|
||||
|
|
@ -54,7 +55,6 @@ spec:
|
|||
limits:
|
||||
cpu: 1000m
|
||||
memory: 1Gi
|
||||
terminationGracePeriodSeconds: 5
|
||||
volumes:
|
||||
- name: runboat-scripts
|
||||
configMap:
|
||||
|
|
|
|||
195
src/runboat/models.py
Normal file
195
src/runboat/models.py
Normal file
|
|
@ -0,0 +1,195 @@
|
|||
import logging
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from kubernetes_asyncio.client.models.v1_deployment import V1Deployment
|
||||
from pydantic import BaseModel
|
||||
|
||||
from . import k8s
|
||||
from .build_images import get_build_image
|
||||
from .settings import settings
|
||||
from .utils import slugify
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BuildStatus(str, Enum):
|
||||
stopped = "stopped"
|
||||
starting = "starting"
|
||||
started = "started"
|
||||
|
||||
|
||||
class BuildTodo(str, Enum):
|
||||
start = "start"
|
||||
|
||||
|
||||
class Build(BaseModel):
|
||||
name: str
|
||||
deployment_name: str
|
||||
repo: str
|
||||
target_branch: str
|
||||
pr: Optional[int]
|
||||
commit: str
|
||||
image: str
|
||||
status: BuildStatus
|
||||
todo: Optional[BuildTodo]
|
||||
last_scaled: Optional[str]
|
||||
created: str
|
||||
|
||||
@classmethod
|
||||
def from_deployment(cls, deployment: V1Deployment) -> "Build":
|
||||
return Build(
|
||||
name=deployment.metadata.labels["runboat/build"],
|
||||
deployment_name=deployment.metadata.name,
|
||||
repo=deployment.metadata.annotations["runboat/repo"],
|
||||
target_branch=deployment.metadata.annotations["runboat/target-branch"],
|
||||
pr=deployment.metadata.annotations["runboat/pr"] or None,
|
||||
commit=deployment.metadata.annotations["runboat/commit"],
|
||||
image=deployment.spec.template.spec.containers[0].image,
|
||||
status=cls._status_from_deployment(deployment),
|
||||
todo=deployment.metadata.annotations["runboat/todo"] or None,
|
||||
last_scaled=deployment.metadata.annotations.get("runboat/last-scaled")
|
||||
or None,
|
||||
created="TODO", # deployment.metadata.creationTimestamp,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _status_from_deployment(cls, deployment: V1Deployment) -> BuildStatus:
|
||||
replicas = deployment.status.replicas
|
||||
if not replicas:
|
||||
status = BuildStatus.stopped
|
||||
else:
|
||||
if deployment.status.ready_replicas == replicas:
|
||||
status = BuildStatus.started
|
||||
else:
|
||||
status = BuildStatus.starting
|
||||
# TODO detect stopping, deploying, undeploying ?
|
||||
# TODO: failed status
|
||||
return status
|
||||
|
||||
@classmethod
|
||||
def make_slug(
|
||||
cls, repo: str, target_branch: str, pr: int | None, commit: str
|
||||
) -> str:
|
||||
slug = f"{slugify(repo)}-{slugify(target_branch)}"
|
||||
if pr:
|
||||
slug = f"{slug}-pr{slugify(pr)}"
|
||||
slug = f"{slug}-{commit[:12]}"
|
||||
return slug
|
||||
|
||||
@property
|
||||
def slug(self) -> str:
|
||||
return self.make_slug(self.repo, self.target_branch, self.pr, self.commit)
|
||||
|
||||
@property
|
||||
def link(self) -> str:
|
||||
return f"http://{self.slug}.{settings.build_domain}"
|
||||
|
||||
async def delay_start(self) -> None:
|
||||
"""Mark a build for startup.
|
||||
|
||||
This is done by setting the runboat/todo annotation to 'start'.
|
||||
This will in turn let the starter process it when there is
|
||||
available capacity.
|
||||
"""
|
||||
await k8s.patch_deployment(
|
||||
self.deployment_name,
|
||||
[
|
||||
{
|
||||
"op": "replace",
|
||||
"path": "/metadata/annotations/runboat~1todo",
|
||||
"value": "start",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
async def scale(self, replicas: int) -> None:
|
||||
"""Start a build.
|
||||
|
||||
Set replicas to 1, and reset todo.
|
||||
"""
|
||||
_logger.info(f"Scaling {self.slug} ({self.name}) to {replicas}.")
|
||||
await k8s.patch_deployment(
|
||||
self.deployment_name,
|
||||
[
|
||||
{
|
||||
# clear todo
|
||||
"op": "replace",
|
||||
"path": "/metadata/annotations/runboat~1todo",
|
||||
"value": "",
|
||||
},
|
||||
{
|
||||
# record last scaled time for the stopper and undeployer
|
||||
"op": "replace",
|
||||
"path": "/metadata/annotations/runboat~1last-scaled",
|
||||
"value": datetime.utcnow().isoformat() + "Z",
|
||||
},
|
||||
{
|
||||
# set replicas
|
||||
"op": "replace",
|
||||
"path": "/spec/replicas",
|
||||
"value": replicas,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
async def undeploy(self) -> None:
|
||||
"""Undeploy a build.
|
||||
|
||||
Delete all resources, and drop the database.
|
||||
"""
|
||||
_logger.info(f"Undeploying {self.slug} ({self.name})")
|
||||
await k8s.undeploy(self.name)
|
||||
await k8s.dropdb(self.name)
|
||||
|
||||
@classmethod
|
||||
async def deploy(
|
||||
cls, repo: str, target_branch: str, pr: int | None, commit: str
|
||||
) -> None:
|
||||
"""Deploy a build, without starting it."""
|
||||
name = str(uuid.uuid4())
|
||||
slug = cls.make_slug(repo, target_branch, pr, commit)
|
||||
_logger.info(f"Deploying {slug} ({name})")
|
||||
image = get_build_image(target_branch)
|
||||
deployment_vars = k8s.make_deployment_vars(
|
||||
name,
|
||||
slug,
|
||||
repo.lower(),
|
||||
target_branch,
|
||||
pr,
|
||||
commit,
|
||||
image,
|
||||
)
|
||||
await k8s.deploy(deployment_vars)
|
||||
|
||||
|
||||
class Repo(BaseModel):
|
||||
name: str
|
||||
|
||||
@property
|
||||
def link(self) -> str:
|
||||
return f"https://github.com/{self.name}"
|
||||
|
||||
class Config:
|
||||
read_with_orm_mode = True
|
||||
|
||||
|
||||
class BranchOrPull(BaseModel):
|
||||
repo: str
|
||||
target_branch: str
|
||||
pr: Optional[int]
|
||||
builds: list[Build]
|
||||
|
||||
class Config:
|
||||
read_with_orm_mode = True
|
||||
|
||||
@property
|
||||
def link(self) -> str:
|
||||
link = f"https://github.com/{self.repo}"
|
||||
if self.pr:
|
||||
link = f"{link}/pull/{self.pr}"
|
||||
else:
|
||||
link = f"{link}/tree/{self.target_branch}"
|
||||
return link
|
||||
|
|
@ -1,22 +1,45 @@
|
|||
def on_pr_open_or_update() -> None:
|
||||
# find Repo
|
||||
# find image from target branch (exit if not found)
|
||||
# find or create Branch
|
||||
# create Build
|
||||
# start build (enqueue)
|
||||
...
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, Header, Request
|
||||
|
||||
from . import controller
|
||||
from .settings import settings
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def on_pr_close_or_merge() -> None:
|
||||
# find Repo, Branch
|
||||
# delete branch (enqueue)
|
||||
...
|
||||
|
||||
|
||||
def on_push() -> None:
|
||||
# find Repo, branch
|
||||
# find image from target branch (exit if not found)
|
||||
# find or create Branch
|
||||
# create Build
|
||||
# start build (enqueue)
|
||||
...
|
||||
@router.post("/webhook/github")
|
||||
async def receive_payload(
|
||||
background_tasks: BackgroundTasks,
|
||||
request: Request,
|
||||
x_github_event: str = Header(...),
|
||||
):
|
||||
# TODO check x-hub-signature
|
||||
payload = await request.json()
|
||||
repo = payload["repository"]["full_name"]
|
||||
if not repo:
|
||||
return
|
||||
repo = repo.lower()
|
||||
if repo not in settings.supported_repos:
|
||||
_logger.info(f"Ignoring webhook delivery for unsupported repo {repo}.")
|
||||
return
|
||||
action = payload.get("action")
|
||||
if x_github_event == "pull_request":
|
||||
if action in ("opened", "synchronize"):
|
||||
background_tasks.add_task(
|
||||
controller.Build.deploy,
|
||||
repo=repo,
|
||||
target_branch=payload["pull_request"]["base"]["ref"],
|
||||
pr=payload["pull_request"]["number"],
|
||||
commit=payload["pull_request"]["head"]["sha"],
|
||||
)
|
||||
elif x_github_event == "push":
|
||||
background_tasks.add_task(
|
||||
controller.Build.deploy,
|
||||
repo=repo,
|
||||
target_branch=payload["ref"].split("/")[-1],
|
||||
pr=None,
|
||||
commit=payload["after"],
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in a new issue