# Copyright 2022 Dominik Sekotill <dom.sekotill@kodo.org.uk>
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Common helper utilities
"""
# mypy: disallow-any-explicit=False
from __future__ import annotations
from collections.abc import AsyncIterator
from collections.abc import Callable
from contextlib import asynccontextmanager
from typing import Generic
from typing import Optional
from typing import TypeVar
import anyio
# Type of the message objects carried by a `Broadcast` instance
T = TypeVar("T")
# Any callable; used to annotate the argument of `qualname()`
Fn = TypeVar("Fn", bound=Callable[..., object])
class Broadcast(anyio.Condition, Generic[T]):
	"""
	A reliable, blocking message queue for delivering to multiple listening tasks

	Listeners must acquire the lock (by using the `Broadcast` instance as a context manager)
	before calling `Broadcast.receive()` or it will fail.  If a listener is repeatedly
	awaiting messages in a loop, the loop should be inside the locked context or messages
	may be lost to race conditions.

	`None` is used internally to mean "no message pending", so `None` itself cannot be
	delivered as a message object: `receive()` asserts that the received object is not
	`None`.
	"""

	def __init__(self) -> None:
		super().__init__()
		# The message currently being delivered, or None when nothing is pending
		self.obj: Optional[T] = None
		# The exception currently being delivered by abort(), or None when nothing is pending
		self.exc: Optional[BaseException|type[BaseException]] = None

	async def pre_receive_hook(self) -> None:
		"""
		A hook for subclasses to inject synchronisation instructions before awaiting objects

		Awaited by `receive()` before it starts waiting.  The default implementation does
		nothing.
		"""  # noqa: D401

	async def post_send_hook(self) -> None:
		"""
		A hook for subclasses to inject synchronisation instructions after sending objects

		Awaited by `send()` and `abort()` after listeners have been notified and the lock
		has been released.  The default implementation does nothing.
		"""  # noqa: D401

	async def shutdown_hook(self) -> None:
		"""
		A hook for subclasses to inject cleanup or synchronisation instructions on close

		Users must ensure this method is called, especially if using a subclass which
		implements it; nothing in this class calls it.  The default implementation does
		nothing.
		"""  # noqa: D401

	async def abort(self, exc: BaseException|type[BaseException]) -> None:
		"""
		Send a notification to all listeners to abort by raising an exception

		Blocks until any previous message has been cleared, then stores `exc` so that each
		woken `receive()` call raises it, and finally waits for delivery to complete.
		"""
		async with self._ready():
			# _ready() only yields while both slots are empty and the lock is held
			assert self.exc is None and self.obj is None
			self.exc = exc
			self.notify_all()
		await self._post()

	async def send(self, obj: T) -> None:
		"""
		Send a message object and block until all listeners have received it

		`obj` must not be `None`; see the class documentation.
		"""
		async with self._ready():
			# _ready() only yields while both slots are empty and the lock is held
			assert self.exc is None and self.obj is None
			self.obj = obj
			self.notify_all()
		await self._post()

	@asynccontextmanager
	async def _ready(self) -> AsyncIterator[None]:
		"""
		Enter a locked context once no previous message or exception is pending

		The managed block runs with the lock held; the lock is released on exit.
		"""
		while True:
			# Yield to other tasks before (re)trying, so a previous delivery can finish
			await anyio.sleep(0.0)
			async with self:
				if self.obj is not None or self.exc is not None:
					# A previous message is still being delivered: release the lock and retry
					continue
				yield
				return

	async def _post(self) -> None:
		"""
		Run the post-send hook, then clear the pending message once delivery is complete

		Delivery is treated as complete when this task holds the lock and no other task is
		queued waiting for it.
		"""
		await anyio.sleep(0.0)  # ensure listeners have opportunity to wait for locks
		await self.post_send_hook()
		# Ensure all listeners have had a chance to lock and process self.obj
		while True:
			async with self:
				if self.statistics().lock_statistics.tasks_waiting:  # pragma: no-branch
					# Woken listeners are still queued for the lock: release it and retry
					continue
				self.obj = self.exc = None
				break

	async def receive(self) -> T:
		"""
		Listen for a single message and return it once it arrives

		The caller must already hold the lock.  If the notification came from `abort()`
		the exception passed to it is raised instead of a value being returned.
		"""
		await self.pre_receive_hook()
		await self.wait()
		if self.exc is not None:
			raise self.exc
		assert self.obj is not None
		return self.obj
def qualname(func: Fn) -> str:
	"""
	Return a qualified name for a callable

	The result is the callable's `__qualname__` prefixed with its `__module__` and a dot,
	except for callables whose module is "__main__", where the bare `__qualname__` is
	returned.
	"""
	name = func.__qualname__
	module = func.__module__
	if module == "__main__":
		# Objects defined in the entry-point script get no module prefix
		return name
	return f"{module}.{name}"