test_sys_setprofile.py 11.1 KB
Newer Older
1
import gc
2 3 4 5
import pprint
import sys
import unittest

6
from test import support
7

8 9 10 11 12 13 14 15
class TestGetProfile(unittest.TestCase):
    def setUp(self):
        sys.setprofile(None)

    def tearDown(self):
        sys.setprofile(None)

    def test_empty(self):
16
        self.assertIsNone(sys.getprofile())
17 18 19 20 21 22

    def test_setget(self):
        def fn(*args):
            pass

        sys.setprofile(fn)
23
        self.assertIs(sys.getprofile(), fn)
24 25 26 27 28 29 30

class HookWatcher:
    def __init__(self):
        self.frames = []
        self.events = []

    def callback(self, frame, event, arg):
31 32 33 34
        if (event == "call"
            or event == "return"
            or event == "exception"):
            self.add_event(event, frame)
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50

    def add_event(self, event, frame=None):
        """Add an event to the log."""
        if frame is None:
            frame = sys._getframe(1)

        try:
            frameno = self.frames.index(frame)
        except ValueError:
            frameno = len(self.frames)
            self.frames.append(frame)

        self.events.append((frameno, event, ident(frame)))

    def get_events(self):
        """Remove calls to add_event()."""
51
        disallowed = [ident(self.add_event.__func__), ident(ident)]
52
        self.frames = None
53

54
        return [item for item in self.events if item[2] not in disallowed]
55 56


57
class ProfileSimulator(HookWatcher):
58 59
    def __init__(self, testcase):
        self.testcase = testcase
60 61 62 63
        self.stack = []
        HookWatcher.__init__(self)

    def callback(self, frame, event, arg):
64
        # Callback registered with sys.setprofile()/sys.settrace()
65 66 67 68 69 70 71 72 73 74 75
        self.dispatch[event](self, frame)

    def trace_call(self, frame):
        self.add_event('call', frame)
        self.stack.append(frame)

    def trace_return(self, frame):
        self.add_event('return', frame)
        self.stack.pop()

    def trace_exception(self, frame):
76 77
        self.testcase.fail(
            "the profiler should never receive exception events")
78

79 80 81
    def trace_pass(self, frame):
        pass

82 83 84 85
    dispatch = {
        'call': trace_call,
        'exception': trace_exception,
        'return': trace_return,
86 87 88
        'c_call': trace_pass,
        'c_return': trace_pass,
        'c_exception': trace_pass,
89
        }
90

91 92

class TestCaseBase(unittest.TestCase):
93
    def check_events(self, callable, expected):
94
        events = capture_events(callable, self.new_watcher())
95 96 97 98
        if events != expected:
            self.fail("Expected events:\n%s\nReceived events:\n%s"
                      % (pprint.pformat(expected), pprint.pformat(events)))

99 100 101 102 103

class ProfileHookTestCase(TestCaseBase):
    def new_watcher(self):
        return HookWatcher()

104 105 106 107
    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
108 109
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
110 111 112
                              ])

    def test_exception(self):
113 114 115 116
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
117
                              (1, 'return', f_ident),
118 119 120
                              ])

    def test_caught_exception(self):
121
        def f(p):
122 123
            try: 1/0
            except: pass
124
        f_ident = ident(f)
125 126
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
127 128
                              ])

129 130 131 132 133
    def test_caught_nested_exception(self):
        def f(p):
            try: 1/0
            except: pass
        f_ident = ident(f)
134
        self.check_events(f, [(1, 'call', f_ident),
135 136 137
                              (1, 'return', f_ident),
                              ])

138
    def test_nested_exception(self):
139 140 141
        def f(p):
            1/0
        f_ident = ident(f)
142
        self.check_events(f, [(1, 'call', f_ident),
143
                              # This isn't what I expected:
144
                              # (0, 'exception', protect_ident),
145
                              # I expected this again:
146
                              (1, 'return', f_ident),
147 148 149
                              ])

    def test_exception_in_except_clause(self):
150 151 152 153 154 155
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            except:
156 157
                try: f(p)
                except: pass
158 159
        f_ident = ident(f)
        g_ident = ident(g)
160
        self.check_events(g, [(1, 'call', g_ident),
161
                              (2, 'call', f_ident),
162
                              (2, 'return', f_ident),
163
                              (3, 'call', f_ident),
164
                              (3, 'return', f_ident),
165
                              (1, 'return', g_ident),
166 167
                              ])

168 169 170 171 172 173 174 175
    def test_exception_propogation(self):
        def f(p):
            1/0
        def g(p):
            try: f(p)
            finally: p.add_event("falling through")
        f_ident = ident(f)
        g_ident = ident(g)
176
        self.check_events(g, [(1, 'call', g_ident),
177
                              (2, 'call', f_ident),
178
                              (2, 'return', f_ident),
179
                              (1, 'falling through', g_ident),
180
                              (1, 'return', g_ident),
181
                              ])
182

183 184 185 186 187 188
    def test_raise_twice(self):
        def f(p):
            try: 1/0
            except: 1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
189
                              (1, 'return', f_ident),
190 191 192 193 194 195 196 197
                              ])

    def test_raise_reraise(self):
        def f(p):
            try: 1/0
            except: raise
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
198
                              (1, 'return', f_ident),
199 200 201 202 203 204 205
                              ])

    def test_raise(self):
        def f(p):
            raise Exception()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
206
                              (1, 'return', f_ident),
207 208
                              ])

209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
230 231 232 233 234
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
                              ])

    def test_generator(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more; returns end-of-iteration with
                              # actually raising an exception
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_stop_iteration(self):
        def f():
            for i in range(2):
                yield i
            raise StopIteration
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more to hit the raise:
                              (2, 'call', f_ident),
277
                              (2, 'return', f_ident),
278 279 280
                              (1, 'return', g_ident),
                              ])

281

282 283
class ProfileSimulatorTestCase(TestCaseBase):
    def new_watcher(self):
284
        return ProfileSimulator(self)
285 286 287 288 289 290 291 292 293 294 295 296 297 298

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_basic_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
299
                              (1, 'return', f_ident),
300 301
                              ])

302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
    def test_caught_exception(self):
        def f(p):
            try: 1/0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
332 333 334 335 336
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
337 338
                              ])

339

340 341 342 343
def ident(function):
    if hasattr(function, "f_code"):
        code = function.f_code
    else:
344
        code = function.__code__
345 346 347
    return code.co_firstlineno, code.co_name


348 349 350 351 352 353 354
def protect(f, p):
    try: f(p)
    except: pass

protect_ident = ident(protect)


355 356 357
def capture_events(callable, p=None):
    if p is None:
        p = HookWatcher()
358 359 360 361 362 363 364 365 366 367 368
    # Disable the garbage collector. This prevents __del__s from showing up in
    # traces.
    old_gc = gc.isenabled()
    gc.disable()
    try:
        sys.setprofile(p.callback)
        protect(callable, p)
        sys.setprofile(None)
    finally:
        if old_gc:
            gc.enable()
369
    return p.get_events()[1:-1]
370 371 372 373 374 375 376 377


def show_events(callable):
    import pprint
    pprint.pprint(capture_events(callable))


def test_main():
378
    support.run_unittest(
379
        TestGetProfile,
380 381 382
        ProfileHookTestCase,
        ProfileSimulatorTestCase
    )
383 384 385 386


if __name__ == "__main__":
    test_main()