1import queue 2import sched 3import threading 4import time 5import unittest 6from test import support 7from test.support import threading_helper 8 9 10TIMEOUT = support.SHORT_TIMEOUT 11 12 13class Timer: 14 def __init__(self): 15 self._cond = threading.Condition() 16 self._time = 0 17 self._stop = 0 18 19 def time(self): 20 with self._cond: 21 return self._time 22 23 # increase the time but not beyond the established limit 24 def sleep(self, t): 25 assert t >= 0 26 with self._cond: 27 t += self._time 28 while self._stop < t: 29 self._time = self._stop 30 self._cond.wait() 31 self._time = t 32 33 # advance time limit for user code 34 def advance(self, t): 35 assert t >= 0 36 with self._cond: 37 self._stop += t 38 self._cond.notify_all() 39 40 41class TestCase(unittest.TestCase): 42 43 def test_enter(self): 44 l = [] 45 fun = lambda x: l.append(x) 46 scheduler = sched.scheduler(time.time, time.sleep) 47 for x in [0.5, 0.4, 0.3, 0.2, 0.1]: 48 z = scheduler.enter(x, 1, fun, (x,)) 49 scheduler.run() 50 self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5]) 51 52 def test_enterabs(self): 53 l = [] 54 fun = lambda x: l.append(x) 55 scheduler = sched.scheduler(time.time, time.sleep) 56 for x in [0.05, 0.04, 0.03, 0.02, 0.01]: 57 z = scheduler.enterabs(x, 1, fun, (x,)) 58 scheduler.run() 59 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) 60 61 @threading_helper.requires_working_threading() 62 def test_enter_concurrent(self): 63 q = queue.Queue() 64 fun = q.put 65 timer = Timer() 66 scheduler = sched.scheduler(timer.time, timer.sleep) 67 scheduler.enter(1, 1, fun, (1,)) 68 scheduler.enter(3, 1, fun, (3,)) 69 t = threading.Thread(target=scheduler.run) 70 t.start() 71 timer.advance(1) 72 self.assertEqual(q.get(timeout=TIMEOUT), 1) 73 self.assertTrue(q.empty()) 74 for x in [4, 5, 2]: 75 z = scheduler.enter(x - 1, 1, fun, (x,)) 76 timer.advance(2) 77 self.assertEqual(q.get(timeout=TIMEOUT), 2) 78 self.assertEqual(q.get(timeout=TIMEOUT), 3) 79 self.assertTrue(q.empty()) 80 timer.advance(1) 81 self.assertEqual(q.get(timeout=TIMEOUT), 4) 82 self.assertTrue(q.empty()) 83 timer.advance(1) 84 self.assertEqual(q.get(timeout=TIMEOUT), 5) 85 self.assertTrue(q.empty()) 86 timer.advance(1000) 87 threading_helper.join_thread(t) 88 self.assertTrue(q.empty()) 89 self.assertEqual(timer.time(), 5) 90 91 def test_priority(self): 92 l = [] 93 fun = lambda x: l.append(x) 94 scheduler = sched.scheduler(time.time, time.sleep) 95 96 cases = [ 97 ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]), 98 ([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]), 99 ([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]), 100 ([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]), 101 ] 102 for priorities, expected in cases: 103 with self.subTest(priorities=priorities, expected=expected): 104 for priority in priorities: 105 scheduler.enterabs(0.01, priority, fun, (priority,)) 106 scheduler.run() 107 self.assertEqual(l, expected) 108 109 # Cleanup: 110 self.assertTrue(scheduler.empty()) 111 l.clear() 112 113 def test_cancel(self): 114 l = [] 115 fun = lambda x: l.append(x) 116 scheduler = sched.scheduler(time.time, time.sleep) 117 now = time.time() 118 event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,)) 119 event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,)) 120 event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,)) 121 event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,)) 122 event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,)) 123 scheduler.cancel(event1) 124 scheduler.cancel(event5) 125 scheduler.run() 126 self.assertEqual(l, [0.02, 0.03, 0.04]) 127 128 @threading_helper.requires_working_threading() 129 def test_cancel_concurrent(self): 130 q = queue.Queue() 131 fun = q.put 132 timer = Timer() 133 scheduler = sched.scheduler(timer.time, timer.sleep) 134 now = timer.time() 135 event1 = scheduler.enterabs(now + 1, 1, fun, (1,)) 136 event2 = scheduler.enterabs(now + 2, 1, fun, (2,)) 137 event4 = scheduler.enterabs(now + 4, 1, fun, (4,)) 138 event5 = scheduler.enterabs(now + 5, 1, fun, (5,)) 139 event3 = scheduler.enterabs(now + 3, 1, fun, (3,)) 140 t = threading.Thread(target=scheduler.run) 141 t.start() 142 timer.advance(1) 143 self.assertEqual(q.get(timeout=TIMEOUT), 1) 144 self.assertTrue(q.empty()) 145 scheduler.cancel(event2) 146 scheduler.cancel(event5) 147 timer.advance(1) 148 self.assertTrue(q.empty()) 149 timer.advance(1) 150 self.assertEqual(q.get(timeout=TIMEOUT), 3) 151 self.assertTrue(q.empty()) 152 timer.advance(1) 153 self.assertEqual(q.get(timeout=TIMEOUT), 4) 154 self.assertTrue(q.empty()) 155 timer.advance(1000) 156 threading_helper.join_thread(t) 157 self.assertTrue(q.empty()) 158 self.assertEqual(timer.time(), 4) 159 160 def test_cancel_correct_event(self): 161 # bpo-19270 162 events = [] 163 scheduler = sched.scheduler() 164 scheduler.enterabs(1, 1, events.append, ("a",)) 165 b = scheduler.enterabs(1, 1, events.append, ("b",)) 166 scheduler.enterabs(1, 1, events.append, ("c",)) 167 scheduler.cancel(b) 168 scheduler.run() 169 self.assertEqual(events, ["a", "c"]) 170 171 def test_empty(self): 172 l = [] 173 fun = lambda x: l.append(x) 174 scheduler = sched.scheduler(time.time, time.sleep) 175 self.assertTrue(scheduler.empty()) 176 for x in [0.05, 0.04, 0.03, 0.02, 0.01]: 177 z = scheduler.enterabs(x, 1, fun, (x,)) 178 self.assertFalse(scheduler.empty()) 179 scheduler.run() 180 self.assertTrue(scheduler.empty()) 181 182 def test_queue(self): 183 l = [] 184 fun = lambda x: l.append(x) 185 scheduler = sched.scheduler(time.time, time.sleep) 186 now = time.time() 187 e5 = scheduler.enterabs(now + 0.05, 1, fun) 188 e1 = scheduler.enterabs(now + 0.01, 1, fun) 189 e2 = scheduler.enterabs(now + 0.02, 1, fun) 190 e4 = scheduler.enterabs(now + 0.04, 1, fun) 191 e3 = scheduler.enterabs(now + 0.03, 1, fun) 192 # queue property is supposed to return an order list of 193 # upcoming events 194 self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5]) 195 196 def test_args_kwargs(self): 197 seq = [] 198 def fun(*a, **b): 199 seq.append((a, b)) 200 201 now = time.time() 202 scheduler = sched.scheduler(time.time, time.sleep) 203 scheduler.enterabs(now, 1, fun) 204 scheduler.enterabs(now, 1, fun, argument=(1, 2)) 205 scheduler.enterabs(now, 1, fun, argument=('a', 'b')) 206 scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3}) 207 scheduler.run() 208 self.assertCountEqual(seq, [ 209 ((), {}), 210 ((1, 2), {}), 211 (('a', 'b'), {}), 212 ((1, 2), {'foo': 3}) 213 ]) 214 215 def test_run_non_blocking(self): 216 l = [] 217 fun = lambda x: l.append(x) 218 scheduler = sched.scheduler(time.time, time.sleep) 219 for x in [10, 9, 8, 7, 6]: 220 scheduler.enter(x, 1, fun, (x,)) 221 scheduler.run(blocking=False) 222 self.assertEqual(l, []) 223 224 225if __name__ == "__main__": 226 unittest.main() 227