1# -*- coding: utf-8 -*-
2
3from collections import OrderedDict
4from threading import Lock
5from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, Union
6
7
8class PyeeException(Exception):
9    """An exception internal to pyee."""
10
11
12Handler = TypeVar(name="Handler", bound=Callable)
13
14
15class EventEmitter:
16    """The base event emitter class. All other event emitters inherit from
17    this class.
18
19    Most events are registered with an emitter via the ``on`` and ``once``
20    methods, and fired with the ``emit`` method. However, pyee event emitters
21    have two *special* events:
22
23    - ``new_listener``: Fires whenever a new listener is created. Listeners for
24      this event do not fire upon their own creation.
25
26    - ``error``: When emitted raises an Exception by default, behavior can be
27      overridden by attaching callback to the event.
28
29      For example::
30
31          @ee.on('error')
32          def on_error(message):
33              logging.err(message)
34
35          ee.emit('error', Exception('something blew up'))
36
37    All callbacks are handled in a synchronous, blocking manner. As in node.js,
38    raised exceptions are not automatically handled for you---you must catch
39    your own exceptions, and treat them accordingly.
40    """
41
42    def __init__(self) -> None:
43        self._events: Dict[
44            str,
45            "OrderedDict[Callable, Callable]",
46        ] = dict()
47        self._lock: Lock = Lock()
48
49    def on(
50        self, event: str, f: Optional[Handler] = None
51    ) -> Union[Handler, Callable[[Handler], Handler]]:
52        """Registers the function ``f`` to the event name ``event``, if provided.
53
54        If ``f`` isn't provided, this method calls ``EventEmitter#listens_to`, and
55        otherwise calls ``EventEmitter#add_listener``. In other words, you may either
56        use it as a decorator::
57
58            @ee.on('data')
59            def data_handler(data):
60                print(data)
61
62        Or directly::
63
64            ee.on('data', data_handler)
65
66        In both the decorated and undecorated forms, the event handler is
67        returned. The upshot of this is that you can call decorated handlers
68        directly, as well as use them in remove_listener calls.
69
70        Note that this method's return type is a union type. If you are using
71        mypy or pyright, you will probably want to use either
72        ``EventEmitter#listens_to`` or ``EventEmitter#add_listener``.
73        """
74        if f is None:
75            return self.listens_to(event)
76        else:
77            return self.add_listener(event, f)
78
79    def listens_to(self, event: str) -> Callable[[Handler], Handler]:
80        """Returns a decorator which will register the decorated function to
81        the event name ``event``::
82
83            @ee.listens_to("event")
84            def data_handler(data):
85                print(data)
86
87        By only supporting the decorator use case, this method has improved
88        type safety over ``EventEmitter#on``.
89        """
90
91        def on(f: Handler) -> Handler:
92            self._add_event_handler(event, f, f)
93            return f
94
95        return on
96
97    def add_listener(self, event: str, f: Handler) -> Handler:
98        """Register the function ``f`` to the event name ``event``::
99
100            def data_handler(data):
101                print(data)
102
103            h = ee.add_listener("event", data_handler)
104
105        By not supporting the decorator use case, this method has improved
106        type safety over ``EventEmitter#on``.
107        """
108        self._add_event_handler(event, f, f)
109        return f
110
111    def _add_event_handler(self, event: str, k: Callable, v: Callable):
112        # Fire 'new_listener' *before* adding the new listener!
113        self.emit("new_listener", event, k)
114
115        # Add the necessary function
116        # Note that k and v are the same for `on` handlers, but
117        # different for `once` handlers, where v is a wrapped version
118        # of k which removes itself before calling k
119        with self._lock:
120            if event not in self._events:
121                self._events[event] = OrderedDict()
122            self._events[event][k] = v
123
124    def _emit_run(
125        self,
126        f: Callable,
127        args: Tuple[Any, ...],
128        kwargs: Dict[str, Any],
129    ) -> None:
130        f(*args, **kwargs)
131
132    def event_names(self) -> Set[str]:
133        """Get a set of events that this emitter is listening to."""
134        return set(self._events.keys())
135
136    def _emit_handle_potential_error(self, event: str, error: Any) -> None:
137        if event == "error":
138            if isinstance(error, Exception):
139                raise error
140            else:
141                raise PyeeException(f"Uncaught, unspecified 'error' event: {error}")
142
143    def _call_handlers(
144        self,
145        event: str,
146        args: Tuple[Any, ...],
147        kwargs: Dict[str, Any],
148    ) -> bool:
149        handled = False
150
151        with self._lock:
152            funcs = list(self._events.get(event, OrderedDict()).values())
153        for f in funcs:
154            self._emit_run(f, args, kwargs)
155            handled = True
156
157        return handled
158
159    def emit(
160        self,
161        event: str,
162        *args: Any,
163        **kwargs: Any,
164    ) -> bool:
165        """Emit ``event``, passing ``*args`` and ``**kwargs`` to each attached
166        function. Returns ``True`` if any functions are attached to ``event``;
167        otherwise returns ``False``.
168
169        Example::
170
171            ee.emit('data', '00101001')
172
173        Assuming ``data`` is an attached function, this will call
174        ``data('00101001')'``.
175        """
176        handled = self._call_handlers(event, args, kwargs)
177
178        if not handled:
179            self._emit_handle_potential_error(event, args[0] if args else None)
180
181        return handled
182
183    def once(
184        self,
185        event: str,
186        f: Callable = None,
187    ) -> Callable:
188        """The same as ``ee.on``, except that the listener is automatically
189        removed after being called.
190        """
191
192        def _wrapper(f: Callable) -> Callable:
193            def g(
194                *args: Any,
195                **kwargs: Any,
196            ) -> Any:
197                with self._lock:
198                    # Check that the event wasn't removed already right
199                    # before the lock
200                    if event in self._events and f in self._events[event]:
201                        self._remove_listener(event, f)
202                    else:
203                        return None
204                # f may return a coroutine, so we need to return that
205                # result here so that emit can schedule it
206                return f(*args, **kwargs)
207
208            self._add_event_handler(event, f, g)
209            return f
210
211        if f is None:
212            return _wrapper
213        else:
214            return _wrapper(f)
215
216    def _remove_listener(self, event: str, f: Callable) -> None:
217        """Naked unprotected removal."""
218        self._events[event].pop(f)
219        if not len(self._events[event]):
220            del self._events[event]
221
222    def remove_listener(self, event: str, f: Callable) -> None:
223        """Removes the function ``f`` from ``event``."""
224        with self._lock:
225            self._remove_listener(event, f)
226
227    def remove_all_listeners(self, event: Optional[str] = None) -> None:
228        """Remove all listeners attached to ``event``.
229        If ``event`` is ``None``, remove all listeners on all events.
230        """
231        with self._lock:
232            if event is not None:
233                self._events[event] = OrderedDict()
234            else:
235                self._events = dict()
236
237    def listeners(self, event: str) -> List[Callable]:
238        """Returns a list of all listeners registered to the ``event``."""
239        return list(self._events.get(event, OrderedDict()).keys())
240