# test_sys_setprofile.py -- tests for the sys.setprofile() / sys.getprofile() hooks.
import gc
2 3 4 5 6
import pprint
import sys
import unittest


7 8 9 10 11 12 13 14
class TestGetProfile(unittest.TestCase):
    """Check that sys.getprofile() reflects what sys.setprofile() installed."""

    def setUp(self):
        # Start each test with no profile function installed.
        sys.setprofile(None)

    def tearDown(self):
        # Make sure no profile function outlives the test.
        sys.setprofile(None)

    def test_empty(self):
        # With nothing installed, getprofile() reports None.
        self.assertIsNone(sys.getprofile())

    def test_setget(self):
        # getprofile() must hand back the very object given to setprofile().
        def fn(*args):
            pass

        sys.setprofile(fn)
        self.assertIs(sys.getprofile(), fn)
23 24 25 26 27 28 29

class HookWatcher:
    """Profile hook that logs call/return/exception events.

    Each log entry is a tuple ``(frame number, event, ident(frame))``, where
    frames are numbered in the order they are first seen.
    """

    def __init__(self):
        self.frames = []  # frames seen so far; list index == frame number
        self.events = []  # logged (frameno, event, ident) tuples

    def callback(self, frame, event, arg):
        """Profile function: log the three Python-level events, ignore the rest."""
        if event in ("call", "return", "exception"):
            self.add_event(event, frame)

    def add_event(self, event, frame=None):
        """Add an event to the log.

        When *frame* is omitted, the caller's frame is used.
        """
        if frame is None:
            frame = sys._getframe(1)

        try:
            frameno = self.frames.index(frame)
        except ValueError:
            # First time this frame is seen: give it the next number.
            frameno = len(self.frames)
            self.frames.append(frame)

        self.events.append((frameno, event, ident(frame)))

    def get_events(self):
        """Return the logged events, minus those from add_event() and ident().

        As a side effect, the stored frame list is replaced by None.
        """
        disallowed = {ident(self.add_event.__func__), ident(ident)}
        self.frames = None

        return [item for item in self.events if item[2] not in disallowed]
54 55


56
class ProfileSimulator(HookWatcher):
    """HookWatcher that dispatches on the event name and tracks a frame stack.

    Receiving an 'exception' event fails the owning test case.
    """

    def __init__(self, testcase):
        self.testcase = testcase  # test case used to report failures
        self.stack = []           # frames entered via 'call' and not yet returned
        HookWatcher.__init__(self)

    def callback(self, frame, event, arg):
        # Callback registered with sys.setprofile()/sys.settrace()
        self.dispatch[event](self, frame)

    def trace_call(self, frame):
        self.add_event('call', frame)
        self.stack.append(frame)

    def trace_return(self, frame):
        self.add_event('return', frame)
        self.stack.pop()

    def trace_exception(self, frame):
        self.testcase.fail(
            "the profiler should never receive exception events")

    def trace_pass(self, frame):
        pass

    # Maps event name -> plain function; callback() passes self explicitly.
    dispatch = {
        'call': trace_call,
        'exception': trace_exception,
        'return': trace_return,
        'c_call': trace_pass,
        'c_return': trace_pass,
        'c_exception': trace_pass,
        }
89

90 91

class TestCaseBase(unittest.TestCase):
    """Shared helper for the profile-event test cases.

    check_events() calls self.new_watcher(), which this class does not define.
    """

    def check_events(self, callable, expected):
        """Profile *callable* and fail unless the captured events equal *expected*."""
        events = capture_events(callable, self.new_watcher())
        if events != expected:
            self.fail("Expected events:\n%s\nReceived events:\n%s"
                      % (pprint.pformat(expected), pprint.pformat(events)))

98 99 100 101 102

class ProfileHookTestCase(TestCaseBase):
    """Event sequences seen by a plain HookWatcher profile function.

    Every expectation is a list of (frame number, event, ident) tuples.
    """

    def new_watcher(self):
        return HookWatcher()

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_nested_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_nested_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              # This isn't what I expected:
                              # (0, 'exception', protect_ident),
                              # I expected this again:
                              (1, 'return', f_ident),
                              ])

    def test_exception_in_except_clause(self):
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            except:
                try:
                    f(p)
                except:
                    pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (3, 'call', f_ident),
                              (3, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_exception_propagation(self):
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            finally:
                p.add_event("falling through")
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'falling through', g_ident),
                              (1, 'return', g_ident),
                              ])

    def test_raise_twice(self):
        def f(p):
            try:
                1/0
            except:
                1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_raise_reraise(self):
        def f(p):
            try:
                1/0
            except:
                raise
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_raise(self):
        def f(p):
            raise Exception()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    def test_generator(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more; returns end-of-iteration without
                              # actually raising an exception
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_stop_iteration(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more; the generator is exhausted and
                              # only a call/return pair is expected
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

279

280 281
class ProfileSimulatorTestCase(TestCaseBase):
    """Event sequences seen by a ProfileSimulator profile function.

    Every expectation is a list of (frame number, event, ident) tuples.
    """

    def new_watcher(self):
        return ProfileSimulator(self)

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_basic_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    # Test an invalid call (bpo-34126)
    def test_unbound_method_no_args(self):
        def f(p):
            dict.get()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34126)
    def test_unbound_method_invalid_args(self):
        def f(p):
            dict.get(print, 42)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34125)
    # Named *_keyword_args so it does not replace the positional-call test
    # above; two methods with the same name means only the last one runs.
    def test_unbound_method_no_keyword_args(self):
        kwargs = {}
        def f(p):
            dict.get(**kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34125)
    # Named *_keyword_args for the same reason as the test above.
    def test_unbound_method_invalid_keyword_args(self):
        kwargs = {}
        def f(p):
            dict.get(print, 42, **kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

371

372 373 374 375
def ident(function):
    """Return ``(co_firstlineno, co_name)`` for a frame or a function.

    Objects with an ``f_code`` attribute (frames) use that code object;
    anything else must provide ``__code__``.
    """
    if hasattr(function, "f_code"):
        code = function.f_code
    else:
        code = function.__code__
    return code.co_firstlineno, code.co_name


380 381 382 383 384 385 386
def protect(f, p):
    """Call ``f(p)`` and discard whatever it raises."""
    try:
        f(p)
    except:
        # Deliberately bare: every exception from f is swallowed.
        pass

# (first line number, name) identity of protect(), as computed by ident().
protect_ident = ident(protect)


387 388 389
def capture_events(callable, p=None):
    """Run ``callable(p)`` under the profile hook ``p.callback`` and return the log.

    *p* defaults to a fresh HookWatcher. The call is wrapped in protect(), so
    exceptions raised by *callable* are swallowed. The first and last logged
    events are stripped from the result.
    """
    if p is None:
        p = HookWatcher()
    # Disable the garbage collector. This prevents __del__s from showing up in
    # traces.
    old_gc = gc.isenabled()
    gc.disable()
    try:
        sys.setprofile(p.callback)
        try:
            protect(callable, p)
        finally:
            # Always uninstall the hook, even if the call above raises, so it
            # cannot leak into whatever runs next.
            sys.setprofile(None)
    finally:
        if old_gc:
            gc.enable()
    return p.get_events()[1:-1]
402 403 404 405 406 407 408 409


def show_events(callable):
    """Pretty-print the events captured while profiling *callable*."""
    import pprint
    captured = capture_events(callable)
    pprint.pprint(captured)


# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()