# Copyright 2022 Dominik Sekotill <dom.sekotill@kodo.org.uk>
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Common helper utilities
"""
# mypy: disallow-any-explicit=False
from __future__ import annotations
from collections.abc import AsyncIterator
from collections.abc import Callable
from contextlib import asynccontextmanager
from typing import Generic
from typing import Optional
from typing import TypeVar
import anyio
T = TypeVar("T")
Fn = TypeVar("Fn", bound=Callable[..., object])
[docs]class Broadcast(anyio.Condition, Generic[T]):
"""
A reliable, blocking message queue for delivering to multiple listening tasks
Listeners must acquire the lock (by using the `Broadcast` instance as a context manager)
before calling `Broadcast.receive()` or it will fail. If a listener is repeatedly
awaiting messages in a loop, the loop should be inside the locked context or messages
may be lost to race conditions.
"""
def __init__(self) -> None:
super().__init__()
self.obj: Optional[T] = None
self.exc: Optional[BaseException|type[BaseException]] = None
[docs] async def pre_receive_hook(self) -> None:
"""
A hook for subclasses to inject synchronisation instructions before awaiting objects
""" # noqa: D401
[docs] async def post_send_hook(self) -> None:
"""
A hook for subclasses to inject synchronisation instructions after sending objects
""" # noqa: D401
[docs] async def shutdown_hook(self) -> None:
"""
A hook for subclasses to inject cleanup or synchronisation instructions on close
Users must ensure this method is called, especially if using a subclass which
implements it.
""" # noqa: D401
[docs] async def abort(self, exc: BaseException|type[BaseException]) -> None:
"""
Send a notification to all listeners to abort by raising an exception
"""
async with self._ready():
assert self.exc is None and self.obj is None
self.exc = exc
self.notify_all()
await self._post()
[docs] async def send(self, obj: T) -> None:
"""
Send a message object and block until all listeners have received it
"""
async with self._ready():
assert self.exc is None and self.obj is None
self.obj = obj
self.notify_all()
await self._post()
@asynccontextmanager
async def _ready(self) -> AsyncIterator[None]:
while 1:
await anyio.sleep(0.0)
async with self:
if self.obj is not None or self.exc is not None:
continue
yield
return
async def _post(self) -> None:
await anyio.sleep(0.0) # ensure listeners have opportunity to wait for locks
await self.post_send_hook()
# Ensure all listeners have had a chance to lock and process self.obj
while 1:
async with self:
if self.statistics().lock_statistics.tasks_waiting: # pragma: no-branch
continue
self.obj = self.exc = None
break
[docs] async def receive(self) -> T:
"""
Listen for a single message and return it once it arrives
"""
await self.pre_receive_hook()
await self.wait()
if self.exc is not None:
raise self.exc
assert self.obj is not None
return self.obj
[docs]def qualname(func: Fn) -> str:
"""
Return a qualified name for a callable
"""
if func.__module__ == "__main__":
return func.__qualname__
return f"{func.__module__}.{func.__qualname__}"