Unverified Kaydet (Commit) 08c2ba07 authored tarafından Victor Stinner's avatar Victor Stinner Kaydeden (comit) GitHub

bpo-35477: multiprocessing.Pool.__enter__() fails if called twice (GH-11134)

multiprocessing.Pool.__enter__() now fails if the pool is not
running: "with pool:" fails if used more than once.
üst 502fe19b
...@@ -261,6 +261,10 @@ class Pool(object): ...@@ -261,6 +261,10 @@ class Pool(object):
self._quick_put = self._inqueue._writer.send self._quick_put = self._inqueue._writer.send
self._quick_get = self._outqueue._reader.recv self._quick_get = self._outqueue._reader.recv
def _check_running(self):
if self._state != RUN:
raise ValueError("Pool not running")
def apply(self, func, args=(), kwds={}): def apply(self, func, args=(), kwds={}):
''' '''
Equivalent of `func(*args, **kwds)`. Equivalent of `func(*args, **kwds)`.
...@@ -306,8 +310,7 @@ class Pool(object): ...@@ -306,8 +310,7 @@ class Pool(object):
''' '''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
''' '''
if self._state != RUN: self._check_running()
raise ValueError("Pool not running")
if chunksize == 1: if chunksize == 1:
result = IMapIterator(self._cache) result = IMapIterator(self._cache)
self._taskqueue.put( self._taskqueue.put(
...@@ -336,8 +339,7 @@ class Pool(object): ...@@ -336,8 +339,7 @@ class Pool(object):
''' '''
Like `imap()` method but ordering of results is arbitrary. Like `imap()` method but ordering of results is arbitrary.
''' '''
if self._state != RUN: self._check_running()
raise ValueError("Pool not running")
if chunksize == 1: if chunksize == 1:
result = IMapUnorderedIterator(self._cache) result = IMapUnorderedIterator(self._cache)
self._taskqueue.put( self._taskqueue.put(
...@@ -366,8 +368,7 @@ class Pool(object): ...@@ -366,8 +368,7 @@ class Pool(object):
''' '''
Asynchronous version of `apply()` method. Asynchronous version of `apply()` method.
''' '''
if self._state != RUN: self._check_running()
raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback) result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
return result return result
...@@ -385,8 +386,7 @@ class Pool(object): ...@@ -385,8 +386,7 @@ class Pool(object):
''' '''
Helper function to implement map, starmap and their async counterparts. Helper function to implement map, starmap and their async counterparts.
''' '''
if self._state != RUN: self._check_running()
raise ValueError("Pool not running")
if not hasattr(iterable, '__len__'): if not hasattr(iterable, '__len__'):
iterable = list(iterable) iterable = list(iterable)
...@@ -625,6 +625,7 @@ class Pool(object): ...@@ -625,6 +625,7 @@ class Pool(object):
p.join() p.join()
def __enter__(self): def __enter__(self):
self._check_running()
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
......
...@@ -2561,6 +2561,22 @@ class _TestPool(BaseTestCase): ...@@ -2561,6 +2561,22 @@ class _TestPool(BaseTestCase):
# they were released too. # they were released too.
self.assertEqual(CountedObject.n_instances, 0) self.assertEqual(CountedObject.n_instances, 0)
def test_enter(self):
if self.TYPE == 'manager':
self.skipTest("test not applicable to manager")
pool = self.Pool(1)
with pool:
pass
# call pool.terminate()
# pool is no longer running
with self.assertRaises(ValueError):
# bpo-35477: pool.__enter__() fails if the pool is not running
with pool:
pass
pool.join()
def raising(): def raising():
raise KeyError("key") raise KeyError("key")
......
:meth:`multiprocessing.Pool.__enter__` now fails if the pool is not running:
``with pool:`` fails if used more than once.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment