diff --git a/src/runboat/api.py b/src/runboat/api.py index 7bced44..bfb797c 100644 --- a/src/runboat/api.py +++ b/src/runboat/api.py @@ -1,5 +1,5 @@ import datetime -from typing import List, Optional +from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import StreamingResponse @@ -69,14 +69,14 @@ async def controller_status(): return controller -@router.get("/repos", response_model=List[Repo]) +@router.get("/repos", response_model=list[Repo]) async def repos(): return [models.Repo(name=name) for name in settings.supported_repos] @router.get( "/repos/{org}/{repo}/branches-and-pulls", - response_model=List[BranchOrPull], + response_model=list[BranchOrPull], response_model_exclude_none=True, ) async def branches_and_pulls(org: str, repo: str): diff --git a/src/runboat/k8s.py b/src/runboat/k8s.py index 7bf6f62..526e739 100644 --- a/src/runboat/k8s.py +++ b/src/runboat/k8s.py @@ -5,7 +5,7 @@ import tempfile from contextlib import contextmanager from importlib import resources from pathlib import Path -from typing import Any, AsyncGenerator, Dict, Generator, List, Optional, Tuple +from typing import Any, AsyncGenerator, Generator, Optional from jinja2 import Template from kubernetes_asyncio import client, config, watch @@ -16,7 +16,7 @@ from pydantic import BaseModel from .settings import settings -def _split_image_name_tag(img: str) -> Tuple[str, str]: +def _split_image_name_tag(img: str) -> tuple[str, str]: if ":" in img: return img.split(":", 2) return (img, "latest") @@ -26,7 +26,7 @@ async def load_kube_config() -> None: await config.load_kube_config() -async def patch_deployment(deployment_name: str, ops: List[Dict["str", Any]]) -> None: +async def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None: async with ApiClient() as api: appsv1 = client.AppsV1Api(api) await appsv1.patch_namespaced_deployment( @@ -36,7 +36,7 @@ async def patch_deployment(deployment_name: str, ops: List[Dict["str", Any]]) -> ) -async def watch_deployments() -> AsyncGenerator[Tuple[str, V1Deployment], None]: +async def watch_deployments() -> AsyncGenerator[tuple[str, V1Deployment], None]: w = watch.Watch() # use the context manager to close http sessions automatically async with ApiClient() as api: @@ -109,7 +109,7 @@ def _render_kubefiles(deployment_vars: DeploymentVars) -> Generator[Path, None, yield tmp_path -async def _kubectl(args: List[str]) -> None: +async def _kubectl(args: list[str]) -> None: proc = await asyncio.create_subprocess_exec("kubectl", *args) return_code = await proc.wait() if return_code != 0: