Kaydet (Commit) 7152f6d9 authored tarafından Jesse Noller's avatar Jesse Noller

Add custom initializer argument to multiprocess.Manager*, courtesy of lekma

üst d56bab47
...@@ -1130,9 +1130,10 @@ their parent process exits. The manager classes are defined in the ...@@ -1130,9 +1130,10 @@ their parent process exits. The manager classes are defined in the
``current_process().authkey``. Otherwise *authkey* is used and it ``current_process().authkey``. Otherwise *authkey* is used and it
must be a string. must be a string.
.. method:: start() .. method:: start([initializer[, initargs]])
Start a subprocess to start the manager. Start a subprocess to start the manager. If *initializer* is not ``None``
then the subprocess will call ``initializer(*initargs)`` when it starts.
.. method:: serve_forever() .. method:: serve_forever()
......
...@@ -475,12 +475,15 @@ class BaseManager(object): ...@@ -475,12 +475,15 @@ class BaseManager(object):
dispatch(conn, None, 'dummy') dispatch(conn, None, 'dummy')
self._state.value = State.STARTED self._state.value = State.STARTED
def start(self): def start(self, initializer=None, initargs=()):
''' '''
Spawn a server process for this manager object Spawn a server process for this manager object
''' '''
assert self._state.value == State.INITIAL assert self._state.value == State.INITIAL
if initializer is not None and not hasattr(initializer, '__call__'):
raise TypeError('initializer must be a callable')
# pipe over which we will retrieve address of server # pipe over which we will retrieve address of server
reader, writer = connection.Pipe(duplex=False) reader, writer = connection.Pipe(duplex=False)
...@@ -488,7 +491,7 @@ class BaseManager(object): ...@@ -488,7 +491,7 @@ class BaseManager(object):
self._process = Process( self._process = Process(
target=type(self)._run_server, target=type(self)._run_server,
args=(self._registry, self._address, self._authkey, args=(self._registry, self._address, self._authkey,
self._serializer, writer), self._serializer, writer, initializer, initargs),
) )
ident = ':'.join(str(i) for i in self._process._identity) ident = ':'.join(str(i) for i in self._process._identity)
self._process.name = type(self).__name__ + '-' + ident self._process.name = type(self).__name__ + '-' + ident
...@@ -509,10 +512,14 @@ class BaseManager(object): ...@@ -509,10 +512,14 @@ class BaseManager(object):
) )
@classmethod @classmethod
def _run_server(cls, registry, address, authkey, serializer, writer): def _run_server(cls, registry, address, authkey, serializer, writer,
initializer=None, initargs=()):
''' '''
Create a server, report its address and run it Create a server, report its address and run it
''' '''
if initializer is not None:
initializer(*initargs)
# create server # create server
server = cls._Server(registry, address, authkey, serializer) server = cls._Server(registry, address, authkey, serializer)
......
...@@ -92,6 +92,9 @@ class Pool(object): ...@@ -92,6 +92,9 @@ class Pool(object):
except NotImplementedError: except NotImplementedError:
processes = 1 processes = 1
if initializer is not None and not hasattr(initializer, '__call__'):
raise TypeError('initializer must be a callable')
self._pool = [] self._pool = []
for i in range(processes): for i in range(processes):
w = self.Process( w = self.Process(
......
...@@ -1831,7 +1831,37 @@ class OtherTest(unittest.TestCase): ...@@ -1831,7 +1831,37 @@ class OtherTest(unittest.TestCase):
multiprocessing.connection.answer_challenge, multiprocessing.connection.answer_challenge,
_FakeConnection(), b'abc') _FakeConnection(), b'abc')
testcases_other = [OtherTest, TestInvalidHandle] #
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
def initializer(ns):
ns.test += 1
class TestInitializers(unittest.TestCase):
def setUp(self):
self.mgr = multiprocessing.Manager()
self.ns = self.mgr.Namespace()
self.ns.test = 0
def tearDown(self):
self.mgr.shutdown()
def test_manager_initializer(self):
m = multiprocessing.managers.SyncManager()
self.assertRaises(TypeError, m.start, 1)
m.start(initializer, (self.ns,))
self.assertEqual(self.ns.test, 1)
m.shutdown()
def test_pool_initializer(self):
self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
p = multiprocessing.Pool(1, initializer, (self.ns,))
p.close()
p.join()
self.assertEqual(self.ns.test, 1)
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers]
# #
# #
......
...@@ -202,6 +202,9 @@ Core and Builtins ...@@ -202,6 +202,9 @@ Core and Builtins
Library Library
------- -------
- Issue 5585: Add the ability to call an initializer to mulitiprocessing.manager
so that users can install custonm handlers/etc.
- Issue 3551: Patch multiprocessing to raise a proper exception if the size of the - Issue 3551: Patch multiprocessing to raise a proper exception if the size of the
object when writefile is called causes a ERROR_NO_SYSTEM_RESOURCES. Added docs object when writefile is called causes a ERROR_NO_SYSTEM_RESOURCES. Added docs
to note the limitation to note the limitation
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment