1import queue
2import sched
3import threading
4import time
5import unittest
6from test import support
7from test.support import threading_helper
8
9
10TIMEOUT = support.SHORT_TIMEOUT
11
12
13class Timer:
14    def __init__(self):
15        self._cond = threading.Condition()
16        self._time = 0
17        self._stop = 0
18
19    def time(self):
20        with self._cond:
21            return self._time
22
23    # increase the time but not beyond the established limit
24    def sleep(self, t):
25        assert t >= 0
26        with self._cond:
27            t += self._time
28            while self._stop < t:
29                self._time = self._stop
30                self._cond.wait()
31            self._time = t
32
33    # advance time limit for user code
34    def advance(self, t):
35        assert t >= 0
36        with self._cond:
37            self._stop += t
38            self._cond.notify_all()
39
40
41class TestCase(unittest.TestCase):
42
43    def test_enter(self):
44        l = []
45        fun = lambda x: l.append(x)
46        scheduler = sched.scheduler(time.time, time.sleep)
47        for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
48            z = scheduler.enter(x, 1, fun, (x,))
49        scheduler.run()
50        self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5])
51
52    def test_enterabs(self):
53        l = []
54        fun = lambda x: l.append(x)
55        scheduler = sched.scheduler(time.time, time.sleep)
56        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
57            z = scheduler.enterabs(x, 1, fun, (x,))
58        scheduler.run()
59        self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
60
61    @threading_helper.requires_working_threading()
62    def test_enter_concurrent(self):
63        q = queue.Queue()
64        fun = q.put
65        timer = Timer()
66        scheduler = sched.scheduler(timer.time, timer.sleep)
67        scheduler.enter(1, 1, fun, (1,))
68        scheduler.enter(3, 1, fun, (3,))
69        t = threading.Thread(target=scheduler.run)
70        t.start()
71        timer.advance(1)
72        self.assertEqual(q.get(timeout=TIMEOUT), 1)
73        self.assertTrue(q.empty())
74        for x in [4, 5, 2]:
75            z = scheduler.enter(x - 1, 1, fun, (x,))
76        timer.advance(2)
77        self.assertEqual(q.get(timeout=TIMEOUT), 2)
78        self.assertEqual(q.get(timeout=TIMEOUT), 3)
79        self.assertTrue(q.empty())
80        timer.advance(1)
81        self.assertEqual(q.get(timeout=TIMEOUT), 4)
82        self.assertTrue(q.empty())
83        timer.advance(1)
84        self.assertEqual(q.get(timeout=TIMEOUT), 5)
85        self.assertTrue(q.empty())
86        timer.advance(1000)
87        threading_helper.join_thread(t)
88        self.assertTrue(q.empty())
89        self.assertEqual(timer.time(), 5)
90
91    def test_priority(self):
92        l = []
93        fun = lambda x: l.append(x)
94        scheduler = sched.scheduler(time.time, time.sleep)
95
96        cases = [
97            ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
98            ([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]),
99            ([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]),
100            ([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]),
101        ]
102        for priorities, expected in cases:
103            with self.subTest(priorities=priorities, expected=expected):
104                for priority in priorities:
105                    scheduler.enterabs(0.01, priority, fun, (priority,))
106                scheduler.run()
107                self.assertEqual(l, expected)
108
109                # Cleanup:
110                self.assertTrue(scheduler.empty())
111                l.clear()
112
113    def test_cancel(self):
114        l = []
115        fun = lambda x: l.append(x)
116        scheduler = sched.scheduler(time.time, time.sleep)
117        now = time.time()
118        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
119        event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
120        event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
121        event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
122        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
123        scheduler.cancel(event1)
124        scheduler.cancel(event5)
125        scheduler.run()
126        self.assertEqual(l, [0.02, 0.03, 0.04])
127
128    @threading_helper.requires_working_threading()
129    def test_cancel_concurrent(self):
130        q = queue.Queue()
131        fun = q.put
132        timer = Timer()
133        scheduler = sched.scheduler(timer.time, timer.sleep)
134        now = timer.time()
135        event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
136        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
137        event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
138        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
139        event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
140        t = threading.Thread(target=scheduler.run)
141        t.start()
142        timer.advance(1)
143        self.assertEqual(q.get(timeout=TIMEOUT), 1)
144        self.assertTrue(q.empty())
145        scheduler.cancel(event2)
146        scheduler.cancel(event5)
147        timer.advance(1)
148        self.assertTrue(q.empty())
149        timer.advance(1)
150        self.assertEqual(q.get(timeout=TIMEOUT), 3)
151        self.assertTrue(q.empty())
152        timer.advance(1)
153        self.assertEqual(q.get(timeout=TIMEOUT), 4)
154        self.assertTrue(q.empty())
155        timer.advance(1000)
156        threading_helper.join_thread(t)
157        self.assertTrue(q.empty())
158        self.assertEqual(timer.time(), 4)
159
160    def test_cancel_correct_event(self):
161        # bpo-19270
162        events = []
163        scheduler = sched.scheduler()
164        scheduler.enterabs(1, 1, events.append, ("a",))
165        b = scheduler.enterabs(1, 1, events.append, ("b",))
166        scheduler.enterabs(1, 1, events.append, ("c",))
167        scheduler.cancel(b)
168        scheduler.run()
169        self.assertEqual(events, ["a", "c"])
170
171    def test_empty(self):
172        l = []
173        fun = lambda x: l.append(x)
174        scheduler = sched.scheduler(time.time, time.sleep)
175        self.assertTrue(scheduler.empty())
176        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
177            z = scheduler.enterabs(x, 1, fun, (x,))
178        self.assertFalse(scheduler.empty())
179        scheduler.run()
180        self.assertTrue(scheduler.empty())
181
182    def test_queue(self):
183        l = []
184        fun = lambda x: l.append(x)
185        scheduler = sched.scheduler(time.time, time.sleep)
186        now = time.time()
187        e5 = scheduler.enterabs(now + 0.05, 1, fun)
188        e1 = scheduler.enterabs(now + 0.01, 1, fun)
189        e2 = scheduler.enterabs(now + 0.02, 1, fun)
190        e4 = scheduler.enterabs(now + 0.04, 1, fun)
191        e3 = scheduler.enterabs(now + 0.03, 1, fun)
192        # queue property is supposed to return an order list of
193        # upcoming events
194        self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])
195
196    def test_args_kwargs(self):
197        seq = []
198        def fun(*a, **b):
199            seq.append((a, b))
200
201        now = time.time()
202        scheduler = sched.scheduler(time.time, time.sleep)
203        scheduler.enterabs(now, 1, fun)
204        scheduler.enterabs(now, 1, fun, argument=(1, 2))
205        scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
206        scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
207        scheduler.run()
208        self.assertCountEqual(seq, [
209            ((), {}),
210            ((1, 2), {}),
211            (('a', 'b'), {}),
212            ((1, 2), {'foo': 3})
213        ])
214
215    def test_run_non_blocking(self):
216        l = []
217        fun = lambda x: l.append(x)
218        scheduler = sched.scheduler(time.time, time.sleep)
219        for x in [10, 9, 8, 7, 6]:
220            scheduler.enter(x, 1, fun, (x,))
221        scheduler.run(blocking=False)
222        self.assertEqual(l, [])
223
224
225if __name__ == "__main__":
226    unittest.main()
227