xref: /aosp_15_r20/external/pigweed/pw_watch/py/pw_watch/debounce.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Run an interruptable, cancellable function after debouncing run requests"""
15
16import enum
17import logging
18import threading
19from abc import ABC, abstractmethod
20
21from pw_build.project_builder_context import get_project_builder_context
22
# Project builder context, fetched once at import time. Debouncer queries it
# to find out whether progress bars are in use before printing to the console.
BUILDER_CONTEXT = get_project_builder_context()

# Module logger. Note that it is registered under the name 'pw_build.watch'.
_LOG = logging.getLogger('pw_build.watch')
26
27
class DebouncedFunction(ABC):
    """Interface for the work that a Debouncer schedules.

    Subclasses must implement every method below; the Debouncer calls them
    from its timer threads and from the thread that calls press().
    """

    @abstractmethod
    def run(self) -> None:
        """Execute the debounced work."""

    @abstractmethod
    def cancel(self) -> bool:
        """Request that an in-progress run() stop.

        Must be invoked from a thread other than the one executing run().

        Returns:
            True if the run was successfully cancelled, False otherwise.
        """

    @abstractmethod
    def on_complete(self, cancelled: bool = False) -> None:
        """Hook invoked once run() has returned.

        Args:
            cancelled: True when cancel() was invoked during the run that
                just finished.
        """

    # Note: The debouncer uses threads. Since there is no way to guarantee
    # which thread receives a KeyboardInterrupt, it is necessary to catch this
    # event in all debouncer threads and forward it to the user.
    @abstractmethod
    def on_keyboard_interrupt(self):
        """Hook invoked when a debouncer thread gets a keyboard interrupt."""
52
53
class State(enum.Enum):
    """States of the Debouncer state machine."""

    # Nothing pending. Transitions to: DEBOUNCING.
    IDLE = 1
    # Waiting for the debounce timer to expire. Transitions to: RUNNING.
    DEBOUNCING = 2
    # The function is executing. Transitions to: INTERRUPTED or COOLDOWN.
    RUNNING = 3
    # A press arrived while the function was executing. Transitions to: RERUN.
    INTERRUPTED = 4
    # The function finished and the cooldown timer is pending. Transitions to:
    # IDLE, or to RERUN when a press arrives during the cooldown.
    COOLDOWN = 5
    # Another run was requested. Transitions to: IDLE (but triggers a press).
    RERUN = 6
61
62
class Debouncer:
    """Run an interruptable, cancellable function with debouncing.

    press() requests a run. The run is delayed by debounce_seconds, and every
    further press() during that delay restarts the delay. A press() while the
    function is running cancels the run and schedules another one once the
    cooldown has elapsed; a press() during the cooldown also schedules
    another run.

    All state changes happen with self.lock held. The lock is a plain
    (non-reentrant) threading.Lock, and DebouncedFunction.cancel() and
    DebouncedFunction.on_complete() are invoked while it is held, so those
    callbacks must not call press() on this object.
    """

    def __init__(self, function: DebouncedFunction) -> None:
        """Create an idle debouncer around function.

        Args:
            function: The DebouncedFunction whose run() is debounced.
        """
        super().__init__()
        self.function = function

        self.state = State.IDLE

        # Delay, in seconds, between the latest press() and the run.
        self.debounce_seconds = 1
        # threading.Timer armed by the latest press(); reset to None when the
        # timer callback starts the run.
        self.debounce_timer = None
        # Bumped each time a debounce timer is armed. Lets a timer callback
        # detect that it has been superseded by a newer timer.
        self._debounce_generation = 0

        # Delay, in seconds, between the end of a run and returning to IDLE.
        self.cooldown_seconds = 1
        self.cooldown_timer = None

        # Description of the event that requested a rerun.
        self.rerun_event_description = ''

        self.lock = threading.Lock()

    def press(self, event_description: str = '') -> None:
        """Try to run the function for the class.

        If the function is recently started, this may push out the deadline
        for actually starting. If the function is already running, this will
        interrupt the function.

        Args:
            event_description: Text describing what triggered the press; it
                is logged when a run is scheduled or interrupted.
        """
        with self.lock:
            self._press_unlocked(event_description)

    def _press_unlocked(self, event_description: str) -> None:
        """Handle a press. The caller must already hold self.lock."""
        _LOG.debug('Press - state = %s', self.state)
        if self.state == State.IDLE:
            if event_description:
                _LOG.info('%s', event_description)
            self._start_debounce_timer()
            self._transition(State.DEBOUNCING)

        elif self.state == State.DEBOUNCING:
            # Restart the delay; the previous timer is superseded.
            self._start_debounce_timer()

        elif self.state == State.RUNNING:
            # When the function is already running but we get an incoming
            # event, go into the INTERRUPTED state to signal that we should
            # re-try running afterwards.
            error_message = ['Event while running: %s', event_description]
            if BUILDER_CONTEXT.using_progress_bars():
                _LOG.warning(*error_message)
            else:
                # Push an empty line to flush ongoing I/O in subprocess.
                print('')

                # Surround the error message with newlines to make it stand out.
                print('')
                _LOG.warning(*error_message)
                print('')

            self.function.cancel()
            self._transition(State.INTERRUPTED)
            self.rerun_event_description = event_description

        elif self.state == State.INTERRUPTED:
            # Function is running but was already interrupted. Do nothing.
            _LOG.debug('Ignoring press - interrupted')

        elif self.state == State.COOLDOWN:
            # Function just finished and we are cooling down; so trigger rerun.
            _LOG.debug('Got event in cooldown; scheduling rerun')
            self._transition(State.RERUN)
            self.rerun_event_description = event_description

        # State.RERUN has no branch: a rerun is already scheduled, so the
        # press is dropped.

    def _transition(self, new_state: State) -> None:
        """Log the state change and move to new_state."""
        _LOG.debug('State: %s -> %s', self.state, new_state)
        self.state = new_state

    def _start_debounce_timer(self):
        """Arm (or re-arm) the debounce timer. Requires self.lock to be held."""
        assert self.lock.locked()
        if self.state == State.DEBOUNCING:
            self.debounce_timer.cancel()

        # Timer.cancel() has no effect on a timer that has already fired, so
        # tag each timer with a generation number. _run_function() uses it to
        # recognise a callback that belongs to a superseded timer.
        self._debounce_generation += 1
        self.debounce_timer = threading.Timer(
            self.debounce_seconds,
            self._run_function,
            args=(self._debounce_generation,),
        )
        self.debounce_timer.start()

    # Called from debounce_timer thread.
    def _run_function(self, generation=None):
        """Run the debounced function, then start the cooldown.

        Args:
            generation: Generation number of the timer that invoked this
                callback. When it no longer matches the most recently armed
                timer, the callback is stale and returns without running.
                None (the default) skips the check.
        """
        try:
            with self.lock:
                # A timer that fired and then waited for the lock while
                # press() armed a replacement must not start the run: the
                # replacement timer owns the (pushed-out) deadline.
                stale = (
                    generation is not None
                    and generation != self._debounce_generation
                )
                if stale:
                    _LOG.debug('Ignoring superseded debounce timer')
                    return

                assert self.state == State.DEBOUNCING
                self.debounce_timer = None
                self._transition(State.RUNNING)

            # Must run the function without the lock held so further press()
            # calls don't deadlock.
            _LOG.debug('Running debounced function')
            self.function.run()

            _LOG.debug('Finished running debounced function')
            with self.lock:
                if self.state == State.RUNNING:
                    self.function.on_complete(cancelled=False)
                    self._transition(State.COOLDOWN)
                elif self.state == State.INTERRUPTED:
                    # A press arrived during the run; rerun after cooldown.
                    self.function.on_complete(cancelled=True)
                    self._transition(State.RERUN)
                self._start_cooldown_timer()
        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            self.function.on_keyboard_interrupt()

    def _start_cooldown_timer(self):
        """Arm the cooldown timer. Requires self.lock to be held."""
        assert self.lock.locked()
        self.cooldown_timer = threading.Timer(
            self.cooldown_seconds, self._exit_cooldown
        )
        self.cooldown_timer.start()

    # Called from cooldown_timer thread.
    def _exit_cooldown(self):
        """Return to IDLE and, if a rerun was requested, press again."""
        try:
            with self.lock:
                self.cooldown_timer = None
                rerun = self.state == State.RERUN
                self._transition(State.IDLE)

                # If we were in the RERUN state, then re-trigger the event.
                if rerun:
                    self._press_unlocked(
                        'Rerunning: ' + self.rerun_event_description
                    )

        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            self.function.on_keyboard_interrupt()
195