Moar typing

This commit is contained in:
Stéphane Bidoul 2021-11-13 18:21:29 +01:00
parent 858c58e47d
commit 8e921753ba
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92
2 changed files with 27 additions and 8 deletions

View file

@ -156,10 +156,10 @@ class Build(BaseModel):
def live_link(self) -> str: def live_link(self) -> str:
return f"{self.webui_link}?live" return f"{self.webui_link}?live"
async def init_log(self) -> str: async def init_log(self) -> str | None:
return await k8s.log(self.name, job_kind=k8s.DeploymentMode.initialize) return await k8s.log(self.name, job_kind=k8s.DeploymentMode.initialize)
async def log(self) -> str: async def log(self) -> str | None:
return await k8s.log(self.name, job_kind=None) return await k8s.log(self.name, job_kind=None)
@classmethod @classmethod

View file

@ -3,6 +3,16 @@ import functools
import re import re
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from functools import wraps from functools import wraps
from typing import (
Any,
AsyncGenerator,
Awaitable,
Callable,
Generator,
Iterator,
ParamSpec,
TypeVar,
)
_pool = ThreadPoolExecutor(max_workers=20, thread_name_prefix="sync_to_async") _pool = ThreadPoolExecutor(max_workers=20, thread_name_prefix="sync_to_async")
@ -11,29 +21,38 @@ def slugify(s: str | int) -> str:
return re.sub(r"[^a-z0-9]", "-", str(s).lower()) return re.sub(r"[^a-z0-9]", "-", str(s).lower())
def sync_to_async(func): # TODO replace ... with P below when mypy supports PEP 612
# (https://github.com/python/mypy/issues/8645)
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
def sync_to_async(func: Callable[..., R]) -> Callable[..., Awaitable[R]]:
@wraps(func) @wraps(func)
async def inner(*args, **kwargs): async def inner(*args: Any, **kwargs: Any) -> R:
f = functools.partial(func, *args, **kwargs) f = functools.partial(func, *args, **kwargs)
return await asyncio.get_running_loop().run_in_executor(_pool, f) return await asyncio.get_running_loop().run_in_executor(_pool, f)
return inner return inner
def sync_to_async_iterator(iterator_func): def sync_to_async_iterator(
iterator_func: Callable[..., Generator[R, None, None]]
) -> Callable[..., AsyncGenerator[R, None]]:
@sync_to_async @sync_to_async
def async_next(iterator): def async_next(iterator: Iterator[R]) -> R:
try: try:
return next(iterator) return next(iterator)
except StopIteration: except StopIteration:
raise StopAsyncIteration() raise StopAsyncIteration()
@sync_to_async @sync_to_async
def async_iterator_func(*args, **kwargs): def async_iterator_func(*args: Any, **kwargs: Any) -> Generator[R, None, None]:
return iterator_func(*args, **kwargs) return iterator_func(*args, **kwargs)
@wraps(iterator_func) @wraps(iterator_func)
async def inner(*args, **kwargs): async def inner(*args: Any, **kwargs: Any) -> AsyncGenerator[R, None]:
iterator = await async_iterator_func(*args, **kwargs) iterator = await async_iterator_func(*args, **kwargs)
while True: while True:
try: try: