Kaydet (Commit) fc7b0ec5 authored tarafından Richard Oudkerk's avatar Richard Oudkerk

Use setUpClass() and tearDownClass() in test_multiprocessing.

Each manager test class now uses a separate manager.  Also, process
pools are no longer created before starting any tests.

Note that warnings are written if the manager for a test case still
has live objects when it is shutdown.  This is true for a few test cases
which fail to wait for all child processes to end.
üst 5046e974
...@@ -19,6 +19,7 @@ import socket ...@@ -19,6 +19,7 @@ import socket
import random import random
import logging import logging
import struct import struct
import operator
import test.support import test.support
import test.script_helper import test.script_helper
...@@ -1617,6 +1618,18 @@ def mul(x, y): ...@@ -1617,6 +1618,18 @@ def mul(x, y):
class _TestPool(BaseTestCase): class _TestPool(BaseTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.pool = cls.Pool(4)
@classmethod
def tearDownClass(cls):
cls.pool.terminate()
cls.pool.join()
cls.pool = None
super().tearDownClass()
def test_apply(self): def test_apply(self):
papply = self.pool.apply papply = self.pool.apply
self.assertEqual(papply(sqr, (5,)), sqr(5)) self.assertEqual(papply(sqr, (5,)), sqr(5))
...@@ -1691,15 +1704,6 @@ class _TestPool(BaseTestCase): ...@@ -1691,15 +1704,6 @@ class _TestPool(BaseTestCase):
p.join() p.join()
def test_terminate(self): def test_terminate(self):
if self.TYPE == 'manager':
# On Unix a forked process increfs each shared object to
# which its parent process held a reference. If the
# forked process gets terminated then there is likely to
# be a reference leak. So to prevent
# _TestZZZNumberOfObjects from failing we skip this test
# when using a manager.
return
result = self.pool.map_async( result = self.pool.map_async(
time.sleep, [0.1 for i in range(10000)], chunksize=1 time.sleep, [0.1 for i in range(10000)], chunksize=1
) )
...@@ -1821,35 +1825,6 @@ class _TestPoolWorkerLifetime(BaseTestCase): ...@@ -1821,35 +1825,6 @@ class _TestPoolWorkerLifetime(BaseTestCase):
for (j, res) in enumerate(results): for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j)) self.assertEqual(res.get(), sqr(j))
#
# Test that manager has expected number of shared objects left
#
class _TestZZZNumberOfObjects(BaseTestCase):
# Because test cases are sorted alphabetically, this one will get
# run after all the other tests for the manager. It tests that
# there have been no "reference leaks" for the manager's shared
# objects. Note the comment in _TestPool.test_terminate().
# If some other test using ManagerMixin.manager fails, then the
# raised exception may keep alive a frame which holds a reference
# to a managed object. This will cause test_number_of_objects to
# also fail.
ALLOWED_TYPES = ('manager',)
def test_number_of_objects(self):
EXPECTED_NUMBER = 1 # the pool object is still alive
multiprocessing.active_children() # discard dead process objs
gc.collect() # do garbage collection
refs = self.manager._number_of_objects()
debug_info = self.manager._debug_info()
if refs != EXPECTED_NUMBER:
print(self.manager._debug_info())
print(debug_info)
self.assertEqual(refs, EXPECTED_NUMBER)
# #
# Test of creating a customized manager class # Test of creating a customized manager class
# #
...@@ -2886,15 +2861,6 @@ class TestInvalidHandle(unittest.TestCase): ...@@ -2886,15 +2861,6 @@ class TestInvalidHandle(unittest.TestCase):
# Functions used to create test cases from the base ones in this module # Functions used to create test cases from the base ones in this module
# #
def get_attributes(Source, names):
d = {}
for name in names:
obj = getattr(Source, name)
if type(obj) == type(get_attributes):
obj = staticmethod(obj)
d[name] = obj
return d
def create_test_cases(Mixin, type): def create_test_cases(Mixin, type):
result = {} result = {}
glob = globals() glob = globals()
...@@ -2907,10 +2873,10 @@ def create_test_cases(Mixin, type): ...@@ -2907,10 +2873,10 @@ def create_test_cases(Mixin, type):
assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES) assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
if type in base.ALLOWED_TYPES: if type in base.ALLOWED_TYPES:
newname = 'With' + Type + name[1:] newname = 'With' + Type + name[1:]
class Temp(base, unittest.TestCase, Mixin): class Temp(base, Mixin, unittest.TestCase):
pass pass
result[newname] = Temp result[newname] = Temp
Temp.__name__ = newname Temp.__name__ = Temp.__qualname__ = newname
Temp.__module__ = Mixin.__module__ Temp.__module__ = Mixin.__module__
return result return result
...@@ -2921,12 +2887,24 @@ def create_test_cases(Mixin, type): ...@@ -2921,12 +2887,24 @@ def create_test_cases(Mixin, type):
class ProcessesMixin(object): class ProcessesMixin(object):
TYPE = 'processes' TYPE = 'processes'
Process = multiprocessing.Process Process = multiprocessing.Process
locals().update(get_attributes(multiprocessing, ( connection = multiprocessing.connection
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', current_process = staticmethod(multiprocessing.current_process)
'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue', active_children = staticmethod(multiprocessing.active_children)
'RawArray', 'current_process', 'active_children', 'Pipe', Pool = staticmethod(multiprocessing.Pool)
'connection', 'JoinableQueue', 'Pool' Pipe = staticmethod(multiprocessing.Pipe)
))) Queue = staticmethod(multiprocessing.Queue)
JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
Lock = staticmethod(multiprocessing.Lock)
RLock = staticmethod(multiprocessing.RLock)
Semaphore = staticmethod(multiprocessing.Semaphore)
BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
Condition = staticmethod(multiprocessing.Condition)
Event = staticmethod(multiprocessing.Event)
Barrier = staticmethod(multiprocessing.Barrier)
Value = staticmethod(multiprocessing.Value)
Array = staticmethod(multiprocessing.Array)
RawValue = staticmethod(multiprocessing.RawValue)
RawArray = staticmethod(multiprocessing.RawArray)
testcases_processes = create_test_cases(ProcessesMixin, type='processes') testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes) globals().update(testcases_processes)
...@@ -2935,12 +2913,42 @@ globals().update(testcases_processes) ...@@ -2935,12 +2913,42 @@ globals().update(testcases_processes)
class ManagerMixin(object): class ManagerMixin(object):
TYPE = 'manager' TYPE = 'manager'
Process = multiprocessing.Process Process = multiprocessing.Process
manager = object.__new__(multiprocessing.managers.SyncManager) Queue = property(operator.attrgetter('manager.Queue'))
locals().update(get_attributes(manager, ( JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', Lock = property(operator.attrgetter('manager.Lock'))
'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict', RLock = property(operator.attrgetter('manager.RLock'))
'Namespace', 'JoinableQueue', 'Pool' Semaphore = property(operator.attrgetter('manager.Semaphore'))
))) BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
Condition = property(operator.attrgetter('manager.Condition'))
Event = property(operator.attrgetter('manager.Event'))
Barrier = property(operator.attrgetter('manager.Barrier'))
Value = property(operator.attrgetter('manager.Value'))
Array = property(operator.attrgetter('manager.Array'))
list = property(operator.attrgetter('manager.list'))
dict = property(operator.attrgetter('manager.dict'))
Namespace = property(operator.attrgetter('manager.Namespace'))
@classmethod
def Pool(cls, *args, **kwds):
return cls.manager.Pool(*args, **kwds)
@classmethod
def setUpClass(cls):
cls.manager = multiprocessing.Manager()
@classmethod
def tearDownClass(cls):
multiprocessing.active_children() # discard dead process objs
gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:
# This is not really an error since some tests do not
# ensure that all processes which hold a reference to a
# managed object have been joined.
print('Shared objects which still exist at manager shutdown:')
print(cls.manager._debug_info())
cls.manager.shutdown()
cls.manager.join()
cls.manager = None
testcases_manager = create_test_cases(ManagerMixin, type='manager') testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager) globals().update(testcases_manager)
...@@ -2949,16 +2957,27 @@ globals().update(testcases_manager) ...@@ -2949,16 +2957,27 @@ globals().update(testcases_manager)
class ThreadsMixin(object): class ThreadsMixin(object):
TYPE = 'threads' TYPE = 'threads'
Process = multiprocessing.dummy.Process Process = multiprocessing.dummy.Process
locals().update(get_attributes(multiprocessing.dummy, ( connection = multiprocessing.dummy.connection
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', current_process = staticmethod(multiprocessing.dummy.current_process)
'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process', active_children = staticmethod(multiprocessing.dummy.active_children)
'active_children', 'Pipe', 'connection', 'dict', 'list', Pool = staticmethod(multiprocessing.Pool)
'Namespace', 'JoinableQueue', 'Pool' Pipe = staticmethod(multiprocessing.dummy.Pipe)
))) Queue = staticmethod(multiprocessing.dummy.Queue)
JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
Lock = staticmethod(multiprocessing.dummy.Lock)
RLock = staticmethod(multiprocessing.dummy.RLock)
Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
Condition = staticmethod(multiprocessing.dummy.Condition)
Event = staticmethod(multiprocessing.dummy.Event)
Barrier = staticmethod(multiprocessing.dummy.Barrier)
Value = staticmethod(multiprocessing.dummy.Value)
Array = staticmethod(multiprocessing.dummy.Array)
testcases_threads = create_test_cases(ThreadsMixin, type='threads') testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads) globals().update(testcases_threads)
class OtherTest(unittest.TestCase): class OtherTest(unittest.TestCase):
# TODO: add more tests for deliver/answer challenge. # TODO: add more tests for deliver/answer challenge.
def test_deliver_challenge_auth_failure(self): def test_deliver_challenge_auth_failure(self):
...@@ -3390,12 +3409,6 @@ def test_main(run=None): ...@@ -3390,12 +3409,6 @@ def test_main(run=None):
multiprocessing.get_logger().setLevel(LOG_LEVEL) multiprocessing.get_logger().setLevel(LOG_LEVEL)
ProcessesMixin.pool = multiprocessing.Pool(4)
ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
ManagerMixin.manager.__init__()
ManagerMixin.manager.start()
ManagerMixin.pool = ManagerMixin.manager.Pool(4)
testcases = ( testcases = (
sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
...@@ -3405,18 +3418,7 @@ def test_main(run=None): ...@@ -3405,18 +3418,7 @@ def test_main(run=None):
loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
try: run(suite)
run(suite)
finally:
ThreadsMixin.pool.terminate()
ProcessesMixin.pool.terminate()
ManagerMixin.pool.terminate()
ManagerMixin.pool.join()
ManagerMixin.manager.shutdown()
ManagerMixin.manager.join()
ThreadsMixin.pool.join()
ProcessesMixin.pool.join()
del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
def main(): def main():
test_main(unittest.TextTestRunner(verbosity=2).run) test_main(unittest.TextTestRunner(verbosity=2).run)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment