1# Copyright 2016 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from concurrent import futures
16import queue
17import re
18import threading
19import time
20import traceback
21
22
class EventDispatcherError(Exception):
  """Base class for the exceptions defined by this module."""
25
26
class IllegalStateError(EventDispatcherError):
  """Raised on an attempt to put the event dispatcher in an illegal state."""
29
30
class DuplicateError(EventDispatcherError):
  """Raised when a duplicate would be created where one is not allowed."""
33
34
class EventDispatcher:
  """Class managing events for an sl4a connection.

  A background task polls events from the sl4a client, hands them to any
  handler registered for their name, and caches them in per-name queues
  from which callers can pop them.

  The sl4a client passed to the constructor must provide `eventWait`,
  `closeSl4aSession` and `disconnect`, which are the only methods this
  class calls on it.
  """

  DEFAULT_TIMEOUT = 60

  def __init__(self, sl4a):
    """Creates a dispatcher that is not yet polling.

    Args:
      sl4a: The sl4a client to poll events from.
    """
    self._sl4a = sl4a
    # True between start() and clean_up(); the polling loop checks it.
    self.started = False
    # ThreadPoolExecutor running the poller and the handlers; set by start().
    self.executor = None
    # Future of the poll_events() task; set by start().
    self.poller = None
    # Maps event name -> queue.Queue of cached events of that name.
    self.event_dict = {}
    # Maps event name -> (handler, args) tuple.
    self.handlers = {}
    # Guards event_dict and handlers. Reentrant because methods that hold
    # it call other methods that take it again.
    self.lock = threading.RLock()

  def poll_events(self):
    """Continuously polls all types of events from sl4a.

    Events are sorted by name and stored in separate queues. If a handler
    is registered for an event's name, the handler is also submitted to the
    executor with that event. Events without a "name" key are reported and
    dropped.

    Polling ends when the dispatcher is stopped or when an
    "EventDispatcherShutdown" event arrives, in which case the sl4a session
    is closed first.

    Raises:
      Exception: Whatever `eventWait` raised, re-raised after printing the
        traceback, if it failed while the dispatcher was still started.
    """
    while self.started:
      try:
        # NOTE(review): 50000 is presumably a timeout in milliseconds;
        # confirm against the sl4a client.
        event_obj = self._sl4a.eventWait(50000)
      except Exception:
        if self.started:
          print("Exception happened during polling.")
          print(traceback.format_exc())
          raise
        # The dispatcher was stopped while we were waiting, so the failure
        # is expected; the loop condition ends polling.
        continue
      if not event_obj:
        continue
      if "name" not in event_obj:
        print("Received Malformed event {}".format(event_obj))
        continue
      event_name = event_obj["name"]
      # if handler registered, process event
      if event_name in self.handlers:
        self.handle_subscribed_event(event_obj, event_name)
      if event_name == "EventDispatcherShutdown":
        self._sl4a.closeSl4aSession()
        break
      # Cache the event. The queue is created on first use.
      with self.lock:
        self.get_event_q(event_name).put(event_obj)

  def register_handler(self, handler, event_name, args):
    """Registers an event handler.

    One type of event can only have one event handler associated with it.

    Args:
      handler: The event handler function to be registered.
      event_name: Name of the event the handler is for.
      args: User arguments to be passed to the handler when it's called.

    Raises:
      IllegalStateError: Raised if attempts to register a handler after
        the dispatcher starts running.
      DuplicateError: Raised if attempts to register more than one
        handler for one type of event.
    """
    if self.started:
      raise IllegalStateError("Can't register service after polling is started")
    with self.lock:
      if event_name in self.handlers:
        raise DuplicateError(
            "A handler for {} already exists".format(event_name)
        )
      self.handlers[event_name] = (handler, args)

  def start(self):
    """Starts the event dispatcher.

    Initiates executor and start polling events.

    Raises:
      IllegalStateError: Can't start a dispatcher again when it's already
        running.
    """
    if not self.started:
      self.started = True
      self.executor = futures.ThreadPoolExecutor(max_workers=32)
      self.poller = self.executor.submit(self.poll_events)
    else:
      raise IllegalStateError("Dispatcher is already started.")

  def clean_up(self):
    """Clean up and release resources after the event dispatcher polling
    loop has been broken.

    Does nothing if the dispatcher is not started. Otherwise the following
    things happen:
    1. Clear all events and flags.
    2. Close the sl4a client the event_dispatcher object holds.
    3. Shut down executor without waiting.
    """
    if not self.started:
      return
    self.started = False
    self.clear_all_events()
    # At this point, the sl4a apk is destroyed and nothing is listening on
    # the socket. Avoid sending any sl4a commands; just clean up the socket
    # and return.
    self._sl4a.disconnect()
    try:
      self.poller.set_result("Done")
    except futures.InvalidStateError:
      # The polling task already finished (for example after a shutdown
      # event or a polling error), so its future already has an outcome.
      # That must not prevent the executor from being shut down.
      pass
    # The polling thread is guaranteed to finish after a max of 60 seconds,
    # so we don't wait here.
    self.executor.shutdown(wait=False)

  def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
    """Pop an event from its queue.

    Return and remove the oldest entry of an event.
    Block until an event of specified name is available or
    times out if timeout is set.

    Args:
      event_name: Name of the event to be popped.
      timeout: Number of seconds to wait when event is not present.
        0 polls without blocking. Never times out if None.

    Returns:
      The oldest entry of the specified event.

    Raises:
      IllegalStateError: Raised if pop is called before the dispatcher
        starts polling.
      TypeError: Raised if no queue could be obtained for the event.
      queue.Empty: Raised if no event became available before the timeout.
    """
    if not self.started:
      raise IllegalStateError("Dispatcher needs to be started before popping.")

    e_queue = self.get_event_q(event_name)

    if e_queue is None:
      raise TypeError("Failed to get an event queue for {}".format(event_name))

    try:
      # Block for timeout
      if timeout:
        return e_queue.get(True, timeout)
      # Non-blocking poll for event
      elif timeout == 0:
        return e_queue.get(False)
      else:
        # Block forever on event wait
        return e_queue.get(True)
    except queue.Empty:
      # Re-raise with a message that names the event and the timeout.
      raise queue.Empty(
          "Timeout after {}s waiting for event: {}".format(timeout, event_name)
      )

  def wait_for_event(
      self, event_name, predicate, timeout=DEFAULT_TIMEOUT, *args, **kwargs
  ):
    """Wait for an event that satisfies a predicate to appear.

    Continuously pop events of a particular name and check against the
    predicate until an event that satisfies the predicate is popped or
    timed out. Note this will remove all the events of the same name that
    do not satisfy the predicate in the process.

    Args:
      event_name: Name of the event to be popped.
      predicate: A function that takes an event and returns True if the
        predicate is satisfied, False otherwise.
      timeout: Number of seconds to wait.
      *args: Optional positional args passed to predicate().
      **kwargs: Optional keyword args passed to predicate().

    Returns:
      The event that satisfies the predicate.

    Raises:
      IllegalStateError: Raised if called before the dispatcher starts
        polling.
      queue.Empty: Raised if no event that satisfies the predicate was
        found before time out.
    """
    deadline = time.perf_counter() + timeout

    while True:
      # Wait in slices of at most one second, but never past the deadline,
      # so that timeouts shorter than a second are honoured.
      remaining = max(0, deadline - time.perf_counter())
      event = None
      try:
        event = self.pop_event(event_name, min(1, remaining))
      except queue.Empty:
        pass

      if event and predicate(event, *args, **kwargs):
        return event

      if time.perf_counter() >= deadline:
        raise queue.Empty(
            "Timeout after {}s waiting for event: {}".format(
                timeout, event_name
            )
        )

  def pop_events(self, regex_pattern, timeout):
    """Pop events whose names match a regex pattern.

    If such event(s) exist, pop one event from each event queue that
    satisfies the condition. Otherwise, wait for an event that satisfies
    the condition to occur, with timeout.

    Results are sorted by their "time" value in ascending order.

    Args:
      regex_pattern: The regular expression pattern that an event name
        should match in order to be popped.
      timeout: Number of seconds to wait for events in case no event
        matching the condition exists when the function is called.

    Returns:
      Events whose names match a regex pattern.

    Raises:
      IllegalStateError: Raised if pop is called before the dispatcher
        starts polling.
      queue.Empty: Raised if no event was found before time out.
    """
    if not self.started:
      raise IllegalStateError("Dispatcher needs to be started before popping.")
    deadline = time.perf_counter() + timeout
    while True:
      # TODO: replace this polling loop with a proper wait.
      results = self._match_and_pop(regex_pattern)
      remaining = deadline - time.perf_counter()
      if results or remaining <= 0:
        break
      # Never sleep past the deadline.
      time.sleep(min(1, remaining))
    if not results:
      raise queue.Empty(
          "Timeout after {}s waiting for event: {}".format(
              timeout, regex_pattern
          )
      )

    return sorted(results, key=lambda event: event["time"])

  def _match_and_pop(self, regex_pattern):
    """Pop one event from each of the event queues whose names
    match (in a sense of regular expression) regex_pattern.

    Matching uses `match`, so the pattern is anchored at the start of the
    event name. Queues that are currently empty contribute nothing.

    Args:
      regex_pattern: The pattern event names are matched against.

    Returns:
      A list with at most one event per matching queue.
    """
    pattern = re.compile(regex_pattern)
    results = []
    with self.lock:
      for name, event_queue in self.event_dict.items():
        if event_queue is None or not pattern.match(name):
          continue
        try:
          results.append(event_queue.get(False))
        except queue.Empty:
          # Nothing cached under this name right now.
          pass
    return results

  def get_event_q(self, event_name):
    """Obtain the queue storing events of the specified name.

    If no queue exists for this name yet, an empty one is created and
    stored. This method does not wait for events.

    Args:
      event_name: Name of the event whose queue is wanted.

    Returns:
      A queue storing all the events of the specified name.
    """
    # Look up and create under the lock, and return the object obtained
    # there, so a concurrent clear_all_events() cannot remove the entry
    # between creating it and reading it back.
    with self.lock:
      event_queue = self.event_dict.get(event_name)
      if event_queue is None:
        event_queue = queue.Queue()
        self.event_dict[event_name] = event_queue
      return event_queue

  def handle_subscribed_event(self, event_obj, event_name):
    """Execute the registered handler of an event.

    Retrieve the handler and its arguments, and submit the handler to the
    executor.

    Args:
      event_obj: Json object of the event.
      event_name: Name of the event to call handler for.
    """
    handler, args = self.handlers[event_name]
    self.executor.submit(handler, event_obj, *args)

  def _handle(
      self,
      event_handler,
      event_name,
      user_args,
      event_timeout,
      cond,
      cond_timeout,
  ):
    """Pop an event of specified type and calls its handler on it. If
    condition is not None, block until condition is met or timeout.

    The result of waiting on the condition is not checked, so the event is
    popped even if the condition wait timed out.

    Args:
      event_handler: Handler to call with the popped event.
      event_name: Name of the event to pop.
      user_args: Arguments passed to the handler after the event.
      event_timeout: Timeout passed to pop_event().
      cond: Object to wait on before popping, or None.
      cond_timeout: Timeout passed to cond.wait().

    Returns:
      Whatever the handler returns.
    """
    if cond:
      cond.wait(cond_timeout)
    event = self.pop_event(event_name, event_timeout)
    return event_handler(event, *user_args)

  def handle_event(
      self,
      event_handler,
      event_name,
      user_args,
      event_timeout=None,
      cond=None,
      cond_timeout=None,
  ):
    """Handle events that don't have registered handlers

    In a new thread, poll one event of specified type from its queue and
    execute its handler. If no such event exists, the thread waits until
    one appears.

    The work is submitted to the executor created by start(), so the
    dispatcher must have been started.

    Args:
      event_handler: Handler for the event, which should take at least
        one argument - the event json object.
      event_name: Name of the event to be handled.
      user_args: User arguments for the handler; to be passed in after
        the event json.
      event_timeout: Number of seconds to wait for the event to come.
      cond: A condition to wait on before executing the handler. Should
        be a threading.Event object.
      cond_timeout: Number of seconds to wait before the condition times
        out. Never times out if None.

    Returns:
      A concurrent.Future object associated with the handler.
      If blocking call worker.result() is triggered, the handler
      needs to return something to unblock.
    """
    worker = self.executor.submit(
        self._handle,
        event_handler,
        event_name,
        user_args,
        event_timeout,
        cond,
        cond_timeout,
    )
    return worker

  def pop_all(self, event_name):
    """Return and remove all stored events of a specified name.

    Pops all events from their queue. May miss the latest ones.
    If no event is available, return immediately.

    Args:
      event_name: Name of the events to be popped.

    Returns:
      List of the desired events; empty if none are stored.

    Raises:
      IllegalStateError: Raised if pop is called before the dispatcher
        starts polling.
    """
    if not self.started:
      raise IllegalStateError("Dispatcher needs to be started before popping.")
    results = []
    with self.lock:
      event_queue = self.event_dict.get(event_name)
      if event_queue is None:
        return results
      try:
        while True:
          results.append(event_queue.get(block=False))
      except queue.Empty:
        # The queue is drained.
        pass
    return results

  def clear_events(self, event_name):
    """Clear all events of a particular name.

    The queue for the name is created if it does not exist yet.

    Args:
      event_name: Name of the events to be cleared.
    """
    with self.lock:
      event_queue = self.get_event_q(event_name)
      # Hold the queue's own mutex while touching its underlying deque so
      # the clear cannot interleave with a concurrent put() or get().
      with event_queue.mutex:
        event_queue.queue.clear()

  def clear_all_events(self):
    """Clear all event queues and their cached events."""
    with self.lock:
      self.event_dict.clear()
444