diff --git a/src/runboat/api.py b/src/runboat/api.py index ffb8c99..47e84de 100644 --- a/src/runboat/api.py +++ b/src/runboat/api.py @@ -21,6 +21,7 @@ class Status(BaseModel): to_initialize: int initializing: int max_initializing: int + undeploying: int class Config: orm_mode = True diff --git a/src/runboat/controller.py b/src/runboat/controller.py index 11bb74e..3b01979 100644 --- a/src/runboat/controller.py +++ b/src/runboat/controller.py @@ -69,6 +69,10 @@ class Controller: def max_deployed(self) -> int: return settings.max_deployed + @property + def undeploying(self) -> int: + return self.db.count_by_status(BuildStatus.undeploying) + async def deploy_or_delay_start( self, repo: str, target_branch: str, pr: int | None, git_commit: str ) -> None: @@ -117,7 +121,7 @@ class Controller: self.reset() # empty the local db each time we start watching async for event_type, deployment in k8s.watch_deployments(): _logger.debug( - "%s %s %s dr=%s/rr=%s", + "Event %s %s %s dr=%s/rr=%s", event_type, deployment.metadata.name, deployment.metadata.resource_version, @@ -129,7 +133,14 @@ class Controller: continue should_wakeup = False if event_type in (None, "ADDED", "MODIFIED"): - should_wakeup = self.db.add(Build.from_deployment(deployment)) + prev_build = self.db.get(build_name) + build = Build.from_deployment(deployment) + should_wakeup = self.db.add(build) + if build.status == BuildStatus.undeploying and ( + prev_build is None or prev_build.status != BuildStatus.undeploying + ): + _logger.info(f"{build} has deletionTimestamp. 
Undeploying.") + await build.cleanup() elif event_type == "DELETED": should_wakeup = self.db.remove(build_name) else: @@ -140,7 +151,7 @@ class Controller: async def job_watcher(self) -> None: async for event_type, job in k8s.watch_jobs(): _logger.debug( - "%s %s %s a=%s/s=%s/f=%s", + "Event %s %s %s a=%s/s=%s/f=%s", event_type, job.metadata.name, job.metadata.resource_version, diff --git a/src/runboat/k8s.py b/src/runboat/k8s.py index 79fa151..579c927 100644 --- a/src/runboat/k8s.py +++ b/src/runboat/k8s.py @@ -13,7 +13,7 @@ from typing import Any, Generator, Optional import urllib3 from jinja2 import Template from kubernetes import client, config, watch -from kubernetes.client.api_client import ApiClient +from kubernetes.client.exceptions import ApiException from kubernetes.client.models.v1_deployment import V1Deployment from pydantic import BaseModel @@ -36,24 +36,37 @@ def load_kube_config() -> None: @sync_to_async def read_deployment(name: str) -> Optional[V1Deployment]: - with ApiClient() as api: - appsv1 = client.AppsV1Api(api) - items = appsv1.list_namespaced_deployment( - namespace=settings.build_namespace, - label_selector=f"runboat/build={name}", - ).items - return items[0] if items else None + appsv1 = client.AppsV1Api() + items = appsv1.list_namespaced_deployment( + namespace=settings.build_namespace, + label_selector=f"runboat/build={name}", + ).items + return items[0] if items else None @sync_to_async -def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None: - with ApiClient() as api: - appsv1 = client.AppsV1Api(api) +def delete_deployment(deployment_name: str) -> None: + appsv1 = client.AppsV1Api() + appsv1.delete_namespaced_deployment( + deployment_name, namespace=settings.build_namespace + ) + + +@sync_to_async +def patch_deployment( + deployment_name: str, ops: list[dict["str", Any]], not_found_ok: bool +) -> None: + appsv1 = client.AppsV1Api() + try: appsv1.patch_namespaced_deployment( name=deployment_name, 
namespace=settings.build_namespace, body=ops, ) + except ApiException as e: + if e.status == 404 and not_found_ok: + return + raise def _watch(list_method, *args, **kwargs): diff --git a/src/runboat/kubefiles/deployment.yaml b/src/runboat/kubefiles/deployment.yaml index 917b7be..e46afc6 100644 --- a/src/runboat/kubefiles/deployment.yaml +++ b/src/runboat/kubefiles/deployment.yaml @@ -4,6 +4,8 @@ metadata: name: odoo annotations: runboat/init-status: "todo" # ask controller to initialize when there is capacity + finalizers: + - runboat/cleanup spec: replicas: 0 # deploy idle selector: diff --git a/src/runboat/kubefiles/runboat-initialize.sh b/src/runboat/kubefiles/runboat-initialize.sh index c1e6cc0..1e056cf 100755 --- a/src/runboat/kubefiles/runboat-initialize.sh +++ b/src/runboat/kubefiles/runboat-initialize.sh @@ -10,7 +10,7 @@ bash /runboat/runboat-clone-and-install.sh oca_wait_for_postgres -# Drop database, in case we are reinitializing after failure. +# Drop database, in case we are reinitializing. 
dropdb --if-exists $PGDATABASE ADDONS=$(addons --addons-dir ${ADDONS_DIR} --include "${INCLUDE}" --exclude "${EXCLUDE}" list) diff --git a/src/runboat/models.py b/src/runboat/models.py index 2023e03..6b1b572 100644 --- a/src/runboat/models.py +++ b/src/runboat/models.py @@ -20,7 +20,7 @@ class BuildStatus(str, Enum): starting = "starting" # to initialize or initializing or scaling up started = "started" # running failed = "failed" # initialization failed - cleaning = "cleaning" # cleaning up, will be undeployed soon + undeploying = "undeploying" # undeploying, will be deleted after cleanup class BuildInitStatus(str, Enum): @@ -28,7 +28,6 @@ class BuildInitStatus(str, Enum): started = "started" # initialization job running succeeded = "succeeded" # initialization job succeeded failed = "failed" # initialization job failed - cleaning = "cleaning" # cleanup job running class Build(BaseModel): @@ -92,11 +91,11 @@ class Build(BaseModel): @classmethod def _status_from_deployment(cls, deployment: V1Deployment) -> BuildStatus: + if deployment.metadata.deletion_timestamp: + return BuildStatus.undeploying init_status = deployment.metadata.annotations["runboat/init-status"] if init_status in (BuildInitStatus.todo, BuildInitStatus.started): return BuildStatus.starting - elif init_status == BuildInitStatus.cleaning: - return BuildStatus.cleaning elif init_status == BuildInitStatus.failed: return BuildStatus.failed elif init_status == BuildInitStatus.succeeded: @@ -157,6 +156,9 @@ class Build(BaseModel): "that is already started or starting." 
) return + elif self.status == BuildStatus.undeploying: + _logger.info(f"Ignoring start command for {self} that is undeploying.") + return elif self.status == BuildStatus.failed: _logger.info(f"Marking failed {self} for reinitialization.") await k8s.delete_job(self.name, job_kind=k8s.DeploymentMode.initialize) @@ -172,8 +174,16 @@ class Build(BaseModel): else: _logger.info(f"Ignoring stop command for {self} that is not started.") + async def undeploy(self) -> None: + # To undeploy, we delete the deployment. Due to the finalizer, the deletion + # will not be immediate, but the controller will notice the deletionTimestamp + # and launch the cleanup job. When the cleanup job succeeds, the controller + # removes all resources, and also removes the finalizer which allows kubernetes + # to remove the deployment. + await k8s.delete_deployment(self.deployment_name) + async def initialize(self) -> None: - """Initialize a build.""" + """Launch the initialization job.""" # Start initizalization job. on_initialize_{started,succeeded,failed} callbacks # will follow from job events. _logger.info(f"Deploying initialize job for {self}.") @@ -189,12 +199,12 @@ class Build(BaseModel): ) await k8s.deploy(deployment_vars) - async def undeploy(self) -> None: - """Undeploy a build.""" + async def cleanup(self) -> None: + """Launch the cleanup job.""" # Delete the initialization job to reduce conflict with the cleanup job. await k8s.delete_job(self.name, job_kind=k8s.DeploymentMode.initialize) - # Be sure it is stopped. - await self._patch(desired_replicas=0) + # Be sure the deployment is stopped. + await self._patch(desired_replicas=0, not_found_ok=True) # Start cleanup job. on_cleanup_{started,succeeded,failed} callbacks will follow # from job events. 
_logger.info(f"Deploying cleanup job for {self}.") @@ -233,24 +243,23 @@ class Build(BaseModel): await self._patch(init_status=BuildInitStatus.failed, desired_replicas=0) async def on_cleanup_started(self) -> None: - if self.init_status == BuildInitStatus.cleaning: - return _logger.info(f"Cleanup job started for {self}.") - await self._patch(init_status=BuildInitStatus.cleaning, desired_replicas=0) async def on_cleanup_succeeded(self) -> None: _logger.info(f"Cleanup job succeeded for {self}, deleting resources.") await k8s.delete_resources(self.name) + _logger.debug("Removing finalizer for %s.", self) + await self._patch(remove_finalizers=True, not_found_ok=True) async def on_cleanup_failed(self) -> None: - _logger.error( - f"Cleanup job failed for {self}, " f"manual intervention required." - ) + _logger.error(f"Cleanup job failed for {self}, manual intervention required.") async def _patch( self, init_status: BuildInitStatus | None = None, desired_replicas: int | None = None, + remove_finalizers: bool = False, + not_found_ok: bool = False, ) -> None: ops = [] if init_status is not None and init_status != self.init_status: @@ -281,7 +290,14 @@ class Build(BaseModel): }, ] ) - await k8s.patch_deployment(self.deployment_name, ops) + if remove_finalizers: + ops.append( + { + "op": "remove", + "path": "/metadata/finalizers", + } + ) + await k8s.patch_deployment(self.deployment_name, ops, not_found_ok) class Repo(BaseModel):