Modernize a few type annotations

This commit is contained in:
Stéphane Bidoul 2021-10-28 19:09:27 +02:00
parent c719e90e6c
commit 6f90869ef7
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92
2 changed files with 8 additions and 8 deletions

View file

@ -1,5 +1,5 @@
import datetime
from typing import List, Optional
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
@ -69,14 +69,14 @@ async def controller_status():
return controller
@router.get("/repos", response_model=List[Repo])
@router.get("/repos", response_model=list[Repo])
async def repos():
return [models.Repo(name=name) for name in settings.supported_repos]
@router.get(
"/repos/{org}/{repo}/branches-and-pulls",
response_model=List[BranchOrPull],
response_model=list[BranchOrPull],
response_model_exclude_none=True,
)
async def branches_and_pulls(org: str, repo: str):

View file

@ -5,7 +5,7 @@ import tempfile
from contextlib import contextmanager
from importlib import resources
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, Generator, List, Optional, Tuple
from typing import Any, AsyncGenerator, Generator, Optional
from jinja2 import Template
from kubernetes_asyncio import client, config, watch
@ -16,7 +16,7 @@ from pydantic import BaseModel
from .settings import settings
def _split_image_name_tag(img: str) -> Tuple[str, str]:
def _split_image_name_tag(img: str) -> tuple[str, str]:
if ":" in img:
return img.split(":", 2)
return (img, "latest")
@ -26,7 +26,7 @@ async def load_kube_config() -> None:
await config.load_kube_config()
async def patch_deployment(deployment_name: str, ops: List[Dict["str", Any]]) -> None:
async def patch_deployment(deployment_name: str, ops: list[dict["str", Any]]) -> None:
async with ApiClient() as api:
appsv1 = client.AppsV1Api(api)
await appsv1.patch_namespaced_deployment(
@ -36,7 +36,7 @@ async def patch_deployment(deployment_name: str, ops: List[Dict["str", Any]]) ->
)
async def watch_deployments() -> AsyncGenerator[Tuple[str, V1Deployment], None]:
async def watch_deployments() -> AsyncGenerator[tuple[str, V1Deployment], None]:
w = watch.Watch()
# use the context manager to close http sessions automatically
async with ApiClient() as api:
@ -109,7 +109,7 @@ def _render_kubefiles(deployment_vars: DeploymentVars) -> Generator[Path, None,
yield tmp_path
async def _kubectl(args: List[str]) -> None:
async def _kubectl(args: list[str]) -> None:
proc = await asyncio.create_subprocess_exec("kubectl", *args)
return_code = await proc.wait()
if return_code != 0: