test_capi.py 18.7 KB
Newer Older
1
# Run the _testcapi module tests (tests for the Python/C API):  by defn,
2
# these are all functions _testcapi exports whose name begins with 'test_'.
3

4
import os
5
import pickle
6 7
import random
import subprocess
8
import sys
9
import time
10
import unittest
11
from test import support
12
from test.support import MISSING_C_DOCSTRINGS
13 14 15 16
try:
    import _posixsubprocess
except ImportError:
    _posixsubprocess = None
17 18 19 20
try:
    import threading
except ImportError:
    threading = None
21 22
# Skip this test if the _testcapi module isn't available.
_testcapi = support.import_module('_testcapi')
23

24

25 26 27 28 29 30 31 32 33 34 35 36 37
def testfunction(self):
    """some doc"""
    return self

class InstanceMethod:
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)

class CAPITest(unittest.TestCase):

    def test_instancemethod(self):
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
38
        self.assertTrue(inst.testfunction() is inst)
39 40 41 42 43 44 45
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)

        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")

46
    @unittest.skipUnless(threading, 'Threading required for this test.')
47
    def test_no_FatalError_infinite_loop(self):
48
        with support.SuppressCrashReport():
49
            p = subprocess.Popen([sys.executable, "-c",
Ezio Melotti's avatar
Ezio Melotti committed
50 51 52 53
                                  'import _testcapi;'
                                  '_testcapi.crash_no_current_thread()'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
54 55 56
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
57
        self.assertTrue(err.rstrip().startswith(
58
                         b'Fatal Python error:'
59
                         b' PyThreadState_Get: no current thread'))
60

61 62
    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
63

64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
    def test_exc_info(self):
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            self.assertTrue(False)

87 88 89 90 91 92 93 94
    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
95 96 97 98 99 100 101 102
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
103

104 105 106 107 108 109 110 111 112
    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
113

114 115
    @unittest.skipIf(MISSING_C_DOCSTRINGS,
                     "Signature information for builtins requires docstrings")
116 117 118 119 120 121 122 123 124 125 126 127 128
    def test_docstring_signature_parsing(self):

        self.assertEqual(_testcapi.no_docstring.__doc__, None)
        self.assertEqual(_testcapi.no_docstring.__text_signature__, None)

        self.assertEqual(_testcapi.docstring_empty.__doc__, "")
        self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)

        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
            "This docstring has no signature.")
        self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None)

        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
129
            "docstring_with_invalid_signature($module, /, boo)\n"
130 131 132 133 134
            "\n"
            "This docstring has an invalid signature."
            )
        self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None)

135 136 137 138 139 140 141 142 143
        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
            "docstring_with_invalid_signature2($module, /, boo)\n"
            "\n"
            "--\n"
            "\n"
            "This docstring also has an invalid signature."
            )
        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None)

144 145
        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
            "This docstring has a valid signature.")
146
        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")
147 148

        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
149
            "\nThis docstring has a valid signature and some extra newlines.")
150
        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
151
            "($module, /, parameter)")
152

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
    def test_c_type_with_matrix_multiplication(self):
        M = _testcapi.matmulType
        m1 = M()
        m2 = M()
        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
        self.assertEqual(42 @ m1, ("matmul", 42, m1))
        o = m1
        o @= m2
        self.assertEqual(o, ("imatmul", m1, m2))
        o = m1
        o @= 42
        self.assertEqual(o, ("imatmul", m1, 42))
        o = 42
        o @= m1
        self.assertEqual(o, ("matmul", 42, m1))

170

171
@unittest.skipUnless(threading, 'Threading required for this test.')
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
class TestPendingCalls(unittest.TestCase):

    def pendingcalls_submit(self, l, n):
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while True:
                if _testcapi._pending_threadfunc(callback):
                    break;

189
    def pendingcalls_wait(self, l, n, context = None):
190 191 192 193 194 195
        #now, stick around until l[0] has grown to 10
        count = 0;
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
196
            if False and support.verbose:
197 198 199
                print("(%i)"%(len(l),),)
            for i in range(1000):
                a = i*i
200 201
            if context and not context.event.is_set():
                continue
202
            count += 1
