1# -*- coding: utf-8 -*-
2
3from contextlib import AbstractAsyncContextManager, asynccontextmanager
4from types import TracebackType
5from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Optional, Tuple, Type
6
7import trio
8
9from pyee.base import EventEmitter, PyeeException
10
11__all__ = ["TrioEventEmitter"]
12
13
14Nursery = trio.Nursery
15
16
17class TrioEventEmitter(EventEmitter):
18    """An event emitter class which can run trio tasks in a trio nursery.
19
20    By default, this class will lazily create both a nursery manager (the
21    object returned from ``trio.open_nursery()`` and a nursery (the object
22    yielded by using the nursery manager as an async context manager). It is
23    also possible to supply an existing nursery manager via the ``manager``
24    argument, or an existing nursery via the ``nursery`` argument.
25
26    Instances of TrioEventEmitter are themselves async context managers, so
27    that they may manage the lifecycle of the underlying trio nursery. For
28    example, typical usage of this library may look something like this::
29
30        async with TrioEventEmitter() as ee:
31            # Underlying nursery is instantiated and ready to go
32            @ee.on('data')
33            async def handler(data):
34                print(data)
35
36            ee.emit('event')
37
38        # Underlying nursery and manager have been cleaned up
39
40    Unlike the case with the EventEmitter, all exceptions raised by event
41    handlers are automatically emitted on the ``error`` event. This is
42    important for trio coroutines specifically but is also handled for
43    synchronous functions for consistency.
44
45    For trio coroutine event handlers, calling emit is non-blocking. In other
46    words, you should not attempt to await emit; the coroutine is scheduled
47    in a fire-and-forget fashion.
48    """
49
50    def __init__(
51        self,
52        nursery: Nursery = None,
53        manager: "AbstractAsyncContextManager[trio.Nursery]" = None,
54    ):
55        super(TrioEventEmitter, self).__init__()
56        self._nursery: Optional[Nursery] = None
57        self._manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None
58        if nursery:
59            if manager:
60                raise PyeeException(
61                    "You may either pass a nursery or a nursery manager " "but not both"
62                )
63            self._nursery = nursery
64        elif manager:
65            self._manager = manager
66        else:
67            self._manager = trio.open_nursery()
68
69    def _async_runner(
70        self,
71        f: Callable,
72        args: Tuple[Any, ...],
73        kwargs: Dict[str, Any],
74    ) -> Callable[[], Awaitable[None]]:
75        async def runner() -> None:
76            try:
77                await f(*args, **kwargs)
78            except Exception as exc:
79                self.emit("error", exc)
80
81        return runner
82
83    def _emit_run(
84        self,
85        f: Callable,
86        args: Tuple[Any, ...],
87        kwargs: Dict[str, Any],
88    ) -> None:
89        if not self._nursery:
90            raise PyeeException("Uninitialized trio nursery")
91        self._nursery.start_soon(self._async_runner(f, args, kwargs))
92
93    @asynccontextmanager
94    async def context(
95        self,
96    ) -> AsyncGenerator["TrioEventEmitter", None]:
97        """Returns an async contextmanager which manages the underlying
98        nursery to the EventEmitter. The ``TrioEventEmitter``'s
99        async context management methods are implemented using this
100        function, but it may also be used directly for clarity.
101        """
102        if self._nursery is not None:
103            yield self
104        elif self._manager is not None:
105            async with self._manager as nursery:
106                self._nursery = nursery
107                yield self
108        else:
109            raise PyeeException("Uninitialized nursery or nursery manager")
110
111    async def __aenter__(self) -> "TrioEventEmitter":
112        self._context: Optional[
113            AbstractAsyncContextManager["TrioEventEmitter"]
114        ] = self.context()
115        return await self._context.__aenter__()
116
117    async def __aexit__(
118        self,
119        type: Optional[Type[BaseException]],
120        value: Optional[BaseException],
121        traceback: Optional[TracebackType],
122    ) -> Optional[bool]:
123        if self._context is None:
124            raise PyeeException("Attempting to exit uninitialized context")
125        rv = await self._context.__aexit__(type, value, traceback)
126        self._context = None
127        self._nursery = None
128        self._manager = None
129        return rv
130