Kaydet (Commit) 949d47dc authored tarafından Amaury Forgeot d'Arc's avatar Amaury Forgeot d'Arc

Issue #3125: Remove copy_reg in multiprocessing and replace it with

ForkingPickler.register() to resolve conflict with ctypes.
üst 3ad89100
...@@ -12,7 +12,7 @@ import signal ...@@ -12,7 +12,7 @@ import signal
from multiprocessing import util, process from multiprocessing import util, process
__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close'] __all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
# #
# Check that the current thread is spawning a child process # Check that the current thread is spawning a child process
...@@ -25,6 +25,50 @@ def assert_spawning(self): ...@@ -25,6 +25,50 @@ def assert_spawning(self):
' through inheritance' % type(self).__name__ ' through inheritance' % type(self).__name__
) )
#
# Try making some callable types picklable
#
from pickle import _Pickler as Pickler
class ForkingPickler(Pickler):
dispatch = Pickler.dispatch.copy()
@classmethod
def register(cls, type, reduce):
def dispatcher(self, obj):
rv = reduce(obj)
if isinstance(rv, str):
self.save_global(obj, rv)
else:
self.save_reduce(obj=obj, *rv)
cls.dispatch[type] = dispatcher
def _reduce_method(m):
if m.__self__ is None:
return getattr, (m.__class__, m.__func__.__name__)
else:
return getattr, (m.__self__, m.__func__.__name__)
class _C:
def f(self):
pass
ForkingPickler.register(type(_C().f), _reduce_method)
def _reduce_method_descriptor(m):
return getattr, (m.__objclass__, m.__name__)
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
try:
from functools import partial
except ImportError:
pass
else:
def _reduce_partial(p):
return _rebuild_partial, (p.func, p.args, p.keywords or {})
def _rebuild_partial(func, args, keywords):
return partial(func, *args, **keywords)
ForkingPickler.register(partial, _reduce_partial)
# #
# Unix # Unix
# #
...@@ -105,16 +149,18 @@ else: ...@@ -105,16 +149,18 @@ else:
import _thread import _thread
import msvcrt import msvcrt
import _subprocess import _subprocess
import copyreg
import time import time
from ._multiprocessing import win32, Connection, PipeConnection from ._multiprocessing import win32, Connection, PipeConnection
from .util import Finalize from .util import Finalize
try: #try:
from cPickle import dump, load, HIGHEST_PROTOCOL # from cPickle import dump, load, HIGHEST_PROTOCOL
except ImportError: #except ImportError:
from pickle import dump, load, HIGHEST_PROTOCOL from pickle import load, HIGHEST_PROTOCOL
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
# #
# #
...@@ -346,9 +392,8 @@ else: ...@@ -346,9 +392,8 @@ else:
return type(conn), (Popen.duplicate_for_child(conn.fileno()), return type(conn), (Popen.duplicate_for_child(conn.fileno()),
conn.readable, conn.writable) conn.readable, conn.writable)
copyreg.pickle(Connection, reduce_connection) ForkingPickler.register(Connection, reduce_connection)
copyreg.pickle(PipeConnection, reduce_connection) ForkingPickler.register(PipeConnection, reduce_connection)
# #
# Prepare current process # Prepare current process
......
...@@ -18,13 +18,12 @@ import sys ...@@ -18,13 +18,12 @@ import sys
import weakref import weakref
import threading import threading
import array import array
import copyreg
import queue import queue
from traceback import format_exc from traceback import format_exc
from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing import Process, current_process, active_children, Pool, util, connection
from multiprocessing.process import AuthenticationString from multiprocessing.process import AuthenticationString
from multiprocessing.forking import exit, Popen, assert_spawning from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
from multiprocessing.util import Finalize, info from multiprocessing.util import Finalize, info
try: try:
...@@ -38,14 +37,14 @@ except ImportError: ...@@ -38,14 +37,14 @@ except ImportError:
def reduce_array(a): def reduce_array(a):
return array.array, (a.typecode, a.tostring()) return array.array, (a.typecode, a.tostring())
copyreg.pickle(array.array, reduce_array) ForkingPickler.register(array.array, reduce_array)
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list: # only needed in Py3.0 if view_types[0] is not list: # only needed in Py3.0
def rebuild_as_list(obj): def rebuild_as_list(obj):
return list, (list(obj),) return list, (list(obj),)
for view_type in view_types: for view_type in view_types:
copyreg.pickle(view_type, rebuild_as_list) ForkingPickler.register(view_type, rebuild_as_list)
# #
# Type for identifying shared objects # Type for identifying shared objects
......
...@@ -13,11 +13,10 @@ import os ...@@ -13,11 +13,10 @@ import os
import sys import sys
import socket import socket
import threading import threading
import copyreg
import _multiprocessing import _multiprocessing
from multiprocessing import current_process from multiprocessing import current_process
from multiprocessing.forking import Popen, duplicate, close from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug from multiprocessing.util import register_after_fork, debug, sub_debug
from multiprocessing.connection import Client, Listener from multiprocessing.connection import Client, Listener
...@@ -134,7 +133,7 @@ def rebuild_handle(pickled_data): ...@@ -134,7 +133,7 @@ def rebuild_handle(pickled_data):
return new_handle return new_handle
# #
# Register `_multiprocessing.Connection` with `copy_reg` # Register `_multiprocessing.Connection` with `ForkingPickler`
# #
def reduce_connection(conn): def reduce_connection(conn):
...@@ -147,10 +146,10 @@ def rebuild_connection(reduced_handle, readable, writable): ...@@ -147,10 +146,10 @@ def rebuild_connection(reduced_handle, readable, writable):
handle, readable=readable, writable=writable handle, readable=readable, writable=writable
) )
copyreg.pickle(_multiprocessing.Connection, reduce_connection) ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
# #
# Register `socket.socket` with `copy_reg` # Register `socket.socket` with `ForkingPickler`
# #
def fromfd(fd, family, type_, proto=0): def fromfd(fd, family, type_, proto=0):
...@@ -169,10 +168,10 @@ def rebuild_socket(reduced_handle, family, type_, proto): ...@@ -169,10 +168,10 @@ def rebuild_socket(reduced_handle, family, type_, proto):
close(fd) close(fd)
return _sock return _sock
copyreg.pickle(socket.socket, reduce_socket) ForkingPickler.register(socket.socket, reduce_socket)
# #
# Register `_multiprocessing.PipeConnection` with `copy_reg` # Register `_multiprocessing.PipeConnection` with `ForkingPickler`
# #
if sys.platform == 'win32': if sys.platform == 'win32':
...@@ -187,4 +186,4 @@ if sys.platform == 'win32': ...@@ -187,4 +186,4 @@ if sys.platform == 'win32':
handle, readable=readable, writable=writable handle, readable=readable, writable=writable
) )
copyreg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection) ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
...@@ -9,10 +9,9 @@ ...@@ -9,10 +9,9 @@
import sys import sys
import ctypes import ctypes
import weakref import weakref
import copyreg
from multiprocessing import heap, RLock from multiprocessing import heap, RLock
from multiprocessing.forking import assert_spawning from multiprocessing.forking import assert_spawning, ForkingPickler
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized'] __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
...@@ -124,8 +123,7 @@ def reduce_ctype(obj): ...@@ -124,8 +123,7 @@ def reduce_ctype(obj):
def rebuild_ctype(type_, wrapper, length): def rebuild_ctype(type_, wrapper, length):
if length is not None: if length is not None:
type_ = type_ * length type_ = type_ * length
if sys.platform == 'win32' and type_ not in copyreg.dispatch_table: ForkingPickler.register(type_, reduce_ctype)
copyreg.pickle(type_, reduce_ctype)
obj = type_.from_address(wrapper.get_address()) obj = type_.from_address(wrapper.get_address())
obj._wrapper = wrapper obj._wrapper = wrapper
return obj return obj
......
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
import itertools import itertools
import weakref import weakref
import copyreg
import atexit import atexit
import threading # we want threading to install it's import threading # we want threading to install it's
# cleanup function before multiprocessing does # cleanup function before multiprocessing does
...@@ -302,35 +301,3 @@ class ForkAwareLocal(threading.local): ...@@ -302,35 +301,3 @@ class ForkAwareLocal(threading.local):
register_after_fork(self, lambda obj : obj.__dict__.clear()) register_after_fork(self, lambda obj : obj.__dict__.clear())
def __reduce__(self): def __reduce__(self):
return type(self), () return type(self), ()
#
# Try making some callable types picklable
#
def _reduce_method(m):
if m.__self__ is None:
return getattr, (m.__self__.__class__, m.__func__.__name__)
else:
return getattr, (m.__self__, m.__func__.__name__)
copyreg.pickle(type(Finalize.__init__), _reduce_method)
def _reduce_method_descriptor(m):
return getattr, (m.__objclass__, m.__name__)
copyreg.pickle(type(list.append), _reduce_method_descriptor)
copyreg.pickle(type(int.__add__), _reduce_method_descriptor)
def _reduce_builtin_function_or_method(m):
return getattr, (m.__self__, m.__name__)
copyreg.pickle(type(list().append), _reduce_builtin_function_or_method)
copyreg.pickle(type(int().__add__), _reduce_builtin_function_or_method)
try:
from functools import partial
except ImportError:
pass
else:
def _reduce_partial(p):
return _rebuild_partial, (p.func, p.args, p.keywords or {})
def _rebuild_partial(func, args, keywords):
return partial(func, *args, **keywords)
copyreg.pickle(partial, _reduce_partial)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment