Use official kubernetes client

Because kubernetes_asyncio watchers have correctness issues:
they seem to re-emit events that pre-date the initial load.

That said, I ended up having to implement an infinite watcher
to handle socket timeout errors...
This commit is contained in:
Stéphane Bidoul 2021-11-04 09:36:01 +01:00
parent a2ed957819
commit d79cb6223e
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92
12 changed files with 171 additions and 53 deletions

View file

@ -23,7 +23,17 @@ For running the controller:
- setup environment variables (start from `.env.sample`) - setup environment variables (start from `.env.sample`)
- create a virtualenv, make sure to have pip>=21.3.1 and `pip install -e .` - create a virtualenv, make sure to have pip>=21.3.1 and `pip install -e .`
- run with `uvicorn runboat.app:app --reload --log-config=log-config-dev.yaml` - run with `uvicorn runboat.app:app --log-config=log-config-dev.yaml`
## Running in production
`gunicorn -w 1 -k runboat.uvicorn.RunboatUvicornWorker runboat.app:app`.
Use one and only one worker process!
Gunicorn is also necessary so that the process shuts down within a few seconds of
SIGINT/SIGTERM. Since we use `run_in_executor`, SIGINT/SIGTERM handling does not work
very well in Python, and gunicorn makes it more robust. See https://bugs.python.org/issue29309
## Author and contributors ## Author and contributors

View file

@ -15,5 +15,5 @@ root:
level: DEBUG level: DEBUG
handlers: [console] handlers: [console]
loggers: loggers:
kubernetes_asyncio.client.rest: kubernetes.client.rest:
level: INFO level: INFO

View file

@ -11,8 +11,9 @@ classifiers = [
] ]
dependencies = [ dependencies = [
"fastapi", "fastapi",
"gunicorn",
"jinja2", "jinja2",
"kubernetes_asyncio", "kubernetes",
"requests", # TODO for github, to replace by aiohttp or httpx "requests", # TODO for github, to replace by aiohttp or httpx
"rich", "rich",
"uvicorn", "uvicorn",

View file

@ -1,32 +1,35 @@
# frozen requirements generated by pip-deepfreeze # frozen requirements generated by pip-deepfreeze
aiohttp==3.7.4.post0
anyio==3.3.4 anyio==3.3.4
asgiref==3.4.1 asgiref==3.4.1
async-timeout==3.0.1 cachetools==4.2.4
attrs==21.2.0
certifi==2021.10.8 certifi==2021.10.8
chardet==4.0.0
charset-normalizer==2.0.7 charset-normalizer==2.0.7
click==8.0.3 click==8.0.3
colorama==0.4.4 colorama==0.4.4
commonmark==0.9.1 commonmark==0.9.1
fastapi==0.70.0 fastapi==0.70.0
google-auth==2.3.2
gunicorn==20.1.0
h11==0.12.0 h11==0.12.0
idna==3.3 idna==3.3
Jinja2==3.0.2 Jinja2==3.0.2
kubernetes-asyncio==18.20.0 kubernetes==19.15.0
MarkupSafe==2.0.1 MarkupSafe==2.0.1
multidict==5.2.0 oauthlib==3.1.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pydantic==1.8.2 pydantic==1.8.2
Pygments==2.10.0 Pygments==2.10.0
python-dateutil==2.8.2 python-dateutil==2.8.2
PyYAML==6.0 PyYAML==6.0
requests==2.26.0 requests==2.26.0
requests-oauthlib==1.3.0
rich==10.12.0 rich==10.12.0
rsa==4.7.2
six==1.16.0 six==1.16.0
sniffio==1.2.0 sniffio==1.2.0
starlette==0.16.0 starlette==0.16.0
typing-extensions==3.10.0.2 typing-extensions==3.10.0.2
urllib3==1.26.7 urllib3==1.26.7
uvicorn==0.15.0 uvicorn==0.15.0
yarl==1.7.0 websocket-client==1.2.1

View file

@ -18,6 +18,7 @@ class Status(BaseModel):
max_deployed: int max_deployed: int
started: int started: int
max_started: int max_started: int
to_initialize: int
initializing: int initializing: int
max_initializing: int max_initializing: int

View file

@ -49,6 +49,10 @@ class Controller:
def max_started(self) -> int: def max_started(self) -> int:
return settings.max_started return settings.max_started
@property
def to_initialize(self) -> int:
return self.db.count_by_init_status(BuildInitStatus.todo)
@property @property
def initializing(self) -> int: def initializing(self) -> int:
return self.db.count_by_init_status(BuildInitStatus.started) return self.db.count_by_init_status(BuildInitStatus.started)
@ -99,6 +103,9 @@ class Controller:
if build is not None: if build is not None:
return build return build
if not db_only: if not db_only:
_logger.debug(
"Build %s not in local db, fetching from k8s api.", build_name
)
build = await Build.from_name(build_name) build = await Build.from_name(build_name)
if build is not None: if build is not None:
if self.db.add(build): if self.db.add(build):
@ -109,11 +116,19 @@ class Controller:
async def deployment_watcher(self) -> None: async def deployment_watcher(self) -> None:
self.reset() # empty the local db each time we start watching self.reset() # empty the local db each time we start watching
async for event_type, deployment in k8s.watch_deployments(): async for event_type, deployment in k8s.watch_deployments():
_logger.debug(
"%s %s %s dr=%s/rr=%s",
event_type,
deployment.metadata.name,
deployment.metadata.resource_version,
deployment.spec.replicas,
deployment.status.ready_replicas,
)
build_name = deployment.metadata.labels.get("runboat/build") build_name = deployment.metadata.labels.get("runboat/build")
if not build_name: if not build_name:
continue continue
should_wakeup = False should_wakeup = False
if event_type in ("ADDED", "MODIFIED"): if event_type in (None, "ADDED", "MODIFIED"):
should_wakeup = self.db.add(Build.from_deployment(deployment)) should_wakeup = self.db.add(Build.from_deployment(deployment))
elif event_type == "DELETED": elif event_type == "DELETED":
should_wakeup = self.db.remove(build_name) should_wakeup = self.db.remove(build_name)
@ -124,16 +139,22 @@ class Controller:
async def job_watcher(self) -> None: async def job_watcher(self) -> None:
async for event_type, job in k8s.watch_jobs(): async for event_type, job in k8s.watch_jobs():
_logger.debug(
"%s %s %s a=%s/s=%s/f=%s",
event_type,
job.metadata.name,
job.metadata.resource_version,
job.status.active,
job.status.succeeded,
job.status.failed,
)
build_name = job.metadata.labels.get("runboat/build") build_name = job.metadata.labels.get("runboat/build")
if not build_name: if not build_name:
continue continue
job_kind = job.metadata.labels.get("runboat/job-kind") job_kind = job.metadata.labels.get("runboat/job-kind")
if job_kind not in ("initialize", "cleanup"): if job_kind not in ("initialize", "cleanup"):
continue continue
if event_type in ("ADDED", "MODIFIED"): if event_type in (None, "ADDED", "MODIFIED"):
_logger.debug(
"job %s for %s status %s", job_kind, build_name, job.status
)
# Look for build in local db and also in k8s api. # Look for build in local db and also in k8s api.
# This is necessary because job events may come before build events # This is necessary because job events may come before build events
# have arrived. # have arrived.

View file

@ -9,7 +9,6 @@ from .settings import settings
def _github_get(url: str) -> Any: def _github_get(url: str) -> Any:
full_url = f"https://api.github.com{url}" full_url = f"https://api.github.com{url}"
# TODO github token
headers = { headers = {
"Accept": "application/vnd.github.v3+json", "Accept": "application/vnd.github.v3+json",
} }

View file

@ -1,21 +1,26 @@
import asyncio import asyncio
import logging
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
import time
from contextlib import contextmanager from contextlib import contextmanager
from enum import Enum from enum import Enum
from importlib import resources from importlib import resources
from pathlib import Path from pathlib import Path
from typing import Any, AsyncGenerator, Generator, Optional from typing import Any, Generator, Optional
import urllib3
from jinja2 import Template from jinja2 import Template
from kubernetes_asyncio import client, config, watch from kubernetes import client, config, watch
from kubernetes_asyncio.client.api_client import ApiClient from kubernetes.client.api_client import ApiClient
from kubernetes_asyncio.client.models.v1_deployment import V1Deployment from kubernetes.client.models.v1_deployment import V1Deployment
from kubernetes_asyncio.client.models.v1_job import V1Job
from pydantic import BaseModel from pydantic import BaseModel
from .settings import settings from .settings import settings
from .utils import sync_to_async, sync_to_async_iterator
_logger = logging.getLogger(__name__)
def _split_image_name_tag(img: str) -> tuple[str, str]: def _split_image_name_tag(img: str) -> tuple[str, str]:
@ -24,51 +29,80 @@ def _split_image_name_tag(img: str) -> tuple[str, str]:
return (img, "latest") return (img, "latest")
async def load_kube_config() -> None: @sync_to_async
await config.load_kube_config() def load_kube_config() -> None:
config.load_kube_config()
async def read_deployment(name: str) -> Optional[V1Deployment]: @sync_to_async
async with ApiClient() as api: def read_deployment(name: str) -> Optional[V1Deployment]:
with ApiClient() as api:
appsv1 = client.AppsV1Api(api) appsv1 = client.AppsV1Api(api)
ret = await appsv1.list_namespaced_deployment( items = appsv1.list_namespaced_deployment(
namespace=settings.build_namespace, label_selector=f"runboat/build={name}" namespace=settings.build_namespace,
) label_selector=f"runboat/build={name}",
for item in ret.items: ).items
return item # return first return items[0] if items else None
return None # None found
async def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None: @sync_to_async
async with ApiClient() as api: def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None:
with ApiClient() as api:
appsv1 = client.AppsV1Api(api) appsv1 = client.AppsV1Api(api)
await appsv1.patch_namespaced_deployment( appsv1.patch_namespaced_deployment(
name=deployment_name, name=deployment_name,
namespace=settings.build_namespace, namespace=settings.build_namespace,
body=ops, body=ops,
) )
async def watch_deployments() -> AsyncGenerator[tuple[str, V1Deployment], None]: def _watch(list_method, *args, **kwargs):
w = watch.Watch() while True:
# use the context manager to close http sessions automatically try:
async with ApiClient() as api: # perform a first query
appsv1 = client.AppsV1Api(api) res = list_method(*args, **kwargs)
async for event in w.stream( resource_version = res.metadata.resource_version
appsv1.list_namespaced_deployment, namespace=settings.build_namespace for item in res.items:
): yield None, item
yield event["type"], event["object"] # stream until timeout
while True:
try:
for event in watch.Watch().stream(
list_method,
*args,
**kwargs,
resource_version=resource_version,
_request_timeout=60,
):
if event["type"] == "ERROR":
raise RuntimeError("Kubernetes watch error")
resource_version = event["object"].metadata.resource_version
yield event["type"], event["object"]
except urllib3.exceptions.TimeoutError:
continue
except TimeoutError:
continue
except Exception as e:
delay = 5
_logger.info(
f"Error {e} watching {list_method.__name__}. Retrying in {delay} sec."
)
time.sleep(delay)
continue
async def watch_jobs() -> AsyncGenerator[tuple[str, V1Job], None]: @sync_to_async_iterator
w = watch.Watch() def watch_deployments():
# use the context manager to close http sessions automatically appsv1 = client.AppsV1Api()
async with ApiClient() as api: yield from _watch(
appsv1 = client.BatchV1Api(api) appsv1.list_namespaced_deployment, namespace=settings.build_namespace
async for event in w.stream( )
appsv1.list_namespaced_job, namespace=settings.build_namespace
):
yield event["type"], event["object"] @sync_to_async_iterator
def watch_jobs():
batchv1 = client.BatchV1Api()
yield from _watch(batchv1.list_namespaced_job, namespace=settings.build_namespace)
class DeploymentMode(str, Enum): class DeploymentMode(str, Enum):

View file

@ -4,7 +4,7 @@ import uuid
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional
from kubernetes_asyncio.client.models.v1_deployment import V1Deployment from kubernetes.client.models.v1_deployment import V1Deployment
from pydantic import BaseModel from pydantic import BaseModel
from . import k8s from . import k8s

View file

@ -18,6 +18,7 @@ class Settings(BaseSettings):
build_admin_passwd: str build_admin_passwd: str
build_domain: str build_domain: str
github_token: Optional[str] github_token: Optional[str]
log_config: Optional[str]
class Config: class Config:
env_prefix = "RUNBOAT_" env_prefix = "RUNBOAT_"

View file

@ -1,5 +1,44 @@
import asyncio
import re import re
from concurrent.futures.thread import ThreadPoolExecutor
from functools import wraps
_pool = ThreadPoolExecutor(max_workers=20, thread_name_prefix="sync_to_async")
def slugify(s: str | int) -> str: def slugify(s: str | int) -> str:
return re.sub(r"[^a-z0-9]", "-", str(s).lower()) return re.sub(r"[^a-z0-9]", "-", str(s).lower())
def sync_to_async(func):
    """Wrap a blocking callable so that it can be awaited.

    The returned coroutine function submits ``func`` to the module-level
    ``_pool`` executor through the running event loop's ``run_in_executor``
    and awaits the result.

    Positional and keyword arguments are forwarded to ``func`` unchanged.
    """

    @wraps(func)
    async def inner(*args, **kwargs):
        # run_in_executor() only forwards positional arguments, so bind
        # everything in a closure to support keyword arguments as well.
        def call():
            return func(*args, **kwargs)

        return await asyncio.get_running_loop().run_in_executor(_pool, call)

    return inner
def sync_to_async_iterator(iterator_func):
    """Turn a blocking iterator factory into an async generator function.

    ``iterator_func`` is called, with the arguments given to the returned
    function, through ``sync_to_async``. Every subsequent ``next()`` on the
    iterator it returns also goes through ``sync_to_async``.

    Positional and keyword arguments are forwarded to ``iterator_func``.
    """

    @sync_to_async
    def async_next(iterator):
        try:
            return next(iterator)
        except StopIteration:
            # StopIteration cannot be set as the exception of a Future, so
            # exhaustion is signalled with StopAsyncIteration instead.
            raise StopAsyncIteration() from None

    @wraps(iterator_func)
    async def inner(*args, **kwargs):
        # Bind the arguments in a zero-argument closure so that keyword
        # arguments work however sync_to_async forwards its own arguments.
        @sync_to_async
        def make_iterator():
            return iterator_func(*args, **kwargs)

        iterator = await make_iterator()
        while True:
            try:
                item = await async_next(iterator)
            except StopAsyncIteration:
                return
            yield item

    return inner

9
src/runboat/uvicorn.py Normal file
View file

@ -0,0 +1,9 @@
from uvicorn.workers import UvicornWorker
from .settings import settings
class RunboatUvicornWorker(UvicornWorker):
    """Uvicorn worker class to be used with gunicorn's ``-k`` option.

    ``CONFIG_KWARGS`` selects the ``asyncio`` loop and, when
    ``settings.log_config`` is set, also carries it as ``log_config``.
    """

    CONFIG_KWARGS = {
        "loop": "asyncio",
        # Only add the key when a log configuration has been provided.
        **({"log_config": settings.log_config} if settings.log_config else {}),
    }