203
            self.assertTrue(count < 10000,
204
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
205
        if False and support.verbose:
206 207 208 209 210
            print("(%i)"%(len(l),))

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
211
        n = 32 #total callbacks
212
        threads = []
213 214 215 216 217 218 219 220 221 222 223
        class foo(object):pass
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        for i in range(context.nThreads):
            t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
224 225 226
            t.start()
            threads.append(t)

227
        self.pendingcalls_wait(context.l, n, context)
228 229 230 231

        for t in threads:
            t.join()

232 233 234 235 236 237 238 239 240 241 242 243
    def pendingcalls_thread(self, context):
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
                if False and support.verbose:
                    print("finished threads: ", nFinished)
            if nFinished == context.nThreads:
                context.event.set()

244
    def test_pendingcalls_non_threaded(self):
245
        #again, just using the main thread, likely they will all be dispatched at
246 247 248 249 250 251 252 253
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)

254 255 256

class SubinterpreterTest(unittest.TestCase):

257 258 259 260 261 262 263 264 265 266
    def test_subinterps(self):
        import builtins
        r, w = os.pipe()
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
267
            ret = support.run_in_subinterp(code)
268 269 270 271
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))

272

273 274 275 276
# Bug #6012
class Test6012(unittest.TestCase):
    def test(self):
        self.assertEqual(_testcapi.argparsing("Hello", "World"), 1)
277

278

279 280
class EmbeddingTests(unittest.TestCase):
    def setUp(self):
281
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
282 283 284 285 286 287
        exename = "_testembed"
        if sys.platform.startswith("win"):
            ext = ("_d" if "_d" in sys.executable else "") + ".exe"
            exename += ext
            exepath = os.path.dirname(sys.executable)
        else:
288
            exepath = os.path.join(basepath, "Programs")
289
        self.test_exe = exe = os.path.join(exepath, exename)
290 291
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
292 293 294
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
295
        self.oldcwd = os.getcwd()
296
        os.chdir(basepath)
297 298 299 300 301 302 303 304 305 306

    def tearDown(self):
        os.chdir(self.oldcwd)

    def run_embedded_interpreter(self, *args):
        """Runs a test in the embedded interpreter"""
        cmd = [self.test_exe]
        cmd.extend(args)
        p = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
307 308
                             stderr=subprocess.PIPE,
                             universal_newlines=True)
309 310 311 312
        (out, err) = p.communicate()
        self.assertEqual(p.returncode, 0,
                         "bad returncode %d, stderr is %r" %
                         (p.returncode, err))
313
        return out, err
314 315 316 317 318 319 320 321 322

    def test_subinterps(self):
        # This is just a "don't crash" test
        out, err = self.run_embedded_interpreter()
        if support.verbose:
            print()
            print(out)
            print(err)

323 324 325 326 327 328 329 330 331 332
    @staticmethod
    def _get_default_pipe_encoding():
        rp, wp = os.pipe()
        try:
            with os.fdopen(wp, 'w') as w:
                default_pipe_encoding = w.encoding
        finally:
            os.close(rp)
        return default_pipe_encoding

333 334 335 336 337 338 339
    def test_forced_io_encoding(self):
        # Checks forced configuration of embedded interpreter IO streams
        out, err = self.run_embedded_interpreter("forced_io_encoding")
        if support.verbose:
            print()
            print(out)
            print(err)
340
        expected_errors = sys.__stdout__.errors
341 342
        expected_stdin_encoding = sys.__stdin__.encoding
        expected_pipe_encoding = self._get_default_pipe_encoding()
343
        expected_output = '\n'.join([
344 345 346
        "--- Use defaults ---",
        "Expected encoding: default",
        "Expected errors: default",
347 348 349
        "stdin: {in_encoding}:{errors}",
        "stdout: {out_encoding}:{errors}",
        "stderr: {out_encoding}:backslashreplace",
350 351
        "--- Set errors only ---",
        "Expected encoding: default",
352 353 354 355
        "Expected errors: ignore",
        "stdin: {in_encoding}:ignore",
        "stdout: {out_encoding}:ignore",
        "stderr: {out_encoding}:backslashreplace",
356 357 358
        "--- Set encoding only ---",
        "Expected encoding: latin-1",
        "Expected errors: default",
359 360
        "stdin: latin-1:{errors}",
        "stdout: latin-1:{errors}",
361 362 363
        "stderr: latin-1:backslashreplace",
        "--- Set encoding and errors ---",
        "Expected encoding: latin-1",
364 365 366 367 368 369 370 371
        "Expected errors: replace",
        "stdin: latin-1:replace",
        "stdout: latin-1:replace",
        "stderr: latin-1:backslashreplace"])
        expected_output = expected_output.format(
                                in_encoding=expected_stdin_encoding,
                                out_encoding=expected_pipe_encoding,
                                errors=expected_errors)
