1import gc 2import pprint 3import sys 4import unittest 5from test import support 6 7 8class TestGetProfile(unittest.TestCase): 9 def setUp(self): 10 sys.setprofile(None) 11 12 def tearDown(self): 13 sys.setprofile(None) 14 15 def test_empty(self): 16 self.assertIsNone(sys.getprofile()) 17 18 def test_setget(self): 19 def fn(*args): 20 pass 21 22 sys.setprofile(fn) 23 self.assertIs(sys.getprofile(), fn) 24 25class HookWatcher: 26 def __init__(self): 27 self.frames = [] 28 self.events = [] 29 30 def callback(self, frame, event, arg): 31 if (event == "call" 32 or event == "return" 33 or event == "exception"): 34 self.add_event(event, frame) 35 36 def add_event(self, event, frame=None): 37 """Add an event to the log.""" 38 if frame is None: 39 frame = sys._getframe(1) 40 41 try: 42 frameno = self.frames.index(frame) 43 except ValueError: 44 frameno = len(self.frames) 45 self.frames.append(frame) 46 47 self.events.append((frameno, event, ident(frame))) 48 49 def get_events(self): 50 """Remove calls to add_event().""" 51 disallowed = [ident(self.add_event.__func__), ident(ident)] 52 self.frames = None 53 54 return [item for item in self.events if item[2] not in disallowed] 55 56 57class ProfileSimulator(HookWatcher): 58 def __init__(self, testcase): 59 self.testcase = testcase 60 self.stack = [] 61 HookWatcher.__init__(self) 62 63 def callback(self, frame, event, arg): 64 # Callback registered with sys.setprofile()/sys.settrace() 65 self.dispatch[event](self, frame) 66 67 def trace_call(self, frame): 68 self.add_event('call', frame) 69 self.stack.append(frame) 70 71 def trace_return(self, frame): 72 self.add_event('return', frame) 73 self.stack.pop() 74 75 def trace_exception(self, frame): 76 self.testcase.fail( 77 "the profiler should never receive exception events") 78 79 def trace_pass(self, frame): 80 pass 81 82 dispatch = { 83 'call': trace_call, 84 'exception': trace_exception, 85 'return': trace_return, 86 'c_call': trace_pass, 87 'c_return': trace_pass, 88 'c_exception': trace_pass, 89 } 90 91 92class TestCaseBase(unittest.TestCase): 93 def check_events(self, callable, expected): 94 events = capture_events(callable, self.new_watcher()) 95 if events != expected: 96 self.fail("Expected events:\n%s\nReceived events:\n%s" 97 % (pprint.pformat(expected), pprint.pformat(events))) 98 99 100class ProfileHookTestCase(TestCaseBase): 101 def new_watcher(self): 102 return HookWatcher() 103 104 def test_simple(self): 105 def f(p): 106 pass 107 f_ident = ident(f) 108 self.check_events(f, [(1, 'call', f_ident), 109 (1, 'return', f_ident), 110 ]) 111 112 def test_exception(self): 113 def f(p): 114 1/0 115 f_ident = ident(f) 116 self.check_events(f, [(1, 'call', f_ident), 117 (1, 'return', f_ident), 118 ]) 119 120 def test_caught_exception(self): 121 def f(p): 122 try: 1/0 123 except: pass 124 f_ident = ident(f) 125 self.check_events(f, [(1, 'call', f_ident), 126 (1, 'return', f_ident), 127 ]) 128 129 def test_caught_nested_exception(self): 130 def f(p): 131 try: 1/0 132 except: pass 133 f_ident = ident(f) 134 self.check_events(f, [(1, 'call', f_ident), 135 (1, 'return', f_ident), 136 ]) 137 138 def test_nested_exception(self): 139 def f(p): 140 1/0 141 f_ident = ident(f) 142 self.check_events(f, [(1, 'call', f_ident), 143 # This isn't what I expected: 144 # (0, 'exception', protect_ident), 145 # I expected this again: 146 (1, 'return', f_ident), 147 ]) 148 149 def test_exception_in_except_clause(self): 150 def f(p): 151 1/0 152 def g(p): 153 try: 154 f(p) 155 except: 156 try: f(p) 157 except: pass 158 f_ident = ident(f) 159 g_ident = ident(g) 160 self.check_events(g, [(1, 'call', g_ident), 161 (2, 'call', f_ident), 162 (2, 'return', f_ident), 163 (3, 'call', f_ident), 164 (3, 'return', f_ident), 165 (1, 'return', g_ident), 166 ]) 167 168 def test_exception_propagation(self): 169 def f(p): 170 1/0 171 def g(p): 172 try: f(p) 173 finally: p.add_event("falling through") 174 f_ident = ident(f) 175 g_ident = ident(g) 176 self.check_events(g, [(1, 'call', g_ident), 177 (2, 'call', f_ident), 178 (2, 'return', f_ident), 179 (1, 'falling through', g_ident), 180 (1, 'return', g_ident), 181 ]) 182 183 def test_raise_twice(self): 184 def f(p): 185 try: 1/0 186 except: 1/0 187 f_ident = ident(f) 188 self.check_events(f, [(1, 'call', f_ident), 189 (1, 'return', f_ident), 190 ]) 191 192 def test_raise_reraise(self): 193 def f(p): 194 try: 1/0 195 except: raise 196 f_ident = ident(f) 197 self.check_events(f, [(1, 'call', f_ident), 198 (1, 'return', f_ident), 199 ]) 200 201 def test_raise(self): 202 def f(p): 203 raise Exception() 204 f_ident = ident(f) 205 self.check_events(f, [(1, 'call', f_ident), 206 (1, 'return', f_ident), 207 ]) 208 209 def test_distant_exception(self): 210 def f(): 211 1/0 212 def g(): 213 f() 214 def h(): 215 g() 216 def i(): 217 h() 218 def j(p): 219 i() 220 f_ident = ident(f) 221 g_ident = ident(g) 222 h_ident = ident(h) 223 i_ident = ident(i) 224 j_ident = ident(j) 225 self.check_events(j, [(1, 'call', j_ident), 226 (2, 'call', i_ident), 227 (3, 'call', h_ident), 228 (4, 'call', g_ident), 229 (5, 'call', f_ident), 230 (5, 'return', f_ident), 231 (4, 'return', g_ident), 232 (3, 'return', h_ident), 233 (2, 'return', i_ident), 234 (1, 'return', j_ident), 235 ]) 236 237 def test_generator(self): 238 def f(): 239 for i in range(2): 240 yield i 241 def g(p): 242 for i in f(): 243 pass 244 f_ident = ident(f) 245 g_ident = ident(g) 246 self.check_events(g, [(1, 'call', g_ident), 247 # call the iterator twice to generate values 248 (2, 'call', f_ident), 249 (2, 'return', f_ident), 250 (2, 'call', f_ident), 251 (2, 'return', f_ident), 252 # once more; returns end-of-iteration with 253 # actually raising an exception 254 (2, 'call', f_ident), 255 (2, 'return', f_ident), 256 (1, 'return', g_ident), 257 ]) 258 259 def test_stop_iteration(self): 260 def f(): 261 for i in range(2): 262 yield i 263 def g(p): 264 for i in f(): 265 pass 266 f_ident = ident(f) 267 g_ident = ident(g) 268 self.check_events(g, [(1, 'call', g_ident), 269 # call the iterator twice to generate values 270 (2, 'call', f_ident), 271 (2, 'return', f_ident), 272 (2, 'call', f_ident), 273 (2, 'return', f_ident), 274 # once more to hit the raise: 275 (2, 'call', f_ident), 276 (2, 'return', f_ident), 277 (1, 'return', g_ident), 278 ]) 279 280 281class ProfileSimulatorTestCase(TestCaseBase): 282 def new_watcher(self): 283 return ProfileSimulator(self) 284 285 def test_simple(self): 286 def f(p): 287 pass 288 f_ident = ident(f) 289 self.check_events(f, [(1, 'call', f_ident), 290 (1, 'return', f_ident), 291 ]) 292 293 def test_basic_exception(self): 294 def f(p): 295 1/0 296 f_ident = ident(f) 297 self.check_events(f, [(1, 'call', f_ident), 298 (1, 'return', f_ident), 299 ]) 300 301 def test_caught_exception(self): 302 def f(p): 303 try: 1/0 304 except: pass 305 f_ident = ident(f) 306 self.check_events(f, [(1, 'call', f_ident), 307 (1, 'return', f_ident), 308 ]) 309 310 def test_distant_exception(self): 311 def f(): 312 1/0 313 def g(): 314 f() 315 def h(): 316 g() 317 def i(): 318 h() 319 def j(p): 320 i() 321 f_ident = ident(f) 322 g_ident = ident(g) 323 h_ident = ident(h) 324 i_ident = ident(i) 325 j_ident = ident(j) 326 self.check_events(j, [(1, 'call', j_ident), 327 (2, 'call', i_ident), 328 (3, 'call', h_ident), 329 (4, 'call', g_ident), 330 (5, 'call', f_ident), 331 (5, 'return', f_ident), 332 (4, 'return', g_ident), 333 (3, 'return', h_ident), 334 (2, 'return', i_ident), 335 (1, 'return', j_ident), 336 ]) 337 338 # bpo-34125: profiling method_descriptor with **kwargs 339 def test_unbound_method(self): 340 kwargs = {} 341 def f(p): 342 dict.get({}, 42, **kwargs) 343 f_ident = ident(f) 344 self.check_events(f, [(1, 'call', f_ident), 345 (1, 'return', f_ident)]) 346 347 # Test an invalid call (bpo-34126) 348 def test_unbound_method_no_args(self): 349 def f(p): 350 dict.get() 351 f_ident = ident(f) 352 self.check_events(f, [(1, 'call', f_ident), 353 (1, 'return', f_ident)]) 354 355 # Test an invalid call (bpo-34126) 356 def test_unbound_method_invalid_args(self): 357 def f(p): 358 dict.get(print, 42) 359 f_ident = ident(f) 360 self.check_events(f, [(1, 'call', f_ident), 361 (1, 'return', f_ident)]) 362 363 # Test an invalid call (bpo-34125) 364 def test_unbound_method_no_keyword_args(self): 365 kwargs = {} 366 def f(p): 367 dict.get(**kwargs) 368 f_ident = ident(f) 369 self.check_events(f, [(1, 'call', f_ident), 370 (1, 'return', f_ident)]) 371 372 # Test an invalid call (bpo-34125) 373 def test_unbound_method_invalid_keyword_args(self): 374 kwargs = {} 375 def f(p): 376 dict.get(print, 42, **kwargs) 377 f_ident = ident(f) 378 self.check_events(f, [(1, 'call', f_ident), 379 (1, 'return', f_ident)]) 380 381 382def ident(function): 383 if hasattr(function, "f_code"): 384 code = function.f_code 385 else: 386 code = function.__code__ 387 return code.co_firstlineno, code.co_name 388 389 390def protect(f, p): 391 try: f(p) 392 except: pass 393 394protect_ident = ident(protect) 395 396 397def capture_events(callable, p=None): 398 if p is None: 399 p = HookWatcher() 400 # Disable the garbage collector. This prevents __del__s from showing up in 401 # traces. 402 old_gc = gc.isenabled() 403 gc.disable() 404 try: 405 sys.setprofile(p.callback) 406 protect(callable, p) 407 sys.setprofile(None) 408 finally: 409 if old_gc: 410 gc.enable() 411 return p.get_events()[1:-1] 412 413 414def show_events(callable): 415 import pprint 416 pprint.pprint(capture_events(callable)) 417 418 419class TestEdgeCases(unittest.TestCase): 420 421 def setUp(self): 422 self.addCleanup(sys.setprofile, sys.getprofile()) 423 sys.setprofile(None) 424 425 def test_reentrancy(self): 426 def foo(*args): 427 ... 428 429 def bar(*args): 430 ... 431 432 class A: 433 def __call__(self, *args): 434 pass 435 436 def __del__(self): 437 sys.setprofile(bar) 438 439 sys.setprofile(A()) 440 with support.catch_unraisable_exception() as cm: 441 sys.setprofile(foo) 442 self.assertEqual(cm.unraisable.object, A.__del__) 443 self.assertIsInstance(cm.unraisable.exc_value, RuntimeError) 444 445 self.assertEqual(sys.getprofile(), foo) 446 447 448 def test_same_object(self): 449 def foo(*args): 450 ... 451 452 sys.setprofile(foo) 453 del foo 454 sys.setprofile(sys.getprofile()) 455 456 457if __name__ == "__main__": 458 unittest.main() 459