1"""Tests for queues.py""" 2 3import asyncio 4import unittest 5from types import GenericAlias 6 7 8def tearDownModule(): 9 asyncio.set_event_loop_policy(None) 10 11 12class QueueBasicTests(unittest.IsolatedAsyncioTestCase): 13 14 async def _test_repr_or_str(self, fn, expect_id): 15 """Test Queue's repr or str. 16 17 fn is repr or str. expect_id is True if we expect the Queue's id to 18 appear in fn(Queue()). 19 """ 20 q = asyncio.Queue() 21 self.assertTrue(fn(q).startswith('<Queue'), fn(q)) 22 id_is_present = hex(id(q)) in fn(q) 23 self.assertEqual(expect_id, id_is_present) 24 25 # getters 26 q = asyncio.Queue() 27 async with asyncio.TaskGroup() as tg: 28 # Start a task that waits to get. 29 getter = tg.create_task(q.get()) 30 # Let it start waiting. 31 await asyncio.sleep(0) 32 self.assertTrue('_getters[1]' in fn(q)) 33 # resume q.get coroutine to finish generator 34 q.put_nowait(0) 35 36 self.assertEqual(0, await getter) 37 38 # putters 39 q = asyncio.Queue(maxsize=1) 40 async with asyncio.TaskGroup() as tg: 41 q.put_nowait(1) 42 # Start a task that waits to put. 43 putter = tg.create_task(q.put(2)) 44 # Let it start waiting. 45 await asyncio.sleep(0) 46 self.assertTrue('_putters[1]' in fn(q)) 47 # resume q.put coroutine to finish generator 48 q.get_nowait() 49 50 self.assertTrue(putter.done()) 51 52 q = asyncio.Queue() 53 q.put_nowait(1) 54 self.assertTrue('_queue=[1]' in fn(q)) 55 56 async def test_repr(self): 57 await self._test_repr_or_str(repr, True) 58 59 async def test_str(self): 60 await self._test_repr_or_str(str, False) 61 62 def test_generic_alias(self): 63 q = asyncio.Queue[int] 64 self.assertEqual(q.__args__, (int,)) 65 self.assertIsInstance(q, GenericAlias) 66 67 async def test_empty(self): 68 q = asyncio.Queue() 69 self.assertTrue(q.empty()) 70 await q.put(1) 71 self.assertFalse(q.empty()) 72 self.assertEqual(1, await q.get()) 73 self.assertTrue(q.empty()) 74 75 async def test_full(self): 76 q = asyncio.Queue() 77 self.assertFalse(q.full()) 78 79 q = asyncio.Queue(maxsize=1) 80 await q.put(1) 81 self.assertTrue(q.full()) 82 83 async def test_order(self): 84 q = asyncio.Queue() 85 for i in [1, 3, 2]: 86 await q.put(i) 87 88 items = [await q.get() for _ in range(3)] 89 self.assertEqual([1, 3, 2], items) 90 91 async def test_maxsize(self): 92 q = asyncio.Queue(maxsize=2) 93 self.assertEqual(2, q.maxsize) 94 have_been_put = [] 95 96 async def putter(): 97 for i in range(3): 98 await q.put(i) 99 have_been_put.append(i) 100 return True 101 102 t = asyncio.create_task(putter()) 103 for i in range(2): 104 await asyncio.sleep(0) 105 106 # The putter is blocked after putting two items. 107 self.assertEqual([0, 1], have_been_put) 108 self.assertEqual(0, await q.get()) 109 110 # Let the putter resume and put last item. 111 await asyncio.sleep(0) 112 self.assertEqual([0, 1, 2], have_been_put) 113 self.assertEqual(1, await q.get()) 114 self.assertEqual(2, await q.get()) 115 116 self.assertTrue(t.done()) 117 self.assertTrue(t.result()) 118 119 120class QueueGetTests(unittest.IsolatedAsyncioTestCase): 121 122 async def test_blocking_get(self): 123 q = asyncio.Queue() 124 q.put_nowait(1) 125 126 self.assertEqual(1, await q.get()) 127 128 async def test_get_with_putters(self): 129 loop = asyncio.get_running_loop() 130 131 q = asyncio.Queue(1) 132 await q.put(1) 133 134 waiter = loop.create_future() 135 q._putters.append(waiter) 136 137 self.assertEqual(1, await q.get()) 138 self.assertTrue(waiter.done()) 139 self.assertIsNone(waiter.result()) 140 141 async def test_blocking_get_wait(self): 142 loop = asyncio.get_running_loop() 143 q = asyncio.Queue() 144 started = asyncio.Event() 145 finished = False 146 147 async def queue_get(): 148 nonlocal finished 149 started.set() 150 res = await q.get() 151 finished = True 152 return res 153 154 queue_get_task = asyncio.create_task(queue_get()) 155 await started.wait() 156 self.assertFalse(finished) 157 loop.call_later(0.01, q.put_nowait, 1) 158 res = await queue_get_task 159 self.assertTrue(finished) 160 self.assertEqual(1, res) 161 162 def test_nonblocking_get(self): 163 q = asyncio.Queue() 164 q.put_nowait(1) 165 self.assertEqual(1, q.get_nowait()) 166 167 def test_nonblocking_get_exception(self): 168 q = asyncio.Queue() 169 self.assertRaises(asyncio.QueueEmpty, q.get_nowait) 170 171 async def test_get_cancelled_race(self): 172 q = asyncio.Queue() 173 174 t1 = asyncio.create_task(q.get()) 175 t2 = asyncio.create_task(q.get()) 176 177 await asyncio.sleep(0) 178 t1.cancel() 179 await asyncio.sleep(0) 180 self.assertTrue(t1.done()) 181 await q.put('a') 182 await asyncio.sleep(0) 183 self.assertEqual('a', await t2) 184 185 async def test_get_with_waiting_putters(self): 186 q = asyncio.Queue(maxsize=1) 187 asyncio.create_task(q.put('a')) 188 asyncio.create_task(q.put('b')) 189 self.assertEqual(await q.get(), 'a') 190 self.assertEqual(await q.get(), 'b') 191 192 async def test_why_are_getters_waiting(self): 193 async def consumer(queue, num_expected): 194 for _ in range(num_expected): 195 await queue.get() 196 197 async def producer(queue, num_items): 198 for i in range(num_items): 199 await queue.put(i) 200 201 producer_num_items = 5 202 203 q = asyncio.Queue(1) 204 async with asyncio.TaskGroup() as tg: 205 tg.create_task(producer(q, producer_num_items)) 206 tg.create_task(consumer(q, producer_num_items)) 207 208 async def test_cancelled_getters_not_being_held_in_self_getters(self): 209 queue = asyncio.Queue(maxsize=5) 210 211 with self.assertRaises(TimeoutError): 212 await asyncio.wait_for(queue.get(), 0.1) 213 214 self.assertEqual(len(queue._getters), 0) 215 216 217class QueuePutTests(unittest.IsolatedAsyncioTestCase): 218 219 async def test_blocking_put(self): 220 q = asyncio.Queue() 221 222 # No maxsize, won't block. 223 await q.put(1) 224 self.assertEqual(1, await q.get()) 225 226 async def test_blocking_put_wait(self): 227 q = asyncio.Queue(maxsize=1) 228 started = asyncio.Event() 229 finished = False 230 231 async def queue_put(): 232 nonlocal finished 233 started.set() 234 await q.put(1) 235 await q.put(2) 236 finished = True 237 238 loop = asyncio.get_running_loop() 239 loop.call_later(0.01, q.get_nowait) 240 queue_put_task = asyncio.create_task(queue_put()) 241 await started.wait() 242 self.assertFalse(finished) 243 await queue_put_task 244 self.assertTrue(finished) 245 246 def test_nonblocking_put(self): 247 q = asyncio.Queue() 248 q.put_nowait(1) 249 self.assertEqual(1, q.get_nowait()) 250 251 async def test_get_cancel_drop_one_pending_reader(self): 252 q = asyncio.Queue() 253 254 reader = asyncio.create_task(q.get()) 255 256 await asyncio.sleep(0) 257 258 q.put_nowait(1) 259 q.put_nowait(2) 260 reader.cancel() 261 262 try: 263 await reader 264 except asyncio.CancelledError: 265 # try again 266 reader = asyncio.create_task(q.get()) 267 await reader 268 269 result = reader.result() 270 # if we get 2, it means 1 got dropped! 271 self.assertEqual(1, result) 272 273 async def test_get_cancel_drop_many_pending_readers(self): 274 q = asyncio.Queue() 275 276 async with asyncio.TaskGroup() as tg: 277 reader1 = tg.create_task(q.get()) 278 reader2 = tg.create_task(q.get()) 279 reader3 = tg.create_task(q.get()) 280 281 await asyncio.sleep(0) 282 283 q.put_nowait(1) 284 q.put_nowait(2) 285 reader1.cancel() 286 287 with self.assertRaises(asyncio.CancelledError): 288 await reader1 289 290 await reader3 291 292 # It is undefined in which order concurrent readers receive results. 293 self.assertEqual({reader2.result(), reader3.result()}, {1, 2}) 294 295 async def test_put_cancel_drop(self): 296 q = asyncio.Queue(1) 297 298 q.put_nowait(1) 299 300 # putting a second item in the queue has to block (qsize=1) 301 writer = asyncio.create_task(q.put(2)) 302 await asyncio.sleep(0) 303 304 value1 = q.get_nowait() 305 self.assertEqual(value1, 1) 306 307 writer.cancel() 308 try: 309 await writer 310 except asyncio.CancelledError: 311 # try again 312 writer = asyncio.create_task(q.put(2)) 313 await writer 314 315 value2 = q.get_nowait() 316 self.assertEqual(value2, 2) 317 self.assertEqual(q.qsize(), 0) 318 319 def test_nonblocking_put_exception(self): 320 q = asyncio.Queue(maxsize=1, ) 321 q.put_nowait(1) 322 self.assertRaises(asyncio.QueueFull, q.put_nowait, 2) 323 324 async def test_float_maxsize(self): 325 q = asyncio.Queue(maxsize=1.3, ) 326 q.put_nowait(1) 327 q.put_nowait(2) 328 self.assertTrue(q.full()) 329 self.assertRaises(asyncio.QueueFull, q.put_nowait, 3) 330 331 q = asyncio.Queue(maxsize=1.3, ) 332 333 await q.put(1) 334 await q.put(2) 335 self.assertTrue(q.full()) 336 337 async def test_put_cancelled(self): 338 q = asyncio.Queue() 339 340 async def queue_put(): 341 await q.put(1) 342 return True 343 344 t = asyncio.create_task(queue_put()) 345 346 self.assertEqual(1, await q.get()) 347 self.assertTrue(t.done()) 348 self.assertTrue(t.result()) 349 350 async def test_put_cancelled_race(self): 351 q = asyncio.Queue(maxsize=1) 352 353 put_a = asyncio.create_task(q.put('a')) 354 put_b = asyncio.create_task(q.put('b')) 355 put_c = asyncio.create_task(q.put('X')) 356 357 await asyncio.sleep(0) 358 self.assertTrue(put_a.done()) 359 self.assertFalse(put_b.done()) 360 361 put_c.cancel() 362 await asyncio.sleep(0) 363 self.assertTrue(put_c.done()) 364 self.assertEqual(q.get_nowait(), 'a') 365 await asyncio.sleep(0) 366 self.assertEqual(q.get_nowait(), 'b') 367 368 await put_b 369 370 async def test_put_with_waiting_getters(self): 371 q = asyncio.Queue() 372 t = asyncio.create_task(q.get()) 373 await asyncio.sleep(0) 374 await q.put('a') 375 self.assertEqual(await t, 'a') 376 377 async def test_why_are_putters_waiting(self): 378 queue = asyncio.Queue(2) 379 380 async def putter(item): 381 await queue.put(item) 382 383 async def getter(): 384 await asyncio.sleep(0) 385 num = queue.qsize() 386 for _ in range(num): 387 queue.get_nowait() 388 389 async with asyncio.TaskGroup() as tg: 390 tg.create_task(getter()) 391 tg.create_task(putter(0)) 392 tg.create_task(putter(1)) 393 tg.create_task(putter(2)) 394 tg.create_task(putter(3)) 395 396 async def test_cancelled_puts_not_being_held_in_self_putters(self): 397 # Full queue. 398 queue = asyncio.Queue(maxsize=1) 399 queue.put_nowait(1) 400 401 # Task waiting for space to put an item in the queue. 402 put_task = asyncio.create_task(queue.put(1)) 403 await asyncio.sleep(0) 404 405 # Check that the putter is correctly removed from queue._putters when 406 # the task is canceled. 407 self.assertEqual(len(queue._putters), 1) 408 put_task.cancel() 409 with self.assertRaises(asyncio.CancelledError): 410 await put_task 411 self.assertEqual(len(queue._putters), 0) 412 413 async def test_cancelled_put_silence_value_error_exception(self): 414 # Full Queue. 415 queue = asyncio.Queue(1) 416 queue.put_nowait(1) 417 418 # Task waiting for space to put a item in the queue. 419 put_task = asyncio.create_task(queue.put(1)) 420 await asyncio.sleep(0) 421 422 # get_nowait() remove the future of put_task from queue._putters. 423 queue.get_nowait() 424 # When canceled, queue.put is going to remove its future from 425 # self._putters but it was removed previously by queue.get_nowait(). 426 put_task.cancel() 427 428 # The ValueError exception triggered by queue._putters.remove(putter) 429 # inside queue.put should be silenced. 430 # If the ValueError is silenced we should catch a CancelledError. 431 with self.assertRaises(asyncio.CancelledError): 432 await put_task 433 434 435class LifoQueueTests(unittest.IsolatedAsyncioTestCase): 436 437 async def test_order(self): 438 q = asyncio.LifoQueue() 439 for i in [1, 3, 2]: 440 await q.put(i) 441 442 items = [await q.get() for _ in range(3)] 443 self.assertEqual([2, 3, 1], items) 444 445 446class PriorityQueueTests(unittest.IsolatedAsyncioTestCase): 447 448 async def test_order(self): 449 q = asyncio.PriorityQueue() 450 for i in [1, 3, 2]: 451 await q.put(i) 452 453 items = [await q.get() for _ in range(3)] 454 self.assertEqual([1, 2, 3], items) 455 456 457class _QueueJoinTestMixin: 458 459 q_class = None 460 461 def test_task_done_underflow(self): 462 q = self.q_class() 463 self.assertRaises(ValueError, q.task_done) 464 465 async def test_task_done(self): 466 q = self.q_class() 467 for i in range(100): 468 q.put_nowait(i) 469 470 accumulator = 0 471 472 # Two workers get items from the queue and call task_done after each. 473 # Join the queue and assert all items have been processed. 474 running = True 475 476 async def worker(): 477 nonlocal accumulator 478 479 while running: 480 item = await q.get() 481 accumulator += item 482 q.task_done() 483 484 async with asyncio.TaskGroup() as tg: 485 tasks = [tg.create_task(worker()) 486 for index in range(2)] 487 488 await q.join() 489 self.assertEqual(sum(range(100)), accumulator) 490 491 # close running generators 492 running = False 493 for i in range(len(tasks)): 494 q.put_nowait(0) 495 496 async def test_join_empty_queue(self): 497 q = self.q_class() 498 499 # Test that a queue join()s successfully, and before anything else 500 # (done twice for insurance). 501 502 await q.join() 503 await q.join() 504 505 async def test_format(self): 506 q = self.q_class() 507 self.assertEqual(q._format(), 'maxsize=0') 508 509 q._unfinished_tasks = 2 510 self.assertEqual(q._format(), 'maxsize=0 tasks=2') 511 512 513class QueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCase): 514 q_class = asyncio.Queue 515 516 517class LifoQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCase): 518 q_class = asyncio.LifoQueue 519 520 521class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCase): 522 q_class = asyncio.PriorityQueue 523 524 525if __name__ == '__main__': 526 unittest.main() 527