1# -*- coding: utf-8 -*- 2 3from collections import OrderedDict 4from threading import Lock 5from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, Union 6 7 8class PyeeException(Exception): 9 """An exception internal to pyee.""" 10 11 12Handler = TypeVar(name="Handler", bound=Callable) 13 14 15class EventEmitter: 16 """The base event emitter class. All other event emitters inherit from 17 this class. 18 19 Most events are registered with an emitter via the ``on`` and ``once`` 20 methods, and fired with the ``emit`` method. However, pyee event emitters 21 have two *special* events: 22 23 - ``new_listener``: Fires whenever a new listener is created. Listeners for 24 this event do not fire upon their own creation. 25 26 - ``error``: When emitted raises an Exception by default, behavior can be 27 overridden by attaching callback to the event. 28 29 For example:: 30 31 @ee.on('error') 32 def on_error(message): 33 logging.err(message) 34 35 ee.emit('error', Exception('something blew up')) 36 37 All callbacks are handled in a synchronous, blocking manner. As in node.js, 38 raised exceptions are not automatically handled for you---you must catch 39 your own exceptions, and treat them accordingly. 40 """ 41 42 def __init__(self) -> None: 43 self._events: Dict[ 44 str, 45 "OrderedDict[Callable, Callable]", 46 ] = dict() 47 self._lock: Lock = Lock() 48 49 def on( 50 self, event: str, f: Optional[Handler] = None 51 ) -> Union[Handler, Callable[[Handler], Handler]]: 52 """Registers the function ``f`` to the event name ``event``, if provided. 53 54 If ``f`` isn't provided, this method calls ``EventEmitter#listens_to`, and 55 otherwise calls ``EventEmitter#add_listener``. In other words, you may either 56 use it as a decorator:: 57 58 @ee.on('data') 59 def data_handler(data): 60 print(data) 61 62 Or directly:: 63 64 ee.on('data', data_handler) 65 66 In both the decorated and undecorated forms, the event handler is 67 returned. The upshot of this is that you can call decorated handlers 68 directly, as well as use them in remove_listener calls. 69 70 Note that this method's return type is a union type. If you are using 71 mypy or pyright, you will probably want to use either 72 ``EventEmitter#listens_to`` or ``EventEmitter#add_listener``. 73 """ 74 if f is None: 75 return self.listens_to(event) 76 else: 77 return self.add_listener(event, f) 78 79 def listens_to(self, event: str) -> Callable[[Handler], Handler]: 80 """Returns a decorator which will register the decorated function to 81 the event name ``event``:: 82 83 @ee.listens_to("event") 84 def data_handler(data): 85 print(data) 86 87 By only supporting the decorator use case, this method has improved 88 type safety over ``EventEmitter#on``. 89 """ 90 91 def on(f: Handler) -> Handler: 92 self._add_event_handler(event, f, f) 93 return f 94 95 return on 96 97 def add_listener(self, event: str, f: Handler) -> Handler: 98 """Register the function ``f`` to the event name ``event``:: 99 100 def data_handler(data): 101 print(data) 102 103 h = ee.add_listener("event", data_handler) 104 105 By not supporting the decorator use case, this method has improved 106 type safety over ``EventEmitter#on``. 107 """ 108 self._add_event_handler(event, f, f) 109 return f 110 111 def _add_event_handler(self, event: str, k: Callable, v: Callable): 112 # Fire 'new_listener' *before* adding the new listener! 113 self.emit("new_listener", event, k) 114 115 # Add the necessary function 116 # Note that k and v are the same for `on` handlers, but 117 # different for `once` handlers, where v is a wrapped version 118 # of k which removes itself before calling k 119 with self._lock: 120 if event not in self._events: 121 self._events[event] = OrderedDict() 122 self._events[event][k] = v 123 124 def _emit_run( 125 self, 126 f: Callable, 127 args: Tuple[Any, ...], 128 kwargs: Dict[str, Any], 129 ) -> None: 130 f(*args, **kwargs) 131 132 def event_names(self) -> Set[str]: 133 """Get a set of events that this emitter is listening to.""" 134 return set(self._events.keys()) 135 136 def _emit_handle_potential_error(self, event: str, error: Any) -> None: 137 if event == "error": 138 if isinstance(error, Exception): 139 raise error 140 else: 141 raise PyeeException(f"Uncaught, unspecified 'error' event: {error}") 142 143 def _call_handlers( 144 self, 145 event: str, 146 args: Tuple[Any, ...], 147 kwargs: Dict[str, Any], 148 ) -> bool: 149 handled = False 150 151 with self._lock: 152 funcs = list(self._events.get(event, OrderedDict()).values()) 153 for f in funcs: 154 self._emit_run(f, args, kwargs) 155 handled = True 156 157 return handled 158 159 def emit( 160 self, 161 event: str, 162 *args: Any, 163 **kwargs: Any, 164 ) -> bool: 165 """Emit ``event``, passing ``*args`` and ``**kwargs`` to each attached 166 function. Returns ``True`` if any functions are attached to ``event``; 167 otherwise returns ``False``. 168 169 Example:: 170 171 ee.emit('data', '00101001') 172 173 Assuming ``data`` is an attached function, this will call 174 ``data('00101001')'``. 175 """ 176 handled = self._call_handlers(event, args, kwargs) 177 178 if not handled: 179 self._emit_handle_potential_error(event, args[0] if args else None) 180 181 return handled 182 183 def once( 184 self, 185 event: str, 186 f: Callable = None, 187 ) -> Callable: 188 """The same as ``ee.on``, except that the listener is automatically 189 removed after being called. 190 """ 191 192 def _wrapper(f: Callable) -> Callable: 193 def g( 194 *args: Any, 195 **kwargs: Any, 196 ) -> Any: 197 with self._lock: 198 # Check that the event wasn't removed already right 199 # before the lock 200 if event in self._events and f in self._events[event]: 201 self._remove_listener(event, f) 202 else: 203 return None 204 # f may return a coroutine, so we need to return that 205 # result here so that emit can schedule it 206 return f(*args, **kwargs) 207 208 self._add_event_handler(event, f, g) 209 return f 210 211 if f is None: 212 return _wrapper 213 else: 214 return _wrapper(f) 215 216 def _remove_listener(self, event: str, f: Callable) -> None: 217 """Naked unprotected removal.""" 218 self._events[event].pop(f) 219 if not len(self._events[event]): 220 del self._events[event] 221 222 def remove_listener(self, event: str, f: Callable) -> None: 223 """Removes the function ``f`` from ``event``.""" 224 with self._lock: 225 self._remove_listener(event, f) 226 227 def remove_all_listeners(self, event: Optional[str] = None) -> None: 228 """Remove all listeners attached to ``event``. 229 If ``event`` is ``None``, remove all listeners on all events. 230 """ 231 with self._lock: 232 if event is not None: 233 self._events[event] = OrderedDict() 234 else: 235 self._events = dict() 236 237 def listeners(self, event: str) -> List[Callable]: 238 """Returns a list of all listeners registered to the ``event``.""" 239 return list(self._events.get(event, OrderedDict()).keys()) 240