372
        # This is useful if we ever trip over odd platform behaviour
373
        self.maxDiff = None
374
        self.assertEqual(out.strip(), expected_output)
375

376 377 378 379 380 381 382 383
class SkipitemTest(unittest.TestCase):

    def test_skipitem(self):
        """
        If this test failed, you probably added a new "format unit"
        in Python/getargs.c, but neglected to update our poor friend
        skipitem() in the same file.  (If so, shame on you!)

384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
        With a few exceptions**, this function brute-force tests all
        printable ASCII*** characters (32 to 126 inclusive) as format units,
        checking to see that PyArg_ParseTupleAndKeywords() return consistent
        errors both when the unit is attempted to be used and when it is
        skipped.  If the format unit doesn't exist, we'll get one of two
        specific error messages (one for used, one for skipped); if it does
        exist we *won't* get that error--we'll get either no error or some
        other error.  If we get the specific "does not exist" error for one
        test and not for the other, there's a mismatch, and the test fails.

           ** Some format units have special funny semantics and it would
              be difficult to accomodate them here.  Since these are all
              well-established and properly skipped in skipitem() we can
              get away with not testing them--this test is really intended
              to catch *new* format units.

          *** Python C source files must be ASCII.  Therefore it's impossible
              to have non-ASCII format units.

403 404 405 406 407 408
        """
        empty_tuple = ()
        tuple_1 = (0,)
        dict_b = {'b':1}
        keywords = ["a", "b"]

409
        for i in range(32, 127):
410 411 412 413 414
            c = chr(i)

            # skip parentheses, the error reporting is inconsistent about them
            # skip 'e', it's always a two-character code
            # skip '|' and '$', they don't represent arguments anyway
415
            if c in '()e|$':
416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
                continue

            # test the format unit when not skipped
            format = c + "i"
            try:
                # (note: the format string must be bytes!)
                _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
                    format.encode("ascii"), keywords)
                when_not_skipped = False
            except TypeError as e:
                s = "argument 1 must be impossible<bad format char>, not int"
                when_not_skipped = (str(e) == s)
            except RuntimeError as e:
                when_not_skipped = False

            # test the format unit when skipped
            optional_format = "|" + format
            try:
                _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
                    optional_format.encode("ascii"), keywords)
                when_skipped = False
            except RuntimeError as e:
                s = "impossible<bad format char>: '{}'".format(format)
                when_skipped = (str(e) == s)

            message = ("test_skipitem_parity: "
                "detected mismatch between convertsimple and skipitem "
                "for format unit '{}' ({}), not skipped {}, skipped {}".format(
                    c, i, when_skipped, when_not_skipped))
            self.assertIs(when_skipped, when_not_skipped, message)
446

447 448 449 450 451 452 453 454 455 456 457
    def test_parse_tuple_and_keywords(self):
        # parse_tuple_and_keywords error handling tests
        self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords,
                          (), {}, 42, [])
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [''] * 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [42])

458 459 460 461 462 463 464 465 466 467
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestThreadState(unittest.TestCase):

    @support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            idents = []

            def callback():
Ezio Melotti's avatar
Ezio Melotti committed
468
                idents.append(threading.get_ident())
469 470 471 472 473

            _testcapi._test_thread_state(callback)
            a = b = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
Ezio Melotti's avatar
Ezio Melotti committed
474
            self.assertEqual(idents.count(threading.get_ident()), 3,
475 476 477 478 479 480 481
                             "Couldn't find main thread correctly in the list")

        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()

482 483 484 485
class Test_testcapi(unittest.TestCase):
    def test__testcapi(self):
        for name in dir(_testcapi):
            if name.startswith('test_'):
486 487 488
                with self.subTest("internal", name=name):
                    test = getattr(_testcapi, name)
                    test()
489 490

if __name__ == "__main__":
491
    unittest.main()