# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent import futures
import queue
import re
import threading
import time
import traceback


class EventDispatcherError(Exception):
    """Base class for all errors raised by the event dispatcher."""


class IllegalStateError(EventDispatcherError):
    """Raise when user tries to put event_dispatcher into an illegal state."""


class DuplicateError(EventDispatcherError):
    """Raise when a duplicate is being created and it shouldn't."""


class EventDispatcher:
    """Class managing events for an sl4a connection.

    Events are pulled from the sl4a client by a polling task running on an
    internal thread pool and cached in per-event-name queues, from which
    callers pop them.
    """

    DEFAULT_TIMEOUT = 60

    def __init__(self, sl4a):
        """Creates a dispatcher bound to an sl4a client; does not start it.

        Args:
            sl4a: The sl4a client object events are polled from.
        """
        self._sl4a = sl4a
        # True between start() and clean_up().
        self.started = False
        # Thread pool running the poller and the event handlers.
        self.executor = None
        # Future of the poll_events task.
        self.poller = None
        # Maps event name -> queue.Queue of cached events.
        self.event_dict = {}
        # Maps event name -> (handler, args).
        self.handlers = {}
        # Guards event_dict and handlers.
        self.lock = threading.RLock()

    def poll_events(self):
        """Continuously polls all types of events from sl4a.

        Events are sorted by name and stored in separate queues.
        If a handler is registered for the event name, the handler is
        submitted to the executor with the event upon discovery. The event is
        cached in its queue as well, unless it is the shutdown event.

        Polling stops when the dispatcher is no longer started or when an
        "EventDispatcherShutdown" event is received. If an exception occurs
        while the dispatcher is started, it is printed and re-raised.
        """
        while self.started:
            event_obj = None
            try:
                event_obj = self._sl4a.eventWait(50000)
            except Exception:
                # Errors after clean_up() are expected (the connection is
                # gone), so only report them while still started.
                if self.started:
                    print("Exception happened during polling.")
                    print(traceback.format_exc())
                    raise
            if not event_obj:
                continue
            if "name" not in event_obj:
                print("Received Malformed event {}".format(event_obj))
                continue
            event_name = event_obj["name"]
            # if handler registered, process event
            if event_name in self.handlers:
                self.handle_subscribed_event(event_obj, event_name)
            if event_name == "EventDispatcherShutdown":
                self._sl4a.closeSl4aSession()
                break
            # Cache the event. The put happens under the lock so the queue
            # cannot be dropped by clear_all_events() in between.
            with self.lock:
                self.get_event_q(event_name).put(event_obj)

    def register_handler(self, handler, event_name, args):
        """Registers an event handler.

        One type of event can only have one event handler associated with it.

        Args:
            handler: The event handler function to be registered.
            event_name: Name of the event the handler is for.
            args: User arguments to be passed to the handler when it's called.

        Raises:
            IllegalStateError: Raised if attempts to register a handler after
                the dispatcher starts running.
            DuplicateError: Raised if attempts to register more than one
                handler for one type of event.
        """
        if self.started:
            raise IllegalStateError("Can't register service after polling is started")
        with self.lock:
            if event_name in self.handlers:
                raise DuplicateError(
                    "A handler for {} already exists".format(event_name)
                )
            self.handlers[event_name] = (handler, args)

    def start(self):
        """Starts the event dispatcher.

        Initiates executor and start polling events.

        Raises:
            IllegalStateError: Can't start a dispatcher again when it's already
                running.
        """
        if self.started:
            raise IllegalStateError("Dispatcher is already started.")
        self.started = True
        self.executor = futures.ThreadPoolExecutor(max_workers=32)
        self.poller = self.executor.submit(self.poll_events)

    def clean_up(self):
        """Clean up and release resources after the event dispatcher polling
        loop has been broken.

        The following things happen:
        1. Clear all events and flags.
        2. Close the sl4a client the event_dispatcher object holds.
        3. Shut down executor without waiting.

        Does nothing if the dispatcher is not started.
        """
        if not self.started:
            return
        self.started = False
        self.clear_all_events()
        try:
            # At this point, the sl4a apk is destroyed and nothing is
            # listening on the socket. Avoid sending any sl4a commands; just
            # clean up the socket and return.
            self._sl4a.disconnect()
        finally:
            try:
                self.poller.set_result("Done")
            except futures.InvalidStateError:
                # The polling task already finished (it returned or raised),
                # so its future is already resolved. Nothing to mark.
                pass
            # The polling loop exits once its in-flight eventWait call
            # returns and it observes started == False, so we don't wait here.
            self.executor.shutdown(wait=False)

    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
        """Pop an event from its queue.

        Return and remove the oldest entry of an event.
        Block until an event of specified name is available or
        times out if timeout is set.

        Args:
            event_name: Name of the event to be popped.
            timeout: Number of seconds to wait when event is not present.
                0 polls without blocking. Never times out if None.

        Returns:
            The oldest entry of the specified event.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event was available before the timeout.
        """
        if not self.started:
            raise IllegalStateError("Dispatcher needs to be started before popping.")

        e_queue = self.get_event_q(event_name)

        if not e_queue:
            raise TypeError("Failed to get an event queue for {}".format(event_name))

        try:
            # Block for timeout
            if timeout:
                return e_queue.get(True, timeout)
            # Non-blocking poll for event
            elif timeout == 0:
                return e_queue.get(False)
            else:
                # Block forever on event wait
                return e_queue.get(True)
        except queue.Empty:
            raise queue.Empty(
                "Timeout after {}s waiting for event: {}".format(timeout, event_name)
            )

    def wait_for_event(
        self, event_name, predicate, timeout=DEFAULT_TIMEOUT, *args, **kwargs
    ):
        """Wait for an event that satisfies a predicate to appear.

        Continuously pop events of a particular name and check against the
        predicate until an event that satisfies the predicate is popped or
        timed out. Note this will remove all the events of the same name that
        do not satisfy the predicate in the process.

        Args:
            event_name: Name of the event to be popped.
            predicate: A function that takes an event and returns True if the
                predicate is satisfied, False otherwise.
            timeout: Number of seconds to wait.
            *args: Optional positional args passed to predicate().
            **kwargs: Optional keyword args passed to predicate().

        Returns:
            The event that satisfies the predicate.

        Raises:
            IllegalStateError: Raised if called before the dispatcher starts
                polling.
            queue.Empty: Raised if no event that satisfies the predicate was
                found before time out.
        """
        deadline = time.perf_counter() + timeout

        while True:
            # Wait in slices of at most one second, but never past the
            # deadline, so short timeouts are honored.
            remaining = deadline - time.perf_counter()
            event = None
            try:
                event = self.pop_event(event_name, min(1, max(remaining, 0)))
            except queue.Empty:
                pass

            if event and predicate(event, *args, **kwargs):
                return event

            if time.perf_counter() > deadline:
                raise queue.Empty(
                    "Timeout after {}s waiting for event: {}".format(
                        timeout, event_name
                    )
                )

    def pop_events(self, regex_pattern, timeout):
        """Pop events whose names match a regex pattern.

        If such event(s) exist, pop one event from each event queue that
        satisfies the condition. Otherwise, wait for an event that satisfies
        the condition to occur, with timeout.

        Results are sorted by timestamp in ascending order.

        Args:
            regex_pattern: The regular expression pattern that an event name
                should match in order to be popped.
            timeout: Number of seconds to wait for events in case no event
                matching the condition exists when the function is called.

        Returns:
            Events whose names match a regex pattern, sorted by their "time"
            field.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event was found before time out.
        """
        if not self.started:
            raise IllegalStateError("Dispatcher needs to be started before popping.")
        deadline = time.perf_counter() + timeout
        while True:
            # TODO: fix the sleep loop
            results = self._match_and_pop(regex_pattern)
            if results or time.perf_counter() > deadline:
                break
            # Sleep at most one second, and never past the deadline.
            time.sleep(min(1, max(deadline - time.perf_counter(), 0)))
        if not results:
            raise queue.Empty(
                "Timeout after {}s waiting for event: {}".format(
                    timeout, regex_pattern
                )
            )

        return sorted(results, key=lambda event: event["time"])

    def _match_and_pop(self, regex_pattern):
        """Pop one event from each of the event queues whose names
        match (in a sense of regular expression) regex_pattern.

        Queues that are currently empty are skipped.

        Returns:
            List of popped events, at most one per matching queue.
        """
        results = []
        # `with` guarantees the lock is released even if re.match raises
        # (e.g. on an invalid pattern).
        with self.lock:
            for name, event_queue in self.event_dict.items():
                if event_queue and re.match(regex_pattern, name):
                    try:
                        results.append(event_queue.get(False))
                    except queue.Empty:
                        pass
        return results

    def get_event_q(self, event_name):
        """Obtain the queue storing events of the specified name.

        If no queue exists for this name yet, an empty one is created and
        registered. This call never blocks waiting for events.

        Args:
            event_name: Name of the event whose queue is requested.

        Returns:
            A queue storing all the events of the specified name.
        """
        # Look up and create under the same lock so a concurrent
        # clear_all_events() cannot remove the entry in between.
        with self.lock:
            event_queue = self.event_dict.get(event_name)
            if event_queue is None:
                event_queue = queue.Queue()
                self.event_dict[event_name] = event_queue
            return event_queue

    def handle_subscribed_event(self, event_obj, event_name):
        """Execute the registered handler of an event.

        Retrieve the handler and its arguments, and execute the handler in a
        new thread.

        Args:
            event_obj: Json object of the event.
            event_name: Name of the event to call handler for.
        """
        handler, args = self.handlers[event_name]
        self.executor.submit(handler, event_obj, *args)

    def _handle(
        self,
        event_handler,
        event_name,
        user_args,
        event_timeout,
        cond,
        cond_timeout,
    ):
        """Pop an event of specified type and calls its handler on it. If
        condition is not None, block until condition is met or timeout.

        Returns:
            Whatever event_handler returns.
        """
        if cond:
            cond.wait(cond_timeout)
        event = self.pop_event(event_name, event_timeout)
        return event_handler(event, *user_args)

    def handle_event(
        self,
        event_handler,
        event_name,
        user_args,
        event_timeout=None,
        cond=None,
        cond_timeout=None,
    ):
        """Handle events that don't have registered handlers

        In a new thread, poll one event of specified type from its queue and
        execute its handler. If no such event exists, the thread waits until
        one appears.

        Args:
            event_handler: Handler for the event, which should take at least
                one argument - the event json object.
            event_name: Name of the event to be handled.
            user_args: User arguments for the handler; to be passed in after
                the event json.
            event_timeout: Number of seconds to wait for the event to come.
            cond: A condition to wait on before executing the handler. Should
                be a threading.Event object.
            cond_timeout: Number of seconds to wait before the condition times
                out. Never times out if None.

        Returns:
            A concurrent.Future object associated with the handler.
            If blocking call worker.result() is triggered, the handler
            needs to return something to unblock.
        """
        return self.executor.submit(
            self._handle,
            event_handler,
            event_name,
            user_args,
            event_timeout,
            cond,
            cond_timeout,
        )

    def pop_all(self, event_name):
        """Return and remove all stored events of a specified name.

        Pops all events from their queue. May miss the latest ones.
        If no event is available, return immediately.

        Args:
            event_name: Name of the events to be popped.

        Returns:
            List of the desired events.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
        """
        if not self.started:
            raise IllegalStateError("Dispatcher needs to be started before popping.")
        results = []
        with self.lock:
            try:
                event_queue = self.event_dict[event_name]
                # Drain until the queue reports empty.
                while True:
                    results.append(event_queue.get(block=False))
            except (queue.Empty, KeyError):
                pass
        return results

    def clear_events(self, event_name):
        """Clear all events of a particular name.

        Args:
            event_name: Name of the events to be popped.
        """
        with self.lock:
            event_queue = self.get_event_q(event_name)
            # Hold the queue's own mutex while touching its internal deque so
            # a concurrent get()/put() never sees it mid-clear.
            with event_queue.mutex:
                event_queue.queue.clear()

    def clear_all_events(self):
        """Clear all event queues and their cached events."""
        with self.lock:
            self.event_dict.clear()