Kaydet (Commit) 78f55ffc authored tarafından Charles-François Natali's avatar Charles-François Natali

Issue #23992: multiprocessing: make MapResult not fail-fast upon exception.

üst eaf8ebc1
...@@ -638,22 +638,26 @@ class MapResult(ApplyResult): ...@@ -638,22 +638,26 @@ class MapResult(ApplyResult):
self._number_left = length//chunksize + bool(length % chunksize) self._number_left = length//chunksize + bool(length % chunksize)
def _set(self, i, success_result): def _set(self, i, success_result):
self._number_left -= 1
success, result = success_result success, result = success_result
if success: if success and self._success:
self._value[i*self._chunksize:(i+1)*self._chunksize] = result self._value[i*self._chunksize:(i+1)*self._chunksize] = result
self._number_left -= 1
if self._number_left == 0: if self._number_left == 0:
if self._callback: if self._callback:
self._callback(self._value) self._callback(self._value)
del self._cache[self._job] del self._cache[self._job]
self._event.set() self._event.set()
else: else:
self._success = False if not success and self._success:
self._value = result # only store first exception
if self._error_callback: self._success = False
self._error_callback(self._value) self._value = result
del self._cache[self._job] if self._number_left == 0:
self._event.set() # only consider the result ready once all jobs are done
if self._error_callback:
self._error_callback(self._value)
del self._cache[self._job]
self._event.set()
# #
# Class whose instances are returned by `Pool.imap()` # Class whose instances are returned by `Pool.imap()`
......
...@@ -1660,6 +1660,10 @@ def sqr(x, wait=0.0): ...@@ -1660,6 +1660,10 @@ def sqr(x, wait=0.0):
def mul(x, y): def mul(x, y):
return x*y return x*y
def raise_large_valuerror(wait):
time.sleep(wait)
raise ValueError("x" * 1024**2)
class SayWhenError(ValueError): pass class SayWhenError(ValueError): pass
def exception_throwing_generator(total, when): def exception_throwing_generator(total, when):
...@@ -1895,6 +1899,26 @@ class _TestPool(BaseTestCase): ...@@ -1895,6 +1899,26 @@ class _TestPool(BaseTestCase):
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
p.apply(self._test_wrapped_exception) p.apply(self._test_wrapped_exception)
def test_map_no_failfast(self):
# Issue #23992: the fail-fast behaviour when an exception is raised
# during map() would make Pool.join() deadlock, because a worker
# process would fill the result queue (after the result handler thread
# terminated, hence not draining it anymore).
t_start = time.time()
with self.assertRaises(ValueError):
with self.Pool(2) as p:
try:
p.map(raise_large_valuerror, [0, 1])
finally:
time.sleep(0.5)
p.close()
p.join()
# check that we indeed waited for all jobs
self.assertGreater(time.time() - t_start, 0.9)
def raising(): def raising():
raise KeyError("key") raise KeyError("key")
......
...@@ -179,6 +179,8 @@ Core and Builtins ...@@ -179,6 +179,8 @@ Core and Builtins
Library Library
------- -------
- Issue #23992: multiprocessing: make MapResult not fail-fast upon exception.
- Issue #26243: Support keyword arguments to zlib.compress(). Patch by Aviv - Issue #26243: Support keyword arguments to zlib.compress(). Patch by Aviv
Palivoda. Palivoda.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment