diff --git a/README.md b/README.md index 34c65a7..bfe9647 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,17 @@ For running the controller: - setup environment variables (start from `.env.sample`) - create a virtualenv, make sure to have pip>=21.3.1 and `pip install -e .` -- run with `uvicorn runboat.app:app --reload --log-config=log-config-dev.yaml` +- run with `uvicorn runboat.app:app --log-config=log-config-dev.yaml` + +## Running in production + +`gunicorn -w 1 -k runboat.uvicorn.RunboatUvicornWorker runboat.app:app`. + +Run one and only one worker process! + +Gunicorn is also necessary so that the process shuts down a few seconds after receiving +SIGINT/SIGTERM. Since we use `run_in_executor`, SIGINT/SIGTERM handling does not work +very well in Python, and gunicorn makes it more robust. https://bugs.python.org/issue29309 ## Author and contributors diff --git a/log-config-dev.yaml b/log-config-dev.yaml index 6d73b13..2ee4e3e 100644 --- a/log-config-dev.yaml +++ b/log-config-dev.yaml @@ -15,5 +15,5 @@ root: level: DEBUG handlers: [console] loggers: - kubernetes_asyncio.client.rest: + kubernetes.client.rest: level: INFO diff --git a/pyproject.toml b/pyproject.toml index 9f69674..358fb35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,8 +11,9 @@ classifiers = [ ] dependencies = [ "fastapi", + "gunicorn", "jinja2", - "kubernetes_asyncio", + "kubernetes", "requests", # TODO for github, to replace by aiohttp or httpx "rich", "uvicorn", diff --git a/requirements.txt b/requirements.txt index 16ad421..7b42461 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,32 +1,35 @@ # frozen requirements generated by pip-deepfreeze -aiohttp==3.7.4.post0 anyio==3.3.4 asgiref==3.4.1 -async-timeout==3.0.1 -attrs==21.2.0 +cachetools==4.2.4 certifi==2021.10.8 -chardet==4.0.0 charset-normalizer==2.0.7 click==8.0.3 colorama==0.4.4 commonmark==0.9.1 fastapi==0.70.0 +google-auth==2.3.2 +gunicorn==20.1.0 h11==0.12.0 idna==3.3 Jinja2==3.0.2 -kubernetes-asyncio==18.20.0 +kubernetes==19.15.0 
MarkupSafe==2.0.1 -multidict==5.2.0 +oauthlib==3.1.1 +pyasn1==0.4.8 +pyasn1-modules==0.2.8 pydantic==1.8.2 Pygments==2.10.0 python-dateutil==2.8.2 PyYAML==6.0 requests==2.26.0 +requests-oauthlib==1.3.0 rich==10.12.0 +rsa==4.7.2 six==1.16.0 sniffio==1.2.0 starlette==0.16.0 typing-extensions==3.10.0.2 urllib3==1.26.7 uvicorn==0.15.0 -yarl==1.7.0 +websocket-client==1.2.1 diff --git a/src/runboat/api.py b/src/runboat/api.py index b5c0ee8..ffb8c99 100644 --- a/src/runboat/api.py +++ b/src/runboat/api.py @@ -18,6 +18,7 @@ class Status(BaseModel): max_deployed: int started: int max_started: int + to_initialize: int initializing: int max_initializing: int diff --git a/src/runboat/controller.py b/src/runboat/controller.py index 77e948d..11bb74e 100644 --- a/src/runboat/controller.py +++ b/src/runboat/controller.py @@ -49,6 +49,10 @@ class Controller: def max_started(self) -> int: return settings.max_started + @property + def to_initialize(self) -> int: + return self.db.count_by_init_status(BuildInitStatus.todo) + @property def initializing(self) -> int: return self.db.count_by_init_status(BuildInitStatus.started) @@ -99,6 +103,9 @@ class Controller: if build is not None: return build if not db_only: + _logger.debug( + "Build %s not in local db, fetching from k8s api.", build_name + ) build = await Build.from_name(build_name) if build is not None: if self.db.add(build): @@ -109,11 +116,19 @@ class Controller: async def deployment_watcher(self) -> None: self.reset() # empty the local db each time we start watching async for event_type, deployment in k8s.watch_deployments(): + _logger.debug( + "%s %s %s dr=%s/rr=%s", + event_type, + deployment.metadata.name, + deployment.metadata.resource_version, + deployment.spec.replicas, + deployment.status.ready_replicas, + ) build_name = deployment.metadata.labels.get("runboat/build") if not build_name: continue should_wakeup = False - if event_type in ("ADDED", "MODIFIED"): + if event_type in (None, "ADDED", "MODIFIED"): should_wakeup = 
self.db.add(Build.from_deployment(deployment)) elif event_type == "DELETED": should_wakeup = self.db.remove(build_name) @@ -124,16 +139,22 @@ class Controller: async def job_watcher(self) -> None: async for event_type, job in k8s.watch_jobs(): + _logger.debug( + "%s %s %s a=%s/s=%s/f=%s", + event_type, + job.metadata.name, + job.metadata.resource_version, + job.status.active, + job.status.succeeded, + job.status.failed, + ) build_name = job.metadata.labels.get("runboat/build") if not build_name: continue job_kind = job.metadata.labels.get("runboat/job-kind") if job_kind not in ("initialize", "cleanup"): continue - if event_type in ("ADDED", "MODIFIED"): - _logger.debug( - "job %s for %s status %s", job_kind, build_name, job.status - ) + if event_type in (None, "ADDED", "MODIFIED"): # Look for build in local db and also in k8s api. # This is necessary because job events may come before build events # have arrived. diff --git a/src/runboat/github.py b/src/runboat/github.py index ff3b428..6f1bb13 100644 --- a/src/runboat/github.py +++ b/src/runboat/github.py @@ -9,7 +9,6 @@ from .settings import settings def _github_get(url: str) -> Any: full_url = f"https://api.github.com{url}" - # TODO github token headers = { "Accept": "application/vnd.github.v3+json", } diff --git a/src/runboat/k8s.py b/src/runboat/k8s.py index 24f4e77..79fa151 100644 --- a/src/runboat/k8s.py +++ b/src/runboat/k8s.py @@ -1,21 +1,26 @@ import asyncio +import logging import shutil import subprocess import tempfile +import time from contextlib import contextmanager from enum import Enum from importlib import resources from pathlib import Path -from typing import Any, AsyncGenerator, Generator, Optional +from typing import Any, Generator, Optional +import urllib3 from jinja2 import Template -from kubernetes_asyncio import client, config, watch -from kubernetes_asyncio.client.api_client import ApiClient -from kubernetes_asyncio.client.models.v1_deployment import V1Deployment -from 
kubernetes_asyncio.client.models.v1_job import V1Job +from kubernetes import client, config, watch +from kubernetes.client.api_client import ApiClient +from kubernetes.client.models.v1_deployment import V1Deployment from pydantic import BaseModel from .settings import settings +from .utils import sync_to_async, sync_to_async_iterator + +_logger = logging.getLogger(__name__) def _split_image_name_tag(img: str) -> tuple[str, str]: @@ -24,51 +29,80 @@ def _split_image_name_tag(img: str) -> tuple[str, str]: return (img, "latest") -async def load_kube_config() -> None: - await config.load_kube_config() +@sync_to_async +def load_kube_config() -> None: + config.load_kube_config() -async def read_deployment(name: str) -> Optional[V1Deployment]: - async with ApiClient() as api: +@sync_to_async +def read_deployment(name: str) -> Optional[V1Deployment]: + with ApiClient() as api: appsv1 = client.AppsV1Api(api) - ret = await appsv1.list_namespaced_deployment( - namespace=settings.build_namespace, label_selector=f"runboat/build={name}" - ) - for item in ret.items: - return item # return first - return None # None found + items = appsv1.list_namespaced_deployment( + namespace=settings.build_namespace, + label_selector=f"runboat/build={name}", + ).items + return items[0] if items else None -async def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None: - async with ApiClient() as api: +@sync_to_async +def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None: + with ApiClient() as api: appsv1 = client.AppsV1Api(api) - await appsv1.patch_namespaced_deployment( + appsv1.patch_namespaced_deployment( name=deployment_name, namespace=settings.build_namespace, body=ops, ) -async def watch_deployments() -> AsyncGenerator[tuple[str, V1Deployment], None]: - w = watch.Watch() - # use the context manager to close http sessions automatically - async with ApiClient() as api: - appsv1 = client.AppsV1Api(api) - async for event in w.stream( - 
appsv1.list_namespaced_deployment, namespace=settings.build_namespace - ): - yield event["type"], event["object"] +def _watch(list_method, *args, **kwargs): + while True: + try: + # perform a first query + res = list_method(*args, **kwargs) + resource_version = res.metadata.resource_version + for item in res.items: + yield None, item + # stream until timeout + while True: + try: + for event in watch.Watch().stream( + list_method, + *args, + **kwargs, + resource_version=resource_version, + _request_timeout=60, + ): + if event["type"] == "ERROR": + raise RuntimeError("Kubernetes watch error") + resource_version = event["object"].metadata.resource_version + yield event["type"], event["object"] + except urllib3.exceptions.TimeoutError: + continue + except TimeoutError: + continue + except Exception as e: + delay = 5 + _logger.info( + f"Error {e} watching {list_method.__name__}. Retrying in {delay} sec." + ) + time.sleep(delay) + continue -async def watch_jobs() -> AsyncGenerator[tuple[str, V1Job], None]: - w = watch.Watch() - # use the context manager to close http sessions automatically - async with ApiClient() as api: - appsv1 = client.BatchV1Api(api) - async for event in w.stream( - appsv1.list_namespaced_job, namespace=settings.build_namespace - ): - yield event["type"], event["object"] +@sync_to_async_iterator +def watch_deployments(): + appsv1 = client.AppsV1Api() + yield from _watch( + appsv1.list_namespaced_deployment, namespace=settings.build_namespace + ) + + +@sync_to_async_iterator +def watch_jobs(): + batchv1 = client.BatchV1Api() + yield from _watch(batchv1.list_namespaced_job, namespace=settings.build_namespace) class DeploymentMode(str, Enum): diff --git a/src/runboat/models.py b/src/runboat/models.py index 6af817e..2023e03 100644 --- a/src/runboat/models.py +++ b/src/runboat/models.py @@ -4,7 +4,7 @@ import uuid from enum import Enum from typing import Optional -from kubernetes_asyncio.client.models.v1_deployment import V1Deployment +from 
kubernetes.client.models.v1_deployment import V1Deployment from pydantic import BaseModel from . import k8s diff --git a/src/runboat/settings.py b/src/runboat/settings.py index 42c34d0..997d0a8 100644 --- a/src/runboat/settings.py +++ b/src/runboat/settings.py @@ -18,6 +18,7 @@ class Settings(BaseSettings): build_admin_passwd: str build_domain: str github_token: Optional[str] + log_config: Optional[str] class Config: env_prefix = "RUNBOAT_" diff --git a/src/runboat/utils.py b/src/runboat/utils.py index d2b7366..2d17fcb 100644 --- a/src/runboat/utils.py +++ b/src/runboat/utils.py @@ -1,5 +1,44 @@ +import asyncio import re +from concurrent.futures.thread import ThreadPoolExecutor +from functools import wraps + +_pool = ThreadPoolExecutor(max_workers=20, thread_name_prefix="sync_to_async") def slugify(s: str | int) -> str: return re.sub(r"[^a-z0-9]", "-", str(s).lower()) + + +def sync_to_async(func): + @wraps(func) + async def inner(*args): + return await asyncio.get_running_loop().run_in_executor(_pool, func, *args) + + return inner + + +def sync_to_async_iterator(iterator_func): + @sync_to_async + def async_next(iterator): + try: + return next(iterator) + except StopIteration: + raise StopAsyncIteration() + + @sync_to_async + def async_iterator_func(*args): + return iterator_func(*args) + + @wraps(iterator_func) + async def inner(*args): + iterator = await async_iterator_func(*args) + while True: + try: + item = await async_next(iterator) + except StopAsyncIteration: + return + else: + yield item + + return inner diff --git a/src/runboat/uvicorn.py b/src/runboat/uvicorn.py new file mode 100644 index 0000000..d3aecb1 --- /dev/null +++ b/src/runboat/uvicorn.py @@ -0,0 +1,9 @@ +from uvicorn.workers import UvicornWorker + +from .settings import settings + + +class RunboatUvicornWorker(UvicornWorker): + CONFIG_KWARGS = {"loop": "asyncio"} + if settings.log_config: + CONFIG_KWARGS["log_config"] = settings.log_config