Use finalizers to track cleanup

This commit is contained in:
Stéphane Bidoul 2021-11-04 09:37:40 +01:00
parent d79cb6223e
commit 1f52025f97
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92
6 changed files with 74 additions and 31 deletions

View file

@ -21,6 +21,7 @@ class Status(BaseModel):
to_initialize: int to_initialize: int
initializing: int initializing: int
max_initializing: int max_initializing: int
undeploying: int
class Config: class Config:
orm_mode = True orm_mode = True

View file

@ -69,6 +69,10 @@ class Controller:
def max_deployed(self) -> int: def max_deployed(self) -> int:
return settings.max_deployed return settings.max_deployed
@property
def undeploying(self) -> int:
return self.db.count_by_status(BuildStatus.undeploying)
async def deploy_or_delay_start( async def deploy_or_delay_start(
self, repo: str, target_branch: str, pr: int | None, git_commit: str self, repo: str, target_branch: str, pr: int | None, git_commit: str
) -> None: ) -> None:
@ -117,7 +121,7 @@ class Controller:
self.reset() # empty the local db each time we start watching self.reset() # empty the local db each time we start watching
async for event_type, deployment in k8s.watch_deployments(): async for event_type, deployment in k8s.watch_deployments():
_logger.debug( _logger.debug(
"%s %s %s dr=%s/rr=%s", "Event %s %s %s dr=%s/rr=%s",
event_type, event_type,
deployment.metadata.name, deployment.metadata.name,
deployment.metadata.resource_version, deployment.metadata.resource_version,
@ -129,7 +133,14 @@ class Controller:
continue continue
should_wakeup = False should_wakeup = False
if event_type in (None, "ADDED", "MODIFIED"): if event_type in (None, "ADDED", "MODIFIED"):
should_wakeup = self.db.add(Build.from_deployment(deployment)) prev_build = self.db.get(build_name)
build = Build.from_deployment(deployment)
should_wakeup = self.db.add(build)
if build.status == BuildStatus.undeploying and (
prev_build is None or prev_build.status != BuildStatus.undeploying
):
_logger.info(f"{build} has deletionTimestamp. Undeploying.")
await build.cleanup()
elif event_type == "DELETED": elif event_type == "DELETED":
should_wakeup = self.db.remove(build_name) should_wakeup = self.db.remove(build_name)
else: else:
@ -140,7 +151,7 @@ class Controller:
async def job_watcher(self) -> None: async def job_watcher(self) -> None:
async for event_type, job in k8s.watch_jobs(): async for event_type, job in k8s.watch_jobs():
_logger.debug( _logger.debug(
"%s %s %s a=%s/s=%s/f=%s", "Event %s %s %s a=%s/s=%s/f=%s",
event_type, event_type,
job.metadata.name, job.metadata.name,
job.metadata.resource_version, job.metadata.resource_version,

View file

@ -13,7 +13,7 @@ from typing import Any, Generator, Optional
import urllib3 import urllib3
from jinja2 import Template from jinja2 import Template
from kubernetes import client, config, watch from kubernetes import client, config, watch
from kubernetes.client.api_client import ApiClient from kubernetes.client.exceptions import ApiException
from kubernetes.client.models.v1_deployment import V1Deployment from kubernetes.client.models.v1_deployment import V1Deployment
from pydantic import BaseModel from pydantic import BaseModel
@ -36,8 +36,7 @@ def load_kube_config() -> None:
@sync_to_async @sync_to_async
def read_deployment(name: str) -> Optional[V1Deployment]: def read_deployment(name: str) -> Optional[V1Deployment]:
with ApiClient() as api: appsv1 = client.AppsV1Api()
appsv1 = client.AppsV1Api(api)
items = appsv1.list_namespaced_deployment( items = appsv1.list_namespaced_deployment(
namespace=settings.build_namespace, namespace=settings.build_namespace,
label_selector=f"runboat/build={name}", label_selector=f"runboat/build={name}",
@ -46,14 +45,28 @@ def read_deployment(name: str) -> Optional[V1Deployment]:
@sync_to_async @sync_to_async
def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None: def delete_deployment(deployment_name: str) -> None:
with ApiClient() as api: appsv1 = client.AppsV1Api()
appsv1 = client.AppsV1Api(api) appsv1.delete_namespaced_deployment(
deployment_name, namespace=settings.build_namespace
)
@sync_to_async
def patch_deployment(
deployment_name: str, ops: list[dict["str", Any]], not_found_ok: bool
) -> None:
appsv1 = client.AppsV1Api()
try:
appsv1.patch_namespaced_deployment( appsv1.patch_namespaced_deployment(
name=deployment_name, name=deployment_name,
namespace=settings.build_namespace, namespace=settings.build_namespace,
body=ops, body=ops,
) )
except ApiException as e:
if e.status == 404 and not_found_ok:
return
raise
def _watch(list_method, *args, **kwargs): def _watch(list_method, *args, **kwargs):

View file

@ -4,6 +4,8 @@ metadata:
name: odoo name: odoo
annotations: annotations:
runboat/init-status: "todo" # ask controller to initialize when there is capacity runboat/init-status: "todo" # ask controller to initialize when there is capacity
finalizers:
- runboat/cleanup
spec: spec:
replicas: 0 # deploy idle replicas: 0 # deploy idle
selector: selector:

View file

@ -10,7 +10,7 @@ bash /runboat/runboat-clone-and-install.sh
oca_wait_for_postgres oca_wait_for_postgres
# Drop database, in case we are reinitializing after failure. # Drop database, in case we are reinitializing.
dropdb --if-exists $PGDATABASE dropdb --if-exists $PGDATABASE
ADDONS=$(addons --addons-dir ${ADDONS_DIR} --include "${INCLUDE}" --exclude "${EXCLUDE}" list) ADDONS=$(addons --addons-dir ${ADDONS_DIR} --include "${INCLUDE}" --exclude "${EXCLUDE}" list)

View file

@ -20,7 +20,7 @@ class BuildStatus(str, Enum):
starting = "starting" # to initialize or initializing or scaling up starting = "starting" # to initialize or initializing or scaling up
started = "started" # running started = "started" # running
failed = "failed" # initialization failed failed = "failed" # initialization failed
cleaning = "cleaning" # cleaning up, will be undeployed soon undeploying = "undeploying" # undeploying, will be deleted after cleanup
class BuildInitStatus(str, Enum): class BuildInitStatus(str, Enum):
@ -28,7 +28,6 @@ class BuildInitStatus(str, Enum):
started = "started" # initialization job running started = "started" # initialization job running
succeeded = "succeeded" # initialization job succeeded succeeded = "succeeded" # initialization job succeeded
failed = "failed" # initialization job failed failed = "failed" # initialization job failed
cleaning = "cleaning" # cleanup job running
class Build(BaseModel): class Build(BaseModel):
@ -92,11 +91,11 @@ class Build(BaseModel):
@classmethod @classmethod
def _status_from_deployment(cls, deployment: V1Deployment) -> BuildStatus: def _status_from_deployment(cls, deployment: V1Deployment) -> BuildStatus:
if deployment.metadata.deletion_timestamp:
return BuildStatus.undeploying
init_status = deployment.metadata.annotations["runboat/init-status"] init_status = deployment.metadata.annotations["runboat/init-status"]
if init_status in (BuildInitStatus.todo, BuildInitStatus.started): if init_status in (BuildInitStatus.todo, BuildInitStatus.started):
return BuildStatus.starting return BuildStatus.starting
elif init_status == BuildInitStatus.cleaning:
return BuildStatus.cleaning
elif init_status == BuildInitStatus.failed: elif init_status == BuildInitStatus.failed:
return BuildStatus.failed return BuildStatus.failed
elif init_status == BuildInitStatus.succeeded: elif init_status == BuildInitStatus.succeeded:
@ -157,6 +156,9 @@ class Build(BaseModel):
"that is already started or starting." "that is already started or starting."
) )
return return
elif self.status == BuildStatus.undeploying:
_logger.info(f"Ignoring start command for {self} that is undeploying.")
return
elif self.status == BuildStatus.failed: elif self.status == BuildStatus.failed:
_logger.info(f"Marking failed {self} for reinitialization.") _logger.info(f"Marking failed {self} for reinitialization.")
await k8s.delete_job(self.name, job_kind=k8s.DeploymentMode.initialize) await k8s.delete_job(self.name, job_kind=k8s.DeploymentMode.initialize)
@ -172,8 +174,16 @@ class Build(BaseModel):
else: else:
_logger.info(f"Ignoring stop command for {self} that is not started.") _logger.info(f"Ignoring stop command for {self} that is not started.")
async def undeploy(self) -> None:
# To undeploy, we delete the deployment. Due to the finalizer, the deletion
# will not be immediate, but the controller will notice the deletionTimestamp
# and launch the cleanup job. When the cleanup job succeeds, the controller
# removes all resources, and also removes the finalizer which allows kubernetes
# to remove the deployment.
await k8s.delete_deployment(self.deployment_name)
async def initialize(self) -> None: async def initialize(self) -> None:
"""Initialize a build.""" """Launch the initialization job."""
# Start initizalization job. on_initialize_{started,succeeded,failed} callbacks # Start initizalization job. on_initialize_{started,succeeded,failed} callbacks
# will follow from job events. # will follow from job events.
_logger.info(f"Deploying initialize job for {self}.") _logger.info(f"Deploying initialize job for {self}.")
@ -189,12 +199,12 @@ class Build(BaseModel):
) )
await k8s.deploy(deployment_vars) await k8s.deploy(deployment_vars)
async def undeploy(self) -> None: async def cleanup(self) -> None:
"""Undeploy a build.""" """Launch the clenaup job."""
# Delete the initialization job to reduce conflict with the cleanup job. # Delete the initialization job to reduce conflict with the cleanup job.
await k8s.delete_job(self.name, job_kind=k8s.DeploymentMode.initialize) await k8s.delete_job(self.name, job_kind=k8s.DeploymentMode.initialize)
# Be sure it is stopped. # Be sure the deployment is stopped.
await self._patch(desired_replicas=0) await self._patch(desired_replicas=0, not_found_ok=True)
# Start cleanup job. on_cleanup_{started,succeeded,failed} callbacks will follow # Start cleanup job. on_cleanup_{started,succeeded,failed} callbacks will follow
# from job events. # from job events.
_logger.info(f"Deploying cleanup job for {self}.") _logger.info(f"Deploying cleanup job for {self}.")
@ -233,24 +243,23 @@ class Build(BaseModel):
await self._patch(init_status=BuildInitStatus.failed, desired_replicas=0) await self._patch(init_status=BuildInitStatus.failed, desired_replicas=0)
async def on_cleanup_started(self) -> None: async def on_cleanup_started(self) -> None:
if self.init_status == BuildInitStatus.cleaning:
return
_logger.info(f"Cleanup job started for {self}.") _logger.info(f"Cleanup job started for {self}.")
await self._patch(init_status=BuildInitStatus.cleaning, desired_replicas=0)
async def on_cleanup_succeeded(self) -> None: async def on_cleanup_succeeded(self) -> None:
_logger.info(f"Cleanup job succeeded for {self}, deleting resources.") _logger.info(f"Cleanup job succeeded for {self}, deleting resources.")
await k8s.delete_resources(self.name) await k8s.delete_resources(self.name)
_logger.debug("Removing finalizer for %s.", self)
await self._patch(remove_finalizers=True, not_found_ok=True)
async def on_cleanup_failed(self) -> None: async def on_cleanup_failed(self) -> None:
_logger.error( _logger.error(f"Cleanup job failed for {self}, manual intervention required.")
f"Cleanup job failed for {self}, " f"manual intervention required."
)
async def _patch( async def _patch(
self, self,
init_status: BuildInitStatus | None = None, init_status: BuildInitStatus | None = None,
desired_replicas: int | None = None, desired_replicas: int | None = None,
remove_finalizers: bool = False,
not_found_ok: bool = False,
) -> None: ) -> None:
ops = [] ops = []
if init_status is not None and init_status != self.init_status: if init_status is not None and init_status != self.init_status:
@ -281,7 +290,14 @@ class Build(BaseModel):
}, },
] ]
) )
await k8s.patch_deployment(self.deployment_name, ops) if remove_finalizers:
ops.append(
{
"op": "remove",
"path": "/metadata/finalizers",
}
)
await k8s.patch_deployment(self.deployment_name, ops, not_found_ok)
class Repo(BaseModel): class Repo(BaseModel):