1# Some simple queue module tests, plus some failure conditions 2# to ensure the Queue locks remain stable. 3import itertools 4import random 5import threading 6import time 7import unittest 8import weakref 9from test.support import gc_collect 10from test.support import import_helper 11from test.support import threading_helper 12 13# queue module depends on threading primitives 14threading_helper.requires_working_threading(module=True) 15 16py_queue = import_helper.import_fresh_module('queue', blocked=['_queue']) 17c_queue = import_helper.import_fresh_module('queue', fresh=['_queue']) 18need_c_queue = unittest.skipUnless(c_queue, "No _queue module found") 19 20QUEUE_SIZE = 5 21 22def qfull(q): 23 return q.maxsize > 0 and q.qsize() == q.maxsize 24 25# A thread to run a function that unclogs a blocked Queue. 26class _TriggerThread(threading.Thread): 27 def __init__(self, fn, args): 28 self.fn = fn 29 self.args = args 30 self.startedEvent = threading.Event() 31 threading.Thread.__init__(self) 32 33 def run(self): 34 # The sleep isn't necessary, but is intended to give the blocking 35 # function in the main thread a chance at actually blocking before 36 # we unclog it. But if the sleep is longer than the timeout-based 37 # tests wait in their blocking functions, those tests will fail. 38 # So we give them much longer timeout values compared to the 39 # sleep here (I aimed at 10 seconds for blocking functions -- 40 # they should never actually wait that long - they should make 41 # progress as soon as we call self.fn()). 42 time.sleep(0.1) 43 self.startedEvent.set() 44 self.fn(*self.args) 45 46 47# Execute a function that blocks, and in a separate thread, a function that 48# triggers the release. Returns the result of the blocking function. Caution: 49# block_func must guarantee to block until trigger_func is called, and 50# trigger_func must guarantee to change queue state so that block_func can make 51# enough progress to return. In particular, a block_func that just raises an 52# exception regardless of whether trigger_func is called will lead to 53# timing-dependent sporadic failures, and one of those went rarely seen but 54# undiagnosed for years. Now block_func must be unexceptional. If block_func 55# is supposed to raise an exception, call do_exceptional_blocking_test() 56# instead. 57 58class BlockingTestMixin: 59 60 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): 61 thread = _TriggerThread(trigger_func, trigger_args) 62 thread.start() 63 try: 64 self.result = block_func(*block_args) 65 # If block_func returned before our thread made the call, we failed! 66 if not thread.startedEvent.is_set(): 67 self.fail("blocking function %r appeared not to block" % 68 block_func) 69 return self.result 70 finally: 71 threading_helper.join_thread(thread) # make sure the thread terminates 72 73 # Call this instead if block_func is supposed to raise an exception. 74 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func, 75 trigger_args, expected_exception_class): 76 thread = _TriggerThread(trigger_func, trigger_args) 77 thread.start() 78 try: 79 try: 80 block_func(*block_args) 81 except expected_exception_class: 82 raise 83 else: 84 self.fail("expected exception of kind %r" % 85 expected_exception_class) 86 finally: 87 threading_helper.join_thread(thread) # make sure the thread terminates 88 if not thread.startedEvent.is_set(): 89 self.fail("trigger thread ended but event never set") 90 91 92class BaseQueueTestMixin(BlockingTestMixin): 93 def setUp(self): 94 self.cum = 0 95 self.cumlock = threading.Lock() 96 97 def basic_queue_test(self, q): 98 if q.qsize(): 99 raise RuntimeError("Call this function with an empty queue") 100 self.assertTrue(q.empty()) 101 self.assertFalse(q.full()) 102 # I guess we better check things actually queue correctly a little :) 103 q.put(111) 104 q.put(333) 105 q.put(222) 106 target_order = dict(Queue = [111, 333, 222], 107 LifoQueue = [222, 333, 111], 108 PriorityQueue = [111, 222, 333]) 109 actual_order = [q.get(), q.get(), q.get()] 110 self.assertEqual(actual_order, target_order[q.__class__.__name__], 111 "Didn't seem to queue the correct data!") 112 for i in range(QUEUE_SIZE-1): 113 q.put(i) 114 self.assertTrue(q.qsize(), "Queue should not be empty") 115 self.assertTrue(not qfull(q), "Queue should not be full") 116 last = 2 * QUEUE_SIZE 117 full = 3 * 2 * QUEUE_SIZE 118 q.put(last) 119 self.assertTrue(qfull(q), "Queue should be full") 120 self.assertFalse(q.empty()) 121 self.assertTrue(q.full()) 122 try: 123 q.put(full, block=0) 124 self.fail("Didn't appear to block with a full queue") 125 except self.queue.Full: 126 pass 127 try: 128 q.put(full, timeout=0.01) 129 self.fail("Didn't appear to time-out with a full queue") 130 except self.queue.Full: 131 pass 132 # Test a blocking put 133 self.do_blocking_test(q.put, (full,), q.get, ()) 134 self.do_blocking_test(q.put, (full, True, 10), q.get, ()) 135 # Empty it 136 for i in range(QUEUE_SIZE): 137 q.get() 138 self.assertTrue(not q.qsize(), "Queue should be empty") 139 try: 140 q.get(block=0) 141 self.fail("Didn't appear to block with an empty queue") 142 except self.queue.Empty: 143 pass 144 try: 145 q.get(timeout=0.01) 146 self.fail("Didn't appear to time-out with an empty queue") 147 except self.queue.Empty: 148 pass 149 # Test a blocking get 150 self.do_blocking_test(q.get, (), q.put, ('empty',)) 151 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) 152 153 154 def worker(self, q): 155 while True: 156 x = q.get() 157 if x < 0: 158 q.task_done() 159 return 160 with self.cumlock: 161 self.cum += x 162 q.task_done() 163 164 def queue_join_test(self, q): 165 self.cum = 0 166 threads = [] 167 for i in (0,1): 168 thread = threading.Thread(target=self.worker, args=(q,)) 169 thread.start() 170 threads.append(thread) 171 for i in range(100): 172 q.put(i) 173 q.join() 174 self.assertEqual(self.cum, sum(range(100)), 175 "q.join() did not block until all tasks were done") 176 for i in (0,1): 177 q.put(-1) # instruct the threads to close 178 q.join() # verify that you can join twice 179 for thread in threads: 180 thread.join() 181 182 def test_queue_task_done(self): 183 # Test to make sure a queue task completed successfully. 184 q = self.type2test() 185 try: 186 q.task_done() 187 except ValueError: 188 pass 189 else: 190 self.fail("Did not detect task count going negative") 191 192 def test_queue_join(self): 193 # Test that a queue join()s successfully, and before anything else 194 # (done twice for insurance). 195 q = self.type2test() 196 self.queue_join_test(q) 197 self.queue_join_test(q) 198 try: 199 q.task_done() 200 except ValueError: 201 pass 202 else: 203 self.fail("Did not detect task count going negative") 204 205 def test_basic(self): 206 # Do it a couple of times on the same queue. 207 # Done twice to make sure works with same instance reused. 208 q = self.type2test(QUEUE_SIZE) 209 self.basic_queue_test(q) 210 self.basic_queue_test(q) 211 212 def test_negative_timeout_raises_exception(self): 213 q = self.type2test(QUEUE_SIZE) 214 with self.assertRaises(ValueError): 215 q.put(1, timeout=-1) 216 with self.assertRaises(ValueError): 217 q.get(1, timeout=-1) 218 219 def test_nowait(self): 220 q = self.type2test(QUEUE_SIZE) 221 for i in range(QUEUE_SIZE): 222 q.put_nowait(1) 223 with self.assertRaises(self.queue.Full): 224 q.put_nowait(1) 225 226 for i in range(QUEUE_SIZE): 227 q.get_nowait() 228 with self.assertRaises(self.queue.Empty): 229 q.get_nowait() 230 231 def test_shrinking_queue(self): 232 # issue 10110 233 q = self.type2test(3) 234 q.put(1) 235 q.put(2) 236 q.put(3) 237 with self.assertRaises(self.queue.Full): 238 q.put_nowait(4) 239 self.assertEqual(q.qsize(), 3) 240 q.maxsize = 2 # shrink the queue 241 with self.assertRaises(self.queue.Full): 242 q.put_nowait(4) 243 244class QueueTest(BaseQueueTestMixin): 245 246 def setUp(self): 247 self.type2test = self.queue.Queue 248 super().setUp() 249 250class PyQueueTest(QueueTest, unittest.TestCase): 251 queue = py_queue 252 253 254@need_c_queue 255class CQueueTest(QueueTest, unittest.TestCase): 256 queue = c_queue 257 258 259class LifoQueueTest(BaseQueueTestMixin): 260 261 def setUp(self): 262 self.type2test = self.queue.LifoQueue 263 super().setUp() 264 265 266class PyLifoQueueTest(LifoQueueTest, unittest.TestCase): 267 queue = py_queue 268 269 270@need_c_queue 271class CLifoQueueTest(LifoQueueTest, unittest.TestCase): 272 queue = c_queue 273 274 275class PriorityQueueTest(BaseQueueTestMixin): 276 277 def setUp(self): 278 self.type2test = self.queue.PriorityQueue 279 super().setUp() 280 281 282class PyPriorityQueueTest(PriorityQueueTest, unittest.TestCase): 283 queue = py_queue 284 285 286@need_c_queue 287class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase): 288 queue = c_queue 289 290 291# A Queue subclass that can provoke failure at a moment's notice :) 292class FailingQueueException(Exception): pass 293 294 295class FailingQueueTest(BlockingTestMixin): 296 297 def setUp(self): 298 299 Queue = self.queue.Queue 300 301 class FailingQueue(Queue): 302 def __init__(self, *args): 303 self.fail_next_put = False 304 self.fail_next_get = False 305 Queue.__init__(self, *args) 306 def _put(self, item): 307 if self.fail_next_put: 308 self.fail_next_put = False 309 raise FailingQueueException("You Lose") 310 return Queue._put(self, item) 311 def _get(self): 312 if self.fail_next_get: 313 self.fail_next_get = False 314 raise FailingQueueException("You Lose") 315 return Queue._get(self) 316 317 self.FailingQueue = FailingQueue 318 319 super().setUp() 320 321 def failing_queue_test(self, q): 322 if q.qsize(): 323 raise RuntimeError("Call this function with an empty queue") 324 for i in range(QUEUE_SIZE-1): 325 q.put(i) 326 # Test a failing non-blocking put. 327 q.fail_next_put = True 328 try: 329 q.put("oops", block=0) 330 self.fail("The queue didn't fail when it should have") 331 except FailingQueueException: 332 pass 333 q.fail_next_put = True 334 try: 335 q.put("oops", timeout=0.1) 336 self.fail("The queue didn't fail when it should have") 337 except FailingQueueException: 338 pass 339 q.put("last") 340 self.assertTrue(qfull(q), "Queue should be full") 341 # Test a failing blocking put 342 q.fail_next_put = True 343 try: 344 self.do_blocking_test(q.put, ("full",), q.get, ()) 345 self.fail("The queue didn't fail when it should have") 346 except FailingQueueException: 347 pass 348 # Check the Queue isn't damaged. 349 # put failed, but get succeeded - re-add 350 q.put("last") 351 # Test a failing timeout put 352 q.fail_next_put = True 353 try: 354 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (), 355 FailingQueueException) 356 self.fail("The queue didn't fail when it should have") 357 except FailingQueueException: 358 pass 359 # Check the Queue isn't damaged. 360 # put failed, but get succeeded - re-add 361 q.put("last") 362 self.assertTrue(qfull(q), "Queue should be full") 363 q.get() 364 self.assertTrue(not qfull(q), "Queue should not be full") 365 q.put("last") 366 self.assertTrue(qfull(q), "Queue should be full") 367 # Test a blocking put 368 self.do_blocking_test(q.put, ("full",), q.get, ()) 369 # Empty it 370 for i in range(QUEUE_SIZE): 371 q.get() 372 self.assertTrue(not q.qsize(), "Queue should be empty") 373 q.put("first") 374 q.fail_next_get = True 375 try: 376 q.get() 377 self.fail("The queue didn't fail when it should have") 378 except FailingQueueException: 379 pass 380 self.assertTrue(q.qsize(), "Queue should not be empty") 381 q.fail_next_get = True 382 try: 383 q.get(timeout=0.1) 384 self.fail("The queue didn't fail when it should have") 385 except FailingQueueException: 386 pass 387 self.assertTrue(q.qsize(), "Queue should not be empty") 388 q.get() 389 self.assertTrue(not q.qsize(), "Queue should be empty") 390 q.fail_next_get = True 391 try: 392 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',), 393 FailingQueueException) 394 self.fail("The queue didn't fail when it should have") 395 except FailingQueueException: 396 pass 397 # put succeeded, but get failed. 398 self.assertTrue(q.qsize(), "Queue should not be empty") 399 q.get() 400 self.assertTrue(not q.qsize(), "Queue should be empty") 401 402 def test_failing_queue(self): 403 404 # Test to make sure a queue is functioning correctly. 405 # Done twice to the same instance. 406 q = self.FailingQueue(QUEUE_SIZE) 407 self.failing_queue_test(q) 408 self.failing_queue_test(q) 409 410 411 412class PyFailingQueueTest(FailingQueueTest, unittest.TestCase): 413 queue = py_queue 414 415 416@need_c_queue 417class CFailingQueueTest(FailingQueueTest, unittest.TestCase): 418 queue = c_queue 419 420 421class BaseSimpleQueueTest: 422 423 def setUp(self): 424 self.q = self.type2test() 425 426 def feed(self, q, seq, rnd, sentinel): 427 while True: 428 try: 429 val = seq.pop() 430 except IndexError: 431 q.put(sentinel) 432 return 433 q.put(val) 434 if rnd.random() > 0.5: 435 time.sleep(rnd.random() * 1e-3) 436 437 def consume(self, q, results, sentinel): 438 while True: 439 val = q.get() 440 if val == sentinel: 441 return 442 results.append(val) 443 444 def consume_nonblock(self, q, results, sentinel): 445 while True: 446 while True: 447 try: 448 val = q.get(block=False) 449 except self.queue.Empty: 450 time.sleep(1e-5) 451 else: 452 break 453 if val == sentinel: 454 return 455 results.append(val) 456 457 def consume_timeout(self, q, results, sentinel): 458 while True: 459 while True: 460 try: 461 val = q.get(timeout=1e-5) 462 except self.queue.Empty: 463 pass 464 else: 465 break 466 if val == sentinel: 467 return 468 results.append(val) 469 470 def run_threads(self, n_threads, q, inputs, feed_func, consume_func): 471 results = [] 472 sentinel = None 473 seq = inputs.copy() 474 seq.reverse() 475 rnd = random.Random(42) 476 477 exceptions = [] 478 def log_exceptions(f): 479 def wrapper(*args, **kwargs): 480 try: 481 f(*args, **kwargs) 482 except BaseException as e: 483 exceptions.append(e) 484 return wrapper 485 486 feeders = [threading.Thread(target=log_exceptions(feed_func), 487 args=(q, seq, rnd, sentinel)) 488 for i in range(n_threads)] 489 consumers = [threading.Thread(target=log_exceptions(consume_func), 490 args=(q, results, sentinel)) 491 for i in range(n_threads)] 492 493 with threading_helper.start_threads(feeders + consumers): 494 pass 495 496 self.assertFalse(exceptions) 497 self.assertTrue(q.empty()) 498 self.assertEqual(q.qsize(), 0) 499 500 return results 501 502 def test_basic(self): 503 # Basic tests for get(), put() etc. 504 q = self.q 505 self.assertTrue(q.empty()) 506 self.assertEqual(q.qsize(), 0) 507 q.put(1) 508 self.assertFalse(q.empty()) 509 self.assertEqual(q.qsize(), 1) 510 q.put(2) 511 q.put_nowait(3) 512 q.put(4) 513 self.assertFalse(q.empty()) 514 self.assertEqual(q.qsize(), 4) 515 516 self.assertEqual(q.get(), 1) 517 self.assertEqual(q.qsize(), 3) 518 519 self.assertEqual(q.get_nowait(), 2) 520 self.assertEqual(q.qsize(), 2) 521 522 self.assertEqual(q.get(block=False), 3) 523 self.assertFalse(q.empty()) 524 self.assertEqual(q.qsize(), 1) 525 526 self.assertEqual(q.get(timeout=0.1), 4) 527 self.assertTrue(q.empty()) 528 self.assertEqual(q.qsize(), 0) 529 530 with self.assertRaises(self.queue.Empty): 531 q.get(block=False) 532 with self.assertRaises(self.queue.Empty): 533 q.get(timeout=1e-3) 534 with self.assertRaises(self.queue.Empty): 535 q.get_nowait() 536 self.assertTrue(q.empty()) 537 self.assertEqual(q.qsize(), 0) 538 539 def test_negative_timeout_raises_exception(self): 540 q = self.q 541 q.put(1) 542 with self.assertRaises(ValueError): 543 q.get(timeout=-1) 544 545 def test_order(self): 546 # Test a pair of concurrent put() and get() 547 q = self.q 548 inputs = list(range(100)) 549 results = self.run_threads(1, q, inputs, self.feed, self.consume) 550 551 # One producer, one consumer => results appended in well-defined order 552 self.assertEqual(results, inputs) 553 554 def test_many_threads(self): 555 # Test multiple concurrent put() and get() 556 N = 50 557 q = self.q 558 inputs = list(range(10000)) 559 results = self.run_threads(N, q, inputs, self.feed, self.consume) 560 561 # Multiple consumers without synchronization append the 562 # results in random order 563 self.assertEqual(sorted(results), inputs) 564 565 def test_many_threads_nonblock(self): 566 # Test multiple concurrent put() and get(block=False) 567 N = 50 568 q = self.q 569 inputs = list(range(10000)) 570 results = self.run_threads(N, q, inputs, 571 self.feed, self.consume_nonblock) 572 573 self.assertEqual(sorted(results), inputs) 574 575 def test_many_threads_timeout(self): 576 # Test multiple concurrent put() and get(timeout=...) 577 N = 50 578 q = self.q 579 inputs = list(range(1000)) 580 results = self.run_threads(N, q, inputs, 581 self.feed, self.consume_timeout) 582 583 self.assertEqual(sorted(results), inputs) 584 585 def test_references(self): 586 # The queue should lose references to each item as soon as 587 # it leaves the queue. 588 class C: 589 pass 590 591 N = 20 592 q = self.q 593 for i in range(N): 594 q.put(C()) 595 for i in range(N): 596 wr = weakref.ref(q.get()) 597 gc_collect() # For PyPy or other GCs. 598 self.assertIsNone(wr()) 599 600 601class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase): 602 603 queue = py_queue 604 def setUp(self): 605 self.type2test = self.queue._PySimpleQueue 606 super().setUp() 607 608 609@need_c_queue 610class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase): 611 612 queue = c_queue 613 614 def setUp(self): 615 self.type2test = self.queue.SimpleQueue 616 super().setUp() 617 618 def test_is_default(self): 619 self.assertIs(self.type2test, self.queue.SimpleQueue) 620 self.assertIs(self.type2test, self.queue.SimpleQueue) 621 622 def test_reentrancy(self): 623 # bpo-14976: put() may be called reentrantly in an asynchronous 624 # callback. 625 q = self.q 626 gen = itertools.count() 627 N = 10000 628 results = [] 629 630 # This test exploits the fact that __del__ in a reference cycle 631 # can be called any time the GC may run. 632 633 class Circular(object): 634 def __init__(self): 635 self.circular = self 636 637 def __del__(self): 638 q.put(next(gen)) 639 640 while True: 641 o = Circular() 642 q.put(next(gen)) 643 del o 644 results.append(q.get()) 645 if results[-1] >= N: 646 break 647 648 self.assertEqual(results, list(range(N + 1))) 649 650 651if __name__ == "__main__": 652 unittest.main() 653