# test_threading_local.py
import gc
import sys
import unittest
import weakref
from doctest import DocTestSuite

from test import support

# Modules under test
_thread = support.import_module('_thread')
threading = support.import_module('threading')
import _threading_local


13 14 15 16 17 18 19 20
class Weak:
    """Plain object that supports weak references.

    Instances are stored on a thread-local by `target` so the tests can
    observe, through a weakref, when they are released.
    """

def target(local, weaklist):
    """Thread body: store a fresh `Weak` on *local* and record a weakref to it.

    The object is only strongly referenced through ``local.weak``; the
    weak reference appended to *weaklist* lets the caller check later
    whether it has been released.
    """
    stored = Weak()
    local.weak = stored
    tracker = weakref.ref(stored)
    weaklist.append(tracker)

21 22

class BaseLocalTest:
    """Shared test cases for thread-local storage classes.

    This is a mixin: a concrete subclass must also derive from
    ``unittest.TestCase`` and set ``_local`` to the thread-local class
    under test.
    """

    def test_local_refs(self):
        self._local_refs(20)
        self._local_refs(50)
        self._local_refs(100)

    def _local_refs(self, n):
        """Run *n* short-lived threads that each store an object on one local.

        Every thread (see `target`) records a weak reference to the object
        it stored.  Once all threads have finished, all of those objects,
        or all but one, must have been released.
        """
        local = self._local()
        weaklist = []
        for i in range(n):
            t = threading.Thread(target=target, args=(local, weaklist))
            t.start()
            t.join()
        # Drop the reference to the last Thread object before collecting.
        del t

        gc.collect()
        self.assertEqual(len(weaklist), n)

        # XXX _threading_local keeps the local of the last stopped thread alive.
        deadlist = [weak for weak in weaklist if weak() is None]
        self.assertIn(len(deadlist), (n-1, n))

        # Assignment to the same thread local frees it sometimes (!)
        local.someothervar = None
        gc.collect()
        deadlist = [weak for weak in weaklist if weak() is None]
        self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist)))

    def test_derived(self):
        # Issue 3088: if there is a threads switch inside the __init__
        # of a threading.local derived class, the per-thread dictionary
        # is created but not correctly set on the object.
        # The first member set may be bogus.
        import time
        class Local(self._local):
            def __init__(self):
                time.sleep(0.01)
        local = Local()

        missing = object()
        seen = {}

        def f(i):
            local.x = i
            # Only record what this thread reads back.  The comparison is
            # done in the main thread: an assertion failing inside a worker
            # thread would not be reported to the test result.
            seen[i] = getattr(local, 'x', missing)

        with support.start_threads(threading.Thread(target=f, args=(i,))
                                   for i in range(10)):
            pass

        # Simply check that the variable was correctly set in every thread.
        # A thread that died before recording leaves its key absent, which
        # also fails the comparison.
        self.assertEqual(seen, {i: i for i in range(10)})

    def test_derived_cycle_dealloc(self):
        # http://bugs.python.org/issue6990
        class Local(self._local):
            pass
        fresh_locals = None
        passed = False
        e1 = threading.Event()
        e2 = threading.Event()

        def f():
            nonlocal passed
            # 1) Involve Local in a cycle
            cycle = [Local()]
            cycle.append(cycle)
            cycle[0].foo = 'bar'

            # 2) GC the cycle (triggers threadmodule.c::local_clear
            # before local_dealloc)
            del cycle
            gc.collect()
            e1.set()
            e2.wait()

            # 4) New Locals should be empty
            passed = all(not hasattr(local, 'foo') for local in fresh_locals)

        t = threading.Thread(target=f)
        t.start()
        e1.wait()

        # 3) New Locals should recycle the original's address. Creating
        # them in the thread overwrites the thread state and avoids the
        # bug
        fresh_locals = [Local() for i in range(10)]
        e2.set()
        t.join()

        self.assertTrue(passed)

    def test_arguments(self):
        # Issue 1522237
        class MyLocal(self._local):
            def __init__(self, *args, **kwargs):
                pass

        # A subclass defining __init__ may take arguments...
        MyLocal(a=1)
        MyLocal(1)
        # ...but the base class itself must reject them.
        self.assertRaises(TypeError, self._local, a=1)
        self.assertRaises(TypeError, self._local, 1)

    def _test_one_class(self, c):
        """Check that an attribute set on a *c* instance in one thread is
        not visible from another thread.

        The verdict is carried in ``self._failed``: an empty string means
        success, anything else is the failure message (which is also
        written to stderr).
        """
        self._failed = "No error message set or cleared."
        obj = c()
        e1 = threading.Event()
        e2 = threading.Event()

        def f1():
            obj.x = 'foo'
            obj.y = 'bar'
            del obj.y
            e1.set()
            # Stay alive until the main thread has finished with f2.
            e2.wait()

        def f2():
            try:
                foo = obj.x
            except AttributeError:
                # This is expected -- we haven't set obj.x in this thread yet!
                self._failed = ""  # passed
            else:
                self._failed = ('Incorrectly got value %r from class %r\n' %
                                (foo, c))
                sys.stderr.write(self._failed)

        t1 = threading.Thread(target=f1)
        t1.start()
        e1.wait()
        t2 = threading.Thread(target=f2)
        t2.start()
        t2.join()
        # The test is done; just let t1 know it can exit, and wait for it.
        e2.set()
        t1.join()

        self.assertFalse(self._failed, self._failed)

    def test_threading_local(self):
        self._test_one_class(self._local)

    def test_threading_local_subclass(self):
        class LocalSubclass(self._local):
            """To test that subclasses behave properly."""
        self._test_one_class(LocalSubclass)

    def _test_dict_attribute(self, cls):
        """Check that ``__dict__`` of a *cls* instance mirrors its attributes
        and can be neither rebound nor deleted."""
        obj = cls()
        obj.x = 5
        self.assertEqual(obj.__dict__, {'x': 5})
        with self.assertRaises(AttributeError):
            obj.__dict__ = {}
        with self.assertRaises(AttributeError):
            del obj.__dict__

    def test_dict_attribute(self):
        self._test_dict_attribute(self._local)

    def test_dict_attribute_subclass(self):
        class LocalSubclass(self._local):
            """To test that subclasses behave properly."""
        self._test_dict_attribute(LocalSubclass)

    def test_cycle_collection(self):
        class X:
            pass

        # Build a reference cycle that passes through the thread-local:
        # x -> x.local -> x.local.x -> x
        x = X()
        x.local = self._local()
        x.local.x = x
        wr = weakref.ref(x)
        del x
        gc.collect()
        self.assertIsNone(wr())
193

194 195 196 197

class ThreadLocalTest(unittest.TestCase, BaseLocalTest):
    # Runs the shared BaseLocalTest cases against _thread._local.
    _local = _thread._local

198 199
class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest):
    # Runs the shared BaseLocalTest cases against _threading_local.local.
    _local = _threading_local.local
200

201

202
def test_main():
    """Run the thread-local test cases and the ``_threading_local`` doctests.

    The doctests are added twice: once as-is, and once with
    ``_threading_local.local`` temporarily rebound to ``_thread._local``
    (restored by the suite's tearDown).
    """
    # unittest.makeSuite() is deprecated; the default loader's
    # loadTestsFromTestCase() builds the same suite.
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase

    suite = unittest.TestSuite()
    suite.addTest(DocTestSuite('_threading_local'))
    suite.addTest(load_case(ThreadLocalTest))
    suite.addTest(load_case(PyThreadingLocalTest))

    local_orig = _threading_local.local
    def setUp(test):
        _threading_local.local = _thread._local
    def tearDown(test):
        _threading_local.local = local_orig
    suite.addTest(DocTestSuite('_threading_local',
                               setUp=setUp, tearDown=tearDown)
                  )

    support.run_unittest(suite)
218 219 220

# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()