More precise change detection

Fix: we read spec.replicas instead of status.replicas.
Do not wakeup needlessly when irrelevant changes
happen in deployments.
Better logging.
This commit is contained in:
Stéphane Bidoul 2021-10-31 16:21:36 +01:00
parent 9627729ba2
commit f5d232c3af
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92
9 changed files with 110 additions and 75 deletions

View file

@ -48,7 +48,6 @@ MVP:
- better error handling in API (return 400 on user errors)
- basic tests
- build and publish runboat container image
- test what happens when the watcher looses connection to k8s
- look at other TODO in code to see if anything important remains
- basic UI (single page with a combo box to select repo and show builds by branch/pr,
with start/stop buttons)
@ -56,6 +55,7 @@ MVP:
More:
- shiny UI
- websocket stream of build changes, for a dynamic UI
- handle PR close (delete all builds for PR)
- handle branch delete (delete all builds for branch)
- create builds for all supported repos on startup (goes with sticky branches)

View file

@ -3,13 +3,14 @@ disable_existing_loggers: false
formatters:
rich:
datefmt: "[%X]"
format: "%(name)25s %(message)s"
# format: "%(name)25s %(message)s"
handlers:
console:
class: rich.logging.RichHandler
level: NOTSET
formatter: rich
rich_tracebacks: true
show_path: false
root:
level: DEBUG
handlers: [console]

View file

@ -100,14 +100,15 @@ class Controller:
build_name = deployment.metadata.labels.get("runboat/build")
if not build_name:
continue
_logger.debug(f"k8s {event_type} {deployment.kind} {build_name}")
should_wakeup = False
if event_type in ("ADDED", "MODIFIED"):
self.db.add(Build.from_deployment(deployment))
should_wakeup = self.db.add(Build.from_deployment(deployment))
elif event_type == "DELETED":
self.db.remove(build_name)
should_wakeup = self.db.remove(build_name)
else:
_logger.error(f"Unexpected k8s event type {event_type}.")
self._wakeup()
if should_wakeup:
self._wakeup()
async def job_watcher(self) -> None:
async for event_type, job in k8s.watch_jobs():
@ -117,7 +118,6 @@ class Controller:
job_kind = job.metadata.labels.get("runboat/job-kind")
if job_kind not in ("initialize", "cleanup"):
continue
_logger.debug(f"k8s {event_type} {job.kind} {job_kind} {build_name}")
if event_type in ("ADDED", "MODIFIED"):
build = self.db.get(build_name)
if build is None:
@ -125,21 +125,27 @@ class Controller:
# is starting and the db has not been populated yet.
build = await Build.from_name(build_name)
if build is None:
_logger.warning(
f"Received job event for {build_name} "
f"but the corresponding deployment is gone. "
f"Deleting all build resources."
)
await k8s.delete_resources(build_name)
continue
if job_kind == "initialize":
if job.status.succeeded:
if job.status.active:
await build.on_initialize_started()
elif job.status.succeeded:
await build.on_initialize_succeeded()
elif job.status.failed:
await build.on_initialize_failed()
else:
await build.on_initialize_started()
if job_kind == "cleanup":
if job.status.succeeded:
if job.status.active:
await build.on_cleanup_started()
elif job.status.succeeded:
await build.on_cleanup_succeeded()
elif job.status.failed:
await build.on_cleanup_failed()
else:
await build.on_cleanup_started()
elif event_type == "DELETED":
pass
else:
@ -155,8 +161,8 @@ class Controller:
if not to_initialize:
continue # nothing startable, back to sleep
_logger.info(
f"{self.initializing}/{self.max_initializing} builds are initializing. "
f"Initializing {len(to_initialize)} more."
f"{self.initializing} builds of max {self.max_initializing} "
f"are initializing. Initializing {len(to_initialize)} more."
)
for build in to_initialize:
await build.initialize()
@ -171,7 +177,7 @@ class Controller:
if not to_stop:
continue # nothing stoppable, back to sleep
_logger.info(
f"{self.started}/{self.max_started} builds started. "
f"{self.started} builds of max {self.max_started} are started. "
f"Stopping {len(to_stop)}."
)
for build in to_stop:
@ -187,7 +193,7 @@ class Controller:
if not to_undeploy:
continue # nothing undeployable, back to sleep
_logger.info(
f"{self.deployed}/{self.max_deployed} builds deployed. "
f"{self.deployed} builds of max {self.max_deployed} are deployed. "
f"Undeploying {len(to_undeploy)}."
)
for build in to_undeploy:

View file

@ -1,17 +1,17 @@
import logging
import sqlite3
from .models import BranchOrPull, Build, BuildInitStatus, BuildStatus
_logger = logging.getLogger(__name__)
class BuildsDb:
"""An in-memory database of builds.
It is maintained up-to-date by the controller that receives events from the cluster.
We use sqlite3 to facilitate queries and sorting, such as counting by status, or
finding oldest builds.
Querying it on each event from k8s is probably not the most efficient, but this
should do for a start, and there are plenty of ways to optimize.
It is maintained up-to-date by the controller that receives events
from the cluster. We use sqlite3 to facilitate queries and sorting,
such as counting by status, or finding oldest builds.
"""
_con: sqlite3.Connection
@ -35,6 +35,7 @@ class BuildsDb:
" pr INTEGER, "
" git_commit TEXT NOT NULL, "
" image TEXT NOT NULL,"
" desired_replicas INTEGER NOT NULL,"
" status TEXT NOT NULL, "
" init_status TEXT NOT NULL, "
" last_scaled TEXT, "
@ -69,10 +70,17 @@ class BuildsDb:
return self._build_from_row(row)
def remove(self, name: str) -> None:
if self.get(name) is None:
return False # no change
with self._con:
self._con.execute("DELETE FROM builds WHERE name=?", (name,))
_logger.info("Noticed removal of %s", name)
return True
def add(self, build: Build) -> None:
prev_build = self.get(build.name)
if prev_build == build:
return False # no change
with self._con:
self._con.execute(
"INSERT OR REPLACE INTO builds "
@ -84,12 +92,13 @@ class BuildsDb:
" pr,"
" git_commit,"
" image,"
" desired_replicas,"
" status,"
" init_status, "
" last_scaled, "
" created"
") "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
build.name,
build.deployment_name,
@ -98,12 +107,27 @@ class BuildsDb:
build.pr,
build.git_commit,
build.image,
build.desired_replicas,
build.status,
build.init_status,
build.last_scaled,
build.last_scaled.isoformat(),
build.created.isoformat(),
),
)
if prev_build is None:
action = "addition"
else:
action = "update"
_logger.info(
"Noticed %s of %s (%s/%s/desired_replicas=%s/last_scaled=%s)",
action,
build,
build.status,
build.init_status,
build.desired_replicas,
build.last_scaled,
)
return True
def count_by_status(self, status: BuildStatus) -> int:
return self._con.execute(

View file

@ -15,7 +15,7 @@ def _github_get(url: str) -> Any:
}
if settings.github_token:
headers["Authorization"] = f"token {settings.github_token}"
response = requests.get(full_url, headers)
response = requests.get(full_url, headers=headers)
if response.status_code == 404:
raise NotFoundOnGithub(f"GitHub URL not found: {full_url}.")
response.raise_for_status()

View file

@ -143,7 +143,9 @@ def _render_kubefiles(deployment_vars: DeploymentVars) -> Generator[Path, None,
async def _kubectl(args: list[str]) -> None:
proc = await asyncio.create_subprocess_exec("kubectl", *args)
proc = await asyncio.create_subprocess_exec(
"kubectl", *args, stdout=subprocess.DEVNULL
)
return_code = await proc.wait()
if return_code != 0:
raise subprocess.CalledProcessError(return_code, ["kubectl"] + args)
@ -159,13 +161,7 @@ async def deploy(deployment_vars: DeploymentVars) -> None:
str(tmp_path),
]
)
await _kubectl(
[
"apply",
"-k",
str(tmp_path),
]
)
await _kubectl(["apply", "-k", str(tmp_path), "--wait=false"])
async def delete_resources(build_name: str) -> None:

View file

@ -3,9 +3,9 @@ kind: Deployment
metadata:
name: odoo
annotations:
runboat/init-status: "todo" # ask controller to start when there is capacity
runboat/init-status: "todo" # ask controller to initialize when there is capacity
spec:
replicas: 0 # deploy idle (with 0 replica)
replicas: 0 # deploy idle
selector:
matchLabels:
app: odoo

View file

@ -4,7 +4,7 @@ set -ex
#
# Clone an addons repository at git reference in $ADDONS_DIR.
# Run oca_install_addons and oca_init_db on it.
# Run oca_install_addons on it.
#
git clone --filter=blob:none $RUNBOAT_GIT_REPO $ADDONS_DIR

View file

@ -24,7 +24,7 @@ class BuildStatus(str, Enum):
class BuildInitStatus(str, Enum):
todo = "todo" # to initialize as soon as there is capacity
todo = "todo" # to initialize and start as soon as there is capacity
started = "started" # initialization job running
succeeded = "succeeded" # initialization job succeeded
failed = "failed" # initialization job failed
@ -41,12 +41,26 @@ class Build(BaseModel):
image: str
status: BuildStatus
init_status: BuildInitStatus
desired_replicas: int
last_scaled: datetime.datetime
created: datetime.datetime
def __str__(self) -> str:
return f"{self.slug} ({self.name})"
def __eq__(self, other: "Build") -> bool:
if other is None:
return False
if self.name != other.name:
return False
# Ignore fields that are immutable by design.
return (
self.status == other.status
and self.init_status == other.init_status
and self.desired_replicas == other.desired_replicas
and self.last_scaled == other.last_scaled
)
@classmethod
async def from_name(cls, build_name: str) -> Optional["Build"]:
"""Create a Build model by reading the k8s api."""
@ -67,8 +81,9 @@ class Build(BaseModel):
image=deployment.spec.template.spec.containers[0].image,
init_status=deployment.metadata.annotations["runboat/init-status"],
status=cls._status_from_deployment(deployment),
desired_replicas=deployment.spec.replicas or 0,
last_scaled=deployment.metadata.annotations.get("runboat/last-scaled")
or datetime.datetime.utcnow(),
or deployment.metadata.creation_timestamp,
created=deployment.metadata.creation_timestamp,
)
@ -82,7 +97,7 @@ class Build(BaseModel):
elif init_status == BuildInitStatus.failed:
return BuildStatus.failed
elif init_status == BuildInitStatus.succeeded:
replicas = deployment.status.replicas
replicas = deployment.spec.replicas
if not replicas:
return BuildStatus.stopped
else:
@ -142,19 +157,17 @@ class Build(BaseModel):
elif self.status == BuildStatus.failed:
_logger.info(f"Marking failed {self} for reinitialization.")
await k8s.delete_job(self.name, job_kind="initialize")
await self._patch(
init_status=BuildInitStatus.todo, replicas=0, update_last_scaled=False
)
await self._patch(init_status=BuildInitStatus.todo, desired_replicas=0)
elif self.status == BuildStatus.stopped:
_logger.info(f"Starting {self}.")
await self._patch(replicas=1, update_last_scaled=True)
_logger.info(f"Starting {self} that was last scaled on {self.last_scaled}.")
await self._patch(desired_replicas=1)
async def stop(self) -> None:
if self.status == BuildStatus.started:
_logger.info(f"Stopping {self}.")
await self._patch(replicas=0, update_last_scaled=True)
_logger.info(f"Stopping {self} that was last scaled on {self.last_scaled}.")
await self._patch(desired_replicas=0)
else:
_logger.info("Ignoring stop command for {self} " "that is not started.")
_logger.info(f"Ignoring stop command for {self} that is not started.")
async def initialize(self) -> None:
# Start initizalization job. on_init_started/on_init_succeeded/on_init_failed
@ -193,9 +206,7 @@ class Build(BaseModel):
if self.init_status == BuildInitStatus.started:
return
_logger.info(f"Initialization job started for {self}.")
await self._patch(
init_status=BuildInitStatus.started, replicas=0, update_last_scaled=True
)
await self._patch(init_status=BuildInitStatus.started, desired_replicas=0)
async def on_initialize_succeeded(self) -> None:
if self.init_status == BuildInitStatus.succeeded:
@ -203,9 +214,7 @@ class Build(BaseModel):
# succeeded old initialization jobs after a controller restart.
return
_logger.info(f"Initialization job succeded for {self}, starting.")
await self._patch(
init_status=BuildInitStatus.succeeded, replicas=1, update_last_scaled=True
)
await self._patch(init_status=BuildInitStatus.succeeded, desired_replicas=1)
async def on_initialize_failed(self) -> None:
if self.init_status == BuildInitStatus.failed:
@ -213,15 +222,13 @@ class Build(BaseModel):
# restarting, and is notified of existing initialization jobs.
return
_logger.info(f"Initialization job failed for {self}.")
await self._patch(
init_status=BuildInitStatus.failed, replicas=0, update_last_scaled=True
)
await self._patch(init_status=BuildInitStatus.failed, desired_replicas=0)
async def on_cleanup_started(self) -> None:
if self.init_status == BuildInitStatus.cleaning:
return
_logger.info(f"Cleanup job started for {self}.")
await self._patch(
init_status=BuildInitStatus.cleaning, replicas=0, update_last_scaled=False
)
await self._patch(init_status=BuildInitStatus.cleaning, desired_replicas=0)
async def on_cleanup_succeeded(self) -> None:
_logger.info(f"Cleanup job succeeded for {self}, deleting resources.")
@ -235,11 +242,10 @@ class Build(BaseModel):
async def _patch(
self,
init_status: BuildInitStatus | None = None,
replicas: int | None = None,
update_last_scaled: bool = True,
desired_replicas: int | None = None,
) -> None:
ops = []
if init_status is not None:
if init_status is not None and init_status != self.init_status:
ops.extend(
[
{
@ -249,22 +255,24 @@ class Build(BaseModel):
},
],
)
if replicas is not None:
ops.append(
{
"op": "replace",
"path": "/spec/replicas",
"value": replicas,
},
)
if update_last_scaled:
ops.append(
if desired_replicas is not None and desired_replicas != self.desired_replicas:
ops.extend(
[
{
"op": "replace",
"path": "/spec/replicas",
"value": desired_replicas,
},
{
"op": "replace",
"path": "/metadata/annotations/runboat~1last-scaled",
"value": datetime.datetime.utcnow().isoformat() + "Z",
"value": datetime.datetime.utcnow()
.replace(microsecond=0)
.isoformat()
+ "Z",
},
)
]
)
await k8s.patch_deployment(self.deployment_name, ops)