Source code for aiohttp.signals

from aiohttp.frozenlist import FrozenList


# Names exported by ``from aiohttp.signals import *``.
__all__ = ('Signal',)


class Signal(FrozenList):
    """List of coroutine callbacks that can be fired together.

    Receivers are registered with the ordinary list methods inherited
    from FrozenList.  The signal is fired with the send() coroutine,
    which forwards its positional and keyword arguments to every
    receiver.  send() refuses to run until the signal has been frozen.
    """

    __slots__ = ('_owner',)

    def __init__(self, owner):
        """Create an empty signal and remember *owner* for ``repr()``."""
        super().__init__()
        self._owner = owner

    def __repr__(self):
        """Return a debug string with the owner, frozen flag and receivers."""
        owner = self._owner
        is_frozen = self.frozen
        receivers = list(self)
        template = '<Signal owner={}, frozen={}, {!r}>'
        return template.format(owner, is_frozen, receivers)

    async def send(self, *args, **kwargs):
        """Await every registered receiver with the given arguments.

        Receivers are awaited one after another, in iteration order; an
        exception raised by a receiver propagates to the caller and the
        remaining receivers are not invoked.

        Raises:
            RuntimeError: if the signal has not been frozen yet.
        """
        if not self.frozen:
            raise RuntimeError("Cannot send non-frozen signal.")

        for callback in self:
            await callback(*args, **kwargs)