Kaydet (Commit) d69cfe88 authored tarafından Richard Oudkerk's avatar Richard Oudkerk

Issue #15064: Implement context manager protocol for multiprocessing types

üst 0f884273
...@@ -834,6 +834,10 @@ Connection objects are usually created using :func:`Pipe` -- see also ...@@ -834,6 +834,10 @@ Connection objects are usually created using :func:`Pipe` -- see also
Connection objects themselves can now be transferred between processes Connection objects themselves can now be transferred between processes
using :meth:`Connection.send` and :meth:`Connection.recv`. using :meth:`Connection.send` and :meth:`Connection.recv`.
.. versionadded:: 3.3
Connection objects now support the context manager protocol -- see
:ref:`typecontextmanager`. :meth:`__enter__` returns the
connection object, and :meth:`__exit__` calls :meth:`close`.
For example: For example:
...@@ -1277,6 +1281,9 @@ their parent process exits. The manager classes are defined in the ...@@ -1277,6 +1281,9 @@ their parent process exits. The manager classes are defined in the
The address used by the manager. The address used by the manager.
Manager objects support the context manager protocol -- see
:ref:`typecontextmanager`. :meth:`__enter__` returns the
manager object, and :meth:`__exit__` calls :meth:`shutdown`.
.. class:: SyncManager .. class:: SyncManager
...@@ -1747,6 +1754,11 @@ with the :class:`Pool` class. ...@@ -1747,6 +1754,11 @@ with the :class:`Pool` class.
Wait for the worker processes to exit. One must call :meth:`close` or Wait for the worker processes to exit. One must call :meth:`close` or
:meth:`terminate` before using :meth:`join`. :meth:`terminate` before using :meth:`join`.
.. versionadded:: 3.3
Pool objects now support the context manager protocol -- see
:ref:`typecontextmanager`. :meth:`__enter__` returns the pool
object, and :meth:`__exit__` calls :meth:`terminate`.
.. class:: AsyncResult .. class:: AsyncResult
...@@ -1911,6 +1923,11 @@ multiple connections at the same time. ...@@ -1911,6 +1923,11 @@ multiple connections at the same time.
The address from which the last accepted connection came. If this is The address from which the last accepted connection came. If this is
unavailable then it is ``None``. unavailable then it is ``None``.
.. versionadded:: 3.3
Listener objects now support the context manager protocol -- see
:ref:`typecontextmanager`. :meth:`__enter__` returns the
listener object, and :meth:`__exit__` calls :meth:`close`.
.. function:: wait(object_list, timeout=None) .. function:: wait(object_list, timeout=None)
Wait till an object in *object_list* is ready. Returns the list of Wait till an object in *object_list* is ready. Returns the list of
......
...@@ -257,6 +257,12 @@ class _ConnectionBase: ...@@ -257,6 +257,12 @@ class _ConnectionBase:
self._check_readable() self._check_readable()
return self._poll(timeout) return self._poll(timeout)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
if _winapi: if _winapi:
...@@ -436,6 +442,8 @@ class Listener(object): ...@@ -436,6 +442,8 @@ class Listener(object):
Returns a `Connection` object. Returns a `Connection` object.
''' '''
if self._listener is None:
raise IOError('listener is closed')
c = self._listener.accept() c = self._listener.accept()
if self._authkey: if self._authkey:
deliver_challenge(c, self._authkey) deliver_challenge(c, self._authkey)
...@@ -446,11 +454,19 @@ class Listener(object): ...@@ -446,11 +454,19 @@ class Listener(object):
''' '''
Close the bound socket or named pipe of `self`. Close the bound socket or named pipe of `self`.
''' '''
return self._listener.close() if self._listener is not None:
self._listener.close()
self._listener = None
address = property(lambda self: self._listener._address) address = property(lambda self: self._listener._address)
last_accepted = property(lambda self: self._listener._last_accepted) last_accepted = property(lambda self: self._listener._last_accepted)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
def Client(address, family=None, authkey=None): def Client(address, family=None, authkey=None):
''' '''
......
...@@ -53,6 +53,12 @@ class Listener(object): ...@@ -53,6 +53,12 @@ class Listener(object):
address = property(lambda self: self._backlog_queue) address = property(lambda self: self._backlog_queue)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
def Client(address): def Client(address):
_in, _out = Queue(), Queue() _in, _out = Queue(), Queue()
...@@ -85,3 +91,9 @@ class Connection(object): ...@@ -85,3 +91,9 @@ class Connection(object):
def close(self): def close(self):
pass pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
...@@ -522,6 +522,12 @@ class Pool(object): ...@@ -522,6 +522,12 @@ class Pool(object):
debug('cleaning up worker %d' % p.pid) debug('cleaning up worker %d' % p.pid)
p.join() p.join()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.terminate()
# #
# Class whose instances are returned by `Pool.apply_async()` # Class whose instances are returned by `Pool.apply_async()`
# #
......
...@@ -1719,6 +1719,15 @@ class _TestPool(BaseTestCase): ...@@ -1719,6 +1719,15 @@ class _TestPool(BaseTestCase):
p.close() p.close()
p.join() p.join()
def test_context(self):
if self.TYPE == 'processes':
L = list(range(10))
expected = [sqr(i) for i in L]
with multiprocessing.Pool(2) as p:
r = p.map_async(sqr, L)
self.assertEqual(r.get(), expected)
self.assertRaises(AssertionError, p.map_async, sqr, L)
def raising(): def raising():
raise KeyError("key") raise KeyError("key")
...@@ -2266,6 +2275,22 @@ class _TestConnection(BaseTestCase): ...@@ -2266,6 +2275,22 @@ class _TestConnection(BaseTestCase):
self.assertRaises(RuntimeError, reduction.recv_handle, conn) self.assertRaises(RuntimeError, reduction.recv_handle, conn)
p.join() p.join()
def test_context(self):
a, b = self.Pipe()
with a, b:
a.send(1729)
self.assertEqual(b.recv(), 1729)
if self.TYPE == 'processes':
self.assertFalse(a.closed)
self.assertFalse(b.closed)
if self.TYPE == 'processes':
self.assertTrue(a.closed)
self.assertTrue(b.closed)
self.assertRaises(IOError, a.recv)
self.assertRaises(IOError, b.recv)
class _TestListener(BaseTestCase): class _TestListener(BaseTestCase):
ALLOWED_TYPES = ('processes',) ALLOWED_TYPES = ('processes',)
...@@ -2277,6 +2302,16 @@ class _TestListener(BaseTestCase): ...@@ -2277,6 +2302,16 @@ class _TestListener(BaseTestCase):
self.assertRaises(OSError, self.connection.Listener, self.assertRaises(OSError, self.connection.Listener,
l.address, family) l.address, family)
def test_context(self):
with self.connection.Listener() as l:
with self.connection.Client(l.address) as c:
with l.accept() as d:
c.send(1729)
self.assertEqual(d.recv(), 1729)
if self.TYPE == 'processes':
self.assertRaises(IOError, l.accept)
class _TestListenerClient(BaseTestCase): class _TestListenerClient(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads') ALLOWED_TYPES = ('processes', 'threads')
......
...@@ -29,6 +29,8 @@ Core and Builtins ...@@ -29,6 +29,8 @@ Core and Builtins
Library Library
------- -------
- Issue #15064: Implement context manager protocol for multiprocessing types
- Issue #15101: Make pool finalizer avoid joining current thread. - Issue #15101: Make pool finalizer avoid joining current thread.
- Issue #14657: The frozen instance of importlib used for bootstrap is now - Issue #14657: The frozen instance of importlib used for bootstrap is now
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment