Better error handling in k8s watches

An automatic retry inside the watch itself could cause
events to be missed,
leaving removed builds in the database.
Instead, the watch now raises the error so that the controller
can reset the database and do a full refresh.
This commit is contained in:
Stéphane Bidoul 2021-11-30 18:44:45 +01:00
parent 825d8d2e4a
commit bd3f317635
No known key found for this signature in database
GPG key ID: BCAB2555446B5B92
2 changed files with 15 additions and 10 deletions

View file

@ -15,6 +15,8 @@ _logger = logging.getLogger(__name__)
# of the background tasks and the clearing of the wakeup avoids waking up the tasks # of the background tasks and the clearing of the wakeup avoids waking up the tasks
# too often. # too often.
EVENT_BUFFERING_DELAY = 1 EVENT_BUFFERING_DELAY = 1
# When an exception happens in background tasks, restart them after a delay.
WALKING_DEAD_RESTART_DELAY = 5
class Controller: class Controller:
@ -256,13 +258,18 @@ class Controller:
_logger.info(f"(Re)starting {func.__name__}") _logger.info(f"(Re)starting {func.__name__}")
try: try:
await func() await func()
except k8s.WatchException as e:
_logger.info(
f"Watch error {e} in {func.__name__}, "
f"restarting in {WALKING_DEAD_RESTART_DELAY} sec."
)
await asyncio.sleep(WALKING_DEAD_RESTART_DELAY)
except Exception: except Exception:
delay = 5
_logger.exception( _logger.exception(
f"Unhandled exception in {func.__name__}, " f"Unhandled exception in {func.__name__}, "
f"restarting in {delay} sec." f"restarting in {WALKING_DEAD_RESTART_DELAY} sec."
) )
await asyncio.sleep(delay) await asyncio.sleep(WALKING_DEAD_RESTART_DELAY)
for f in ( for f in (
self.deployment_watcher, self.deployment_watcher,

View file

@ -4,7 +4,6 @@ import os
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
import time
from contextlib import contextmanager from contextlib import contextmanager
from enum import Enum from enum import Enum
from importlib import resources from importlib import resources
@ -80,6 +79,10 @@ def patch_deployment(
raise raise
class WatchException(Exception):
    """Error raised by ``_watch`` when watching fails with an unexpected error.

    ``_watch`` wraps the underlying exception in this type (chained with
    ``raise ... from e``) instead of retrying on its own; the message
    contains the original error and the name of the list method being
    watched. Timeout errors are not reported this way: ``_watch`` simply
    continues on those.
    """

    pass
def _watch( def _watch(
list_method: Callable[..., Any], *args: Any, **kwargs: Any list_method: Callable[..., Any], *args: Any, **kwargs: Any
) -> Generator[tuple[str | None, Any], None, None]: ) -> Generator[tuple[str | None, Any], None, None]:
@ -113,12 +116,7 @@ def _watch(
except (urllib3.exceptions.TimeoutError, TimeoutError): except (urllib3.exceptions.TimeoutError, TimeoutError):
continue continue
except Exception as e: except Exception as e:
delay = 5 raise WatchException(f"{e} in {list_method.__name__}") from e
_logger.info(
f"Error {e} watching {list_method.__name__}. Retrying in {delay} sec."
)
time.sleep(delay)
continue
@sync_to_async_iterator @sync_to_async_iterator