Kaydet (Commit) dfd79494 authored tarafından Benjamin Peterson's avatar Benjamin Peterson

convert multiprocessing to unix line endings

üst c9798fc7
...@@ -68,10 +68,10 @@ from multiprocessing.process import Process, current_process, active_children ...@@ -68,10 +68,10 @@ from multiprocessing.process import Process, current_process, active_children
class ProcessError(Exception): class ProcessError(Exception):
pass pass
class BufferTooShort(ProcessError): class BufferTooShort(ProcessError):
pass pass
class TimeoutError(ProcessError): class TimeoutError(ProcessError):
pass pass
...@@ -123,7 +123,7 @@ def cpu_count(): ...@@ -123,7 +123,7 @@ def cpu_count():
num = os.sysconf('SC_NPROCESSORS_ONLN') num = os.sysconf('SC_NPROCESSORS_ONLN')
except (ValueError, OSError, AttributeError): except (ValueError, OSError, AttributeError):
num = 0 num = 0
if num >= 1: if num >= 1:
return num return num
else: else:
...@@ -151,13 +151,13 @@ def log_to_stderr(level=None): ...@@ -151,13 +151,13 @@ def log_to_stderr(level=None):
''' '''
from multiprocessing.util import log_to_stderr from multiprocessing.util import log_to_stderr
return log_to_stderr(level) return log_to_stderr(level)
def allow_connection_pickling(): def allow_connection_pickling():
''' '''
Install support for sending connections and sockets between processes Install support for sending connections and sockets between processes
''' '''
from multiprocessing import reduction from multiprocessing import reduction
# #
# Definitions depending on native semaphores # Definitions depending on native semaphores
# #
...@@ -263,7 +263,7 @@ if sys.platform == 'win32': ...@@ -263,7 +263,7 @@ if sys.platform == 'win32':
''' '''
Sets the path to a python.exe or pythonw.exe binary used to run Sets the path to a python.exe or pythonw.exe binary used to run
child processes on Windows instead of sys.executable. child processes on Windows instead of sys.executable.
Useful for people embedding Python. Useful for people embedding Python.
''' '''
from multiprocessing.forking import set_executable from multiprocessing.forking import set_executable
set_executable(executable) set_executable(executable)
......
...@@ -50,7 +50,7 @@ def arbitrary_address(family): ...@@ -50,7 +50,7 @@ def arbitrary_address(family):
''' '''
if family == 'AF_INET': if family == 'AF_INET':
return ('localhost', 0) return ('localhost', 0)
elif family == 'AF_UNIX': elif family == 'AF_UNIX':
return tempfile.mktemp(prefix='listener-', dir=get_temp_dir()) return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
elif family == 'AF_PIPE': elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
...@@ -160,7 +160,7 @@ if sys.platform != 'win32': ...@@ -160,7 +160,7 @@ if sys.platform != 'win32':
c2 = _multiprocessing.Connection(fd2, readable=False) c2 = _multiprocessing.Connection(fd2, readable=False)
return c1, c2 return c1, c2
else: else:
from ._multiprocessing import win32 from ._multiprocessing import win32
...@@ -200,7 +200,7 @@ else: ...@@ -200,7 +200,7 @@ else:
c1 = _multiprocessing.PipeConnection(h1, writable=duplex) c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
c2 = _multiprocessing.PipeConnection(h2, readable=duplex) c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
return c1, c2 return c1, c2
# #
...@@ -290,14 +290,14 @@ if sys.platform == 'win32': ...@@ -290,14 +290,14 @@ if sys.platform == 'win32':
) )
self._handle_queue = [handle] self._handle_queue = [handle]
self._last_accepted = None self._last_accepted = None
sub_debug('listener created with address=%r', self._address) sub_debug('listener created with address=%r', self._address)
self.close = Finalize( self.close = Finalize(
self, PipeListener._finalize_pipe_listener, self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0 args=(self._handle_queue, self._address), exitpriority=0
) )
def accept(self): def accept(self):
newhandle = win32.CreateNamedPipe( newhandle = win32.CreateNamedPipe(
self._address, win32.PIPE_ACCESS_DUPLEX, self._address, win32.PIPE_ACCESS_DUPLEX,
...@@ -320,7 +320,7 @@ if sys.platform == 'win32': ...@@ -320,7 +320,7 @@ if sys.platform == 'win32':
sub_debug('closing listener with address=%r', address) sub_debug('closing listener with address=%r', address)
for handle in queue: for handle in queue:
close(handle) close(handle)
def PipeClient(address): def PipeClient(address):
''' '''
Return a connection object connected to the pipe given by `address` Return a connection object connected to the pipe given by `address`
...@@ -397,7 +397,7 @@ class ConnectionWrapper(object): ...@@ -397,7 +397,7 @@ class ConnectionWrapper(object):
self._loads = loads self._loads = loads
for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
obj = getattr(conn, attr) obj = getattr(conn, attr)
setattr(self, attr, obj) setattr(self, attr, obj)
def send(self, obj): def send(self, obj):
s = self._dumps(obj) s = self._dumps(obj)
self._conn.send_bytes(s) self._conn.send_bytes(s)
......
# #
# Support for the API of the multiprocessing package using threads # Support for the API of the multiprocessing package using threads
# #
# multiprocessing/dummy/__init__.py # multiprocessing/dummy/__init__.py
# #
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
# #
__all__ = [ __all__ = [
'Process', 'current_process', 'active_children', 'freeze_support', 'Process', 'current_process', 'active_children', 'freeze_support',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
'Event', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue' 'Event', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
] ]
# #
# Imports # Imports
# #
import threading import threading
import sys import sys
import weakref import weakref
import array import array
import itertools import itertools
from multiprocessing import TimeoutError, cpu_count from multiprocessing import TimeoutError, cpu_count
from multiprocessing.dummy.connection import Pipe from multiprocessing.dummy.connection import Pipe
from threading import Lock, RLock, Semaphore, BoundedSemaphore from threading import Lock, RLock, Semaphore, BoundedSemaphore
from threading import Event from threading import Event
from Queue import Queue from Queue import Queue
# #
# #
# #
class DummyProcess(threading.Thread): class DummyProcess(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
threading.Thread.__init__(self, group, target, name, args, kwargs) threading.Thread.__init__(self, group, target, name, args, kwargs)
self._pid = None self._pid = None
self._children = weakref.WeakKeyDictionary() self._children = weakref.WeakKeyDictionary()
self._start_called = False self._start_called = False
self._parent = current_process() self._parent = current_process()
def start(self): def start(self):
assert self._parent is current_process() assert self._parent is current_process()
self._start_called = True self._start_called = True
self._parent._children[self] = None self._parent._children[self] = None
threading.Thread.start(self) threading.Thread.start(self)
def get_exitcode(self): def get_exitcode(self):
if self._start_called and not self.is_alive(): if self._start_called and not self.is_alive():
return 0 return 0
else: else:
return None return None
# XXX # XXX
if sys.version_info < (3, 0): if sys.version_info < (3, 0):
is_alive = threading.Thread.is_alive.im_func is_alive = threading.Thread.is_alive.im_func
get_name = threading.Thread.get_name.im_func get_name = threading.Thread.get_name.im_func
set_name = threading.Thread.set_name.im_func set_name = threading.Thread.set_name.im_func
is_daemon = threading.Thread.is_daemon.im_func is_daemon = threading.Thread.is_daemon.im_func
set_daemon = threading.Thread.set_daemon.im_func set_daemon = threading.Thread.set_daemon.im_func
else: else:
is_alive = threading.Thread.is_alive is_alive = threading.Thread.is_alive
get_name = threading.Thread.get_name get_name = threading.Thread.get_name
set_name = threading.Thread.set_name set_name = threading.Thread.set_name
is_daemon = threading.Thread.is_daemon is_daemon = threading.Thread.is_daemon
set_daemon = threading.Thread.set_daemon set_daemon = threading.Thread.set_daemon
# #
# #
# #
class Condition(threading._Condition): class Condition(threading._Condition):
# XXX # XXX
if sys.version_info < (3, 0): if sys.version_info < (3, 0):
notify_all = threading._Condition.notify_all.im_func notify_all = threading._Condition.notify_all.im_func
else: else:
notify_all = threading._Condition.notify_all notify_all = threading._Condition.notify_all
# #
# #
# #
Process = DummyProcess Process = DummyProcess
current_process = threading.current_thread current_process = threading.current_thread
current_process()._children = weakref.WeakKeyDictionary() current_process()._children = weakref.WeakKeyDictionary()
def active_children(): def active_children():
children = current_process()._children children = current_process()._children
for p in list(children): for p in list(children):
if not p.is_alive(): if not p.is_alive():
children.pop(p, None) children.pop(p, None)
return list(children) return list(children)
def freeze_support(): def freeze_support():
pass pass
# #
# #
# #
class Namespace(object): class Namespace(object):
def __init__(self, **kwds): def __init__(self, **kwds):
self.__dict__.update(kwds) self.__dict__.update(kwds)
def __repr__(self): def __repr__(self):
items = self.__dict__.items() items = self.__dict__.items()
temp = [] temp = []
for name, value in items: for name, value in items:
if not name.startswith('_'): if not name.startswith('_'):
temp.append('%s=%r' % (name, value)) temp.append('%s=%r' % (name, value))
temp.sort() temp.sort()
return 'Namespace(%s)' % str.join(', ', temp) return 'Namespace(%s)' % str.join(', ', temp)
dict = dict dict = dict
list = list list = list
def Array(typecode, sequence, lock=True): def Array(typecode, sequence, lock=True):
return array.array(typecode, sequence) return array.array(typecode, sequence)
class Value(object): class Value(object):
def __init__(self, typecode, value, lock=True): def __init__(self, typecode, value, lock=True):
self._typecode = typecode self._typecode = typecode
self._value = value self._value = value
def _get(self): def _get(self):
return self._value return self._value
def _set(self, value): def _set(self, value):
self._value = value self._value = value
value = property(_get, _set) value = property(_get, _set)
def __repr__(self): def __repr__(self):
return '<%r(%r, %r)>'%(type(self).__name__,self._typecode,self._value) return '<%r(%r, %r)>'%(type(self).__name__,self._typecode,self._value)
def Manager(): def Manager():
return sys.modules[__name__] return sys.modules[__name__]
def shutdown(): def shutdown():
pass pass
def Pool(processes=None, initializer=None, initargs=()): def Pool(processes=None, initializer=None, initargs=()):
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
return ThreadPool(processes, initializer, initargs) return ThreadPool(processes, initializer, initargs)
JoinableQueue = Queue JoinableQueue = Queue
# #
# Analogue of `multiprocessing.connection` which uses queues instead of sockets # Analogue of `multiprocessing.connection` which uses queues instead of sockets
# #
# multiprocessing/dummy/connection.py # multiprocessing/dummy/connection.py
# #
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
# #
__all__ = [ 'Client', 'Listener', 'Pipe' ] __all__ = [ 'Client', 'Listener', 'Pipe' ]
from Queue import Queue from Queue import Queue
families = [None] families = [None]
class Listener(object): class Listener(object):
def __init__(self, address=None, family=None, backlog=1): def __init__(self, address=None, family=None, backlog=1):
self._backlog_queue = Queue(backlog) self._backlog_queue = Queue(backlog)
def accept(self): def accept(self):
return Connection(*self._backlog_queue.get()) return Connection(*self._backlog_queue.get())
def close(self): def close(self):
self._backlog_queue = None self._backlog_queue = None
address = property(lambda self: self._backlog_queue) address = property(lambda self: self._backlog_queue)
def Client(address): def Client(address):
_in, _out = Queue(), Queue() _in, _out = Queue(), Queue()
address.put((_out, _in)) address.put((_out, _in))
return Connection(_in, _out) return Connection(_in, _out)
def Pipe(duplex=True): def Pipe(duplex=True):
a, b = Queue(), Queue() a, b = Queue(), Queue()
return Connection(a, b), Connection(b, a) return Connection(a, b), Connection(b, a)
class Connection(object): class Connection(object):
def __init__(self, _in, _out): def __init__(self, _in, _out):
self._out = _out self._out = _out
self._in = _in self._in = _in
self.send = self.send_bytes = _out.put self.send = self.send_bytes = _out.put
self.recv = self.recv_bytes = _in.get self.recv = self.recv_bytes = _in.get
def poll(self, timeout=0.0): def poll(self, timeout=0.0):
if self._in.qsize() > 0: if self._in.qsize() > 0:
return True return True
if timeout <= 0.0: if timeout <= 0.0:
return False return False
self._in.not_empty.acquire() self._in.not_empty.acquire()
self._in.not_empty.wait(timeout) self._in.not_empty.wait(timeout)
self._in.not_empty.release() self._in.not_empty.release()
return self._in.qsize() > 0 return self._in.qsize() > 0
def close(self): def close(self):
pass pass
...@@ -92,7 +92,7 @@ if sys.platform != 'win32': ...@@ -92,7 +92,7 @@ if sys.platform != 'win32':
except OSError, e: except OSError, e:
if self.wait(timeout=0.1) is None: if self.wait(timeout=0.1) is None:
raise raise
@staticmethod @staticmethod
def thread_is_spawning(): def thread_is_spawning():
return False return False
...@@ -107,10 +107,10 @@ else: ...@@ -107,10 +107,10 @@ else:
import _subprocess import _subprocess
import copy_reg import copy_reg
import time import time
from ._multiprocessing import win32, Connection, PipeConnection from ._multiprocessing import win32, Connection, PipeConnection
from .util import Finalize from .util import Finalize
try: try:
from cPickle import dump, load, HIGHEST_PROTOCOL from cPickle import dump, load, HIGHEST_PROTOCOL
except ImportError: except ImportError:
...@@ -217,7 +217,7 @@ else: ...@@ -217,7 +217,7 @@ else:
if code == TERMINATE: if code == TERMINATE:
code = -signal.SIGTERM code = -signal.SIGTERM
self.returncode = code self.returncode = code
return self.returncode return self.returncode
def poll(self): def poll(self):
...@@ -230,7 +230,7 @@ else: ...@@ -230,7 +230,7 @@ else:
except WindowsError: except WindowsError:
if self.wait(timeout=0.1) is None: if self.wait(timeout=0.1) is None:
raise raise
# #
# #
# #
...@@ -308,7 +308,7 @@ else: ...@@ -308,7 +308,7 @@ else:
Return info about parent needed by child to unpickle process object Return info about parent needed by child to unpickle process object
''' '''
from .util import _logger, _log_to_stderr from .util import _logger, _log_to_stderr
d = dict( d = dict(
name=name, name=name,
sys_path=sys.path, sys_path=sys.path,
...@@ -317,7 +317,7 @@ else: ...@@ -317,7 +317,7 @@ else:
orig_dir=process.ORIGINAL_DIR, orig_dir=process.ORIGINAL_DIR,
authkey=process.current_process().get_authkey(), authkey=process.current_process().get_authkey(),
) )
if _logger is not None: if _logger is not None:
d['log_level'] = _logger.getEffectiveLevel() d['log_level'] = _logger.getEffectiveLevel()
...@@ -336,7 +336,7 @@ else: ...@@ -336,7 +336,7 @@ else:
# #
# Make (Pipe)Connection picklable # Make (Pipe)Connection picklable
# #
def reduce_connection(conn): def reduce_connection(conn):
if not Popen.thread_is_spawning(): if not Popen.thread_is_spawning():
raise RuntimeError( raise RuntimeError(
...@@ -345,7 +345,7 @@ else: ...@@ -345,7 +345,7 @@ else:
) )
return type(conn), (Popen.duplicate_for_child(conn.fileno()), return type(conn), (Popen.duplicate_for_child(conn.fileno()),
conn.readable, conn.writable) conn.readable, conn.writable)
copy_reg.pickle(Connection, reduce_connection) copy_reg.pickle(Connection, reduce_connection)
copy_reg.pickle(PipeConnection, reduce_connection) copy_reg.pickle(PipeConnection, reduce_connection)
...@@ -367,7 +367,7 @@ def prepare(data): ...@@ -367,7 +367,7 @@ def prepare(data):
if 'authkey' in data: if 'authkey' in data:
process.current_process()._authkey = data['authkey'] process.current_process()._authkey = data['authkey']
if 'log_to_stderr' in data and data['log_to_stderr']: if 'log_to_stderr' in data and data['log_to_stderr']:
util.log_to_stderr() util.log_to_stderr()
......
This diff is collapsed.
...@@ -40,7 +40,7 @@ try: ...@@ -40,7 +40,7 @@ try:
bytes bytes
except NameError: except NameError:
bytes = str # XXX not needed in Py2.6 and Py3.0 bytes = str # XXX not needed in Py2.6 and Py3.0
# #
# Register some things for pickling # Register some things for pickling
# #
...@@ -55,7 +55,7 @@ if view_types[0] is not list: # XXX only needed in Py3.0 ...@@ -55,7 +55,7 @@ if view_types[0] is not list: # XXX only needed in Py3.0
return list, (list(obj),) return list, (list(obj),)
for view_type in view_types: for view_type in view_types:
copy_reg.pickle(view_type, rebuild_as_list) copy_reg.pickle(view_type, rebuild_as_list)
# #
# Type for identifying shared objects # Type for identifying shared objects
# #
...@@ -104,7 +104,7 @@ def convert_to_error(kind, result): ...@@ -104,7 +104,7 @@ def convert_to_error(kind, result):
return RemoteError('Unserializable message: %s\n' % result) return RemoteError('Unserializable message: %s\n' % result)
else: else:
return ValueError('Unrecognized message type') return ValueError('Unrecognized message type')
class RemoteError(Exception): class RemoteError(Exception):
def __str__(self): def __str__(self):
return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
...@@ -340,7 +340,7 @@ class Server(object): ...@@ -340,7 +340,7 @@ class Server(object):
util.debug('resetting stdout, stderr') util.debug('resetting stdout, stderr')
sys.stdout = sys.__stdout__ sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__ sys.stderr = sys.__stderr__
util._run_finalizers(0) util._run_finalizers(0)
for p in active_children(): for p in active_children():
...@@ -358,7 +358,7 @@ class Server(object): ...@@ -358,7 +358,7 @@ class Server(object):
traceback.print_exc() traceback.print_exc()
finally: finally:
exit(0) exit(0)
def create(self, c, typeid, *args, **kwds): def create(self, c, typeid, *args, **kwds):
''' '''
Create a new shared object and return its id Create a new shared object and return its id
...@@ -367,7 +367,7 @@ class Server(object): ...@@ -367,7 +367,7 @@ class Server(object):
try: try:
callable, exposed, method_to_typeid, proxytype = \ callable, exposed, method_to_typeid, proxytype = \
self.registry[typeid] self.registry[typeid]
if callable is None: if callable is None:
assert len(args) == 1 and not kwds assert len(args) == 1 and not kwds
obj = args[0] obj = args[0]
...@@ -456,7 +456,7 @@ class BaseManager(object): ...@@ -456,7 +456,7 @@ class BaseManager(object):
''' '''
_registry = {} _registry = {}
_Server = Server _Server = Server
def __init__(self, address=None, authkey=None, serializer='pickle'): def __init__(self, address=None, authkey=None, serializer='pickle'):
if authkey is None: if authkey is None:
authkey = current_process().get_authkey() authkey = current_process().get_authkey()
...@@ -487,7 +487,7 @@ class BaseManager(object): ...@@ -487,7 +487,7 @@ class BaseManager(object):
conn = Client(self._address, authkey=self._authkey) conn = Client(self._address, authkey=self._authkey)
dispatch(conn, None, 'dummy') dispatch(conn, None, 'dummy')
self._state.value = State.STARTED self._state.value = State.STARTED
def start(self): def start(self):
''' '''
Spawn a server process for this manager object Spawn a server process for this manager object
...@@ -570,10 +570,10 @@ class BaseManager(object): ...@@ -570,10 +570,10 @@ class BaseManager(object):
Return the number of shared objects Return the number of shared objects
''' '''
conn = self._Client(self._address, authkey=self._authkey) conn = self._Client(self._address, authkey=self._authkey)
try: try:
return dispatch(conn, None, 'number_of_objects') return dispatch(conn, None, 'number_of_objects')
finally: finally:
conn.close() conn.close()
def __enter__(self): def __enter__(self):
return self return self
...@@ -612,7 +612,7 @@ class BaseManager(object): ...@@ -612,7 +612,7 @@ class BaseManager(object):
del BaseProxy._address_to_local[address] del BaseProxy._address_to_local[address]
except KeyError: except KeyError:
pass pass
address = property(lambda self: self._address) address = property(lambda self: self._address)
@classmethod @classmethod
...@@ -640,7 +640,7 @@ class BaseManager(object): ...@@ -640,7 +640,7 @@ class BaseManager(object):
cls._registry[typeid] = ( cls._registry[typeid] = (
callable, exposed, method_to_typeid, proxytype callable, exposed, method_to_typeid, proxytype
) )
if create_method: if create_method:
def temp(self, *args, **kwds): def temp(self, *args, **kwds):
util.debug('requesting creation of a shared %r object', typeid) util.debug('requesting creation of a shared %r object', typeid)
...@@ -709,9 +709,9 @@ class BaseProxy(object): ...@@ -709,9 +709,9 @@ class BaseProxy(object):
if incref: if incref:
self._incref() self._incref()
util.register_after_fork(self, BaseProxy._after_fork) util.register_after_fork(self, BaseProxy._after_fork)
def _connect(self): def _connect(self):
util.debug('making connection to manager') util.debug('making connection to manager')
name = current_process().get_name() name = current_process().get_name()
...@@ -720,7 +720,7 @@ class BaseProxy(object): ...@@ -720,7 +720,7 @@ class BaseProxy(object):
conn = self._Client(self._token.address, authkey=self._authkey) conn = self._Client(self._token.address, authkey=self._authkey)
dispatch(conn, None, 'accept_connection', (name,)) dispatch(conn, None, 'accept_connection', (name,))
self._tls.connection = conn self._tls.connection = conn
def _callmethod(self, methodname, args=(), kwds={}): def _callmethod(self, methodname, args=(), kwds={}):
''' '''
Try to call a method of the referrent and return a copy of the result Try to call a method of the referrent and return a copy of the result
...@@ -735,7 +735,7 @@ class BaseProxy(object): ...@@ -735,7 +735,7 @@ class BaseProxy(object):
conn.send((self._id, methodname, args, kwds)) conn.send((self._id, methodname, args, kwds))
kind, result = conn.recv() kind, result = conn.recv()
if kind == '#RETURN': if kind == '#RETURN':
return result return result
elif kind == '#PROXY': elif kind == '#PROXY':
...@@ -793,7 +793,7 @@ class BaseProxy(object): ...@@ -793,7 +793,7 @@ class BaseProxy(object):
threading.current_thread().get_name()) threading.current_thread().get_name())
tls.connection.close() tls.connection.close()
del tls.connection del tls.connection
def _after_fork(self): def _after_fork(self):
self._manager = None self._manager = None
try: try:
...@@ -806,7 +806,7 @@ class BaseProxy(object): ...@@ -806,7 +806,7 @@ class BaseProxy(object):
kwds = {} kwds = {}
if Popen.thread_is_spawning(): if Popen.thread_is_spawning():
kwds['authkey'] = self._authkey kwds['authkey'] = self._authkey
if getattr(self, '_isauto', False): if getattr(self, '_isauto', False):
kwds['exposed'] = self._exposed_ kwds['exposed'] = self._exposed_
return (RebuildProxy, return (RebuildProxy,
...@@ -817,7 +817,7 @@ class BaseProxy(object): ...@@ -817,7 +817,7 @@ class BaseProxy(object):
def __deepcopy__(self, memo): def __deepcopy__(self, memo):
return self._getvalue() return self._getvalue()
def __repr__(self): def __repr__(self):
return '<%s object, typeid %r at %s>' % \ return '<%s object, typeid %r at %s>' % \
(type(self).__name__, self._token.typeid, '0x%x' % id(self)) (type(self).__name__, self._token.typeid, '0x%x' % id(self))
...@@ -842,7 +842,7 @@ def RebuildProxy(func, token, serializer, kwds): ...@@ -842,7 +842,7 @@ def RebuildProxy(func, token, serializer, kwds):
If possible the shared object is returned, or otherwise a proxy for it. If possible the shared object is returned, or otherwise a proxy for it.
''' '''
server = getattr(current_process(), '_manager_server', None) server = getattr(current_process(), '_manager_server', None)
if server and server.address == token.address: if server and server.address == token.address:
return server.id_to_obj[token.id][0] return server.id_to_obj[token.id][0]
else: else:
...@@ -884,7 +884,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None, ...@@ -884,7 +884,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None,
Return an auto-proxy for `token` Return an auto-proxy for `token`
''' '''
_Client = listener_client[serializer][1] _Client = listener_client[serializer][1]
if exposed is None: if exposed is None:
conn = _Client(token.address, authkey=authkey) conn = _Client(token.address, authkey=authkey)
try: try:
...@@ -995,7 +995,7 @@ class NamespaceProxy(BaseProxy): ...@@ -995,7 +995,7 @@ class NamespaceProxy(BaseProxy):
if key[0] == '_': if key[0] == '_':
return object.__getattribute__(self, key) return object.__getattribute__(self, key)
callmethod = object.__getattribute__(self, '_callmethod') callmethod = object.__getattribute__(self, '_callmethod')
return callmethod('__getattribute__', (key,)) return callmethod('__getattribute__', (key,))
def __setattr__(self, key, value): def __setattr__(self, key, value):
if key[0] == '_': if key[0] == '_':
return object.__setattr__(self, key, value) return object.__setattr__(self, key, value)
...@@ -1007,7 +1007,7 @@ class NamespaceProxy(BaseProxy): ...@@ -1007,7 +1007,7 @@ class NamespaceProxy(BaseProxy):
callmethod = object.__getattribute__(self, '_callmethod') callmethod = object.__getattribute__(self, '_callmethod')
return callmethod('__delattr__', (key,)) return callmethod('__delattr__', (key,))
class ValueProxy(BaseProxy): class ValueProxy(BaseProxy):
_exposed_ = ('get', 'set') _exposed_ = ('get', 'set')
def get(self): def get(self):
...@@ -1063,10 +1063,10 @@ PoolProxy._method_to_typeid_ = { ...@@ -1063,10 +1063,10 @@ PoolProxy._method_to_typeid_ = {
class SyncManager(BaseManager): class SyncManager(BaseManager):
''' '''
Subclass of `BaseManager` which supports a number of shared object types. Subclass of `BaseManager` which supports a number of shared object types.
The types registered are those intended for the synchronization The types registered are those intended for the synchronization
of threads, plus `dict`, `list` and `Namespace`. of threads, plus `dict`, `list` and `Namespace`.
The `multiprocessing.Manager()` function creates started instances of The `multiprocessing.Manager()` function creates started instances of
this class. this class.
''' '''
......
...@@ -58,18 +58,18 @@ def worker(inqueue, outqueue, initializer=None, initargs=()): ...@@ -58,18 +58,18 @@ def worker(inqueue, outqueue, initializer=None, initargs=()):
except (EOFError, IOError): except (EOFError, IOError):
debug('worker got EOFError or IOError -- exiting') debug('worker got EOFError or IOError -- exiting')
break break
if task is None: if task is None:
debug('worker got sentinel -- exiting') debug('worker got sentinel -- exiting')
break break
job, i, func, args, kwds = task job, i, func, args, kwds = task
try: try:
result = (True, func(*args, **kwds)) result = (True, func(*args, **kwds))
except Exception, e: except Exception, e:
result = (False, e) result = (False, e)
put((job, i, result)) put((job, i, result))
# #
# Class representing a process pool # Class representing a process pool
# #
...@@ -91,7 +91,7 @@ class Pool(object): ...@@ -91,7 +91,7 @@ class Pool(object):
processes = cpu_count() processes = cpu_count()
except NotImplementedError: except NotImplementedError:
processes = 1 processes = 1
self._pool = [] self._pool = []
for i in range(processes): for i in range(processes):
w = self.Process( w = self.Process(
...@@ -102,7 +102,7 @@ class Pool(object): ...@@ -102,7 +102,7 @@ class Pool(object):
w.set_name(w.get_name().replace('Process', 'PoolWorker')) w.set_name(w.get_name().replace('Process', 'PoolWorker'))
w.set_daemon(True) w.set_daemon(True)
w.start() w.start()
self._task_handler = threading.Thread( self._task_handler = threading.Thread(
target=Pool._handle_tasks, target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
...@@ -132,7 +132,7 @@ class Pool(object): ...@@ -132,7 +132,7 @@ class Pool(object):
self._outqueue = SimpleQueue() self._outqueue = SimpleQueue()
self._quick_put = self._inqueue._writer.send self._quick_put = self._inqueue._writer.send
self._quick_get = self._outqueue._reader.recv self._quick_get = self._outqueue._reader.recv
def apply(self, func, args=(), kwds={}): def apply(self, func, args=(), kwds={}):
''' '''
Equivalent of `apply()` builtin Equivalent of `apply()` builtin
...@@ -182,7 +182,7 @@ class Pool(object): ...@@ -182,7 +182,7 @@ class Pool(object):
self._taskqueue.put((((result._job, i, mapstar, (x,), {}) self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._set_length)) for i, x in enumerate(task_batches)), result._set_length))
return (item for chunk in result for item in chunk) return (item for chunk in result for item in chunk)
def apply_async(self, func, args=(), kwds={}, callback=None): def apply_async(self, func, args=(), kwds={}, callback=None):
''' '''
Asynchronous equivalent of `apply()` builtin Asynchronous equivalent of `apply()` builtin
...@@ -199,12 +199,12 @@ class Pool(object): ...@@ -199,12 +199,12 @@ class Pool(object):
assert self._state == RUN assert self._state == RUN
if not hasattr(iterable, '__len__'): if not hasattr(iterable, '__len__'):
iterable = list(iterable) iterable = list(iterable)
if chunksize is None: if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4) chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra: if extra:
chunksize += 1 chunksize += 1
task_batches = Pool._get_tasks(func, iterable, chunksize) task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback) result = MapResult(self._cache, chunksize, len(iterable), callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {}) self._taskqueue.put((((result._job, i, mapstar, (x,), {})
...@@ -234,13 +234,13 @@ class Pool(object): ...@@ -234,13 +234,13 @@ class Pool(object):
break break
else: else:
debug('task handler got sentinel') debug('task handler got sentinel')
try: try:
# tell result handler to finish when cache is empty # tell result handler to finish when cache is empty
debug('task handler sending sentinel to result handler') debug('task handler sending sentinel to result handler')
outqueue.put(None) outqueue.put(None)
# tell workers there is no more work # tell workers there is no more work
debug('task handler sending sentinel to workers') debug('task handler sending sentinel to workers')
for p in pool: for p in pool:
...@@ -260,12 +260,12 @@ class Pool(object): ...@@ -260,12 +260,12 @@ class Pool(object):
except (IOError, EOFError): except (IOError, EOFError):
debug('result handler got EOFError/IOError -- exiting') debug('result handler got EOFError/IOError -- exiting')
return return
if thread._state: if thread._state:
assert thread._state == TERMINATE assert thread._state == TERMINATE
debug('result handler found thread._state=TERMINATE') debug('result handler found thread._state=TERMINATE')
break break
if task is None: if task is None:
debug('result handler got sentinel') debug('result handler got sentinel')
break break
...@@ -321,7 +321,7 @@ class Pool(object): ...@@ -321,7 +321,7 @@ class Pool(object):
raise NotImplementedError( raise NotImplementedError(
'pool objects cannot be passed between processes or pickled' 'pool objects cannot be passed between processes or pickled'
) )
def close(self): def close(self):
debug('closing pool') debug('closing pool')
if self._state == RUN: if self._state == RUN:
...@@ -355,7 +355,7 @@ class Pool(object): ...@@ -355,7 +355,7 @@ class Pool(object):
task_handler, result_handler, cache): task_handler, result_handler, cache):
# this is guaranteed to only be called once # this is guaranteed to only be called once
debug('finalizing pool') debug('finalizing pool')
task_handler._state = TERMINATE task_handler._state = TERMINATE
taskqueue.put(None) # sentinel taskqueue.put(None) # sentinel
...@@ -363,7 +363,7 @@ class Pool(object): ...@@ -363,7 +363,7 @@ class Pool(object):
cls._help_stuff_finish(inqueue, task_handler, len(pool)) cls._help_stuff_finish(inqueue, task_handler, len(pool))
assert result_handler.is_alive() or len(cache) == 0 assert result_handler.is_alive() or len(cache) == 0
result_handler._state = TERMINATE result_handler._state = TERMINATE
outqueue.put(None) # sentinel outqueue.put(None) # sentinel
...@@ -396,14 +396,14 @@ class ApplyResult(object): ...@@ -396,14 +396,14 @@ class ApplyResult(object):
self._ready = False self._ready = False
self._callback = callback self._callback = callback
cache[self._job] = self cache[self._job] = self
def ready(self): def ready(self):
return self._ready return self._ready
def successful(self): def successful(self):
assert self._ready assert self._ready
return self._success return self._success
def wait(self, timeout=None): def wait(self, timeout=None):
self._cond.acquire() self._cond.acquire()
try: try:
...@@ -438,7 +438,7 @@ class ApplyResult(object): ...@@ -438,7 +438,7 @@ class ApplyResult(object):
# #
class MapResult(ApplyResult): class MapResult(ApplyResult):
def __init__(self, cache, chunksize, length, callback): def __init__(self, cache, chunksize, length, callback):
ApplyResult.__init__(self, cache, callback) ApplyResult.__init__(self, cache, callback)
self._success = True self._success = True
...@@ -449,7 +449,7 @@ class MapResult(ApplyResult): ...@@ -449,7 +449,7 @@ class MapResult(ApplyResult):
self._ready = True self._ready = True
else: else:
self._number_left = length//chunksize + bool(length % chunksize) self._number_left = length//chunksize + bool(length % chunksize)
def _set(self, i, success_result): def _set(self, i, success_result):
success, result = success_result success, result = success_result
if success: if success:
...@@ -492,10 +492,10 @@ class IMapIterator(object): ...@@ -492,10 +492,10 @@ class IMapIterator(object):
self._length = None self._length = None
self._unsorted = {} self._unsorted = {}
cache[self._job] = self cache[self._job] = self
def __iter__(self): def __iter__(self):
return self return self
def next(self, timeout=None): def next(self, timeout=None):
self._cond.acquire() self._cond.acquire()
try: try:
...@@ -520,7 +520,7 @@ class IMapIterator(object): ...@@ -520,7 +520,7 @@ class IMapIterator(object):
raise value raise value
__next__ = next # XXX __next__ = next # XXX
def _set(self, i, obj): def _set(self, i, obj):
self._cond.acquire() self._cond.acquire()
try: try:
...@@ -534,12 +534,12 @@ class IMapIterator(object): ...@@ -534,12 +534,12 @@ class IMapIterator(object):
self._cond.notify() self._cond.notify()
else: else:
self._unsorted[i] = obj self._unsorted[i] = obj
if self._index == self._length: if self._index == self._length:
del self._cache[self._job] del self._cache[self._job]
finally: finally:
self._cond.release() self._cond.release()
def _set_length(self, length): def _set_length(self, length):
self._cond.acquire() self._cond.acquire()
try: try:
...@@ -572,18 +572,18 @@ class IMapUnorderedIterator(IMapIterator): ...@@ -572,18 +572,18 @@ class IMapUnorderedIterator(IMapIterator):
# #
class ThreadPool(Pool): class ThreadPool(Pool):
from .dummy import Process from .dummy import Process
def __init__(self, processes=None, initializer=None, initargs=()): def __init__(self, processes=None, initializer=None, initargs=()):
Pool.__init__(self, processes, initializer, initargs) Pool.__init__(self, processes, initializer, initargs)
def _setup_queues(self): def _setup_queues(self):
self._inqueue = Queue.Queue() self._inqueue = Queue.Queue()
self._outqueue = Queue.Queue() self._outqueue = Queue.Queue()
self._quick_put = self._inqueue.put self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get self._quick_get = self._outqueue.get
@staticmethod @staticmethod
def _help_stuff_finish(inqueue, task_handler, size): def _help_stuff_finish(inqueue, task_handler, size):
# put sentinels at head of inqueue to make workers finish # put sentinels at head of inqueue to make workers finish
......
...@@ -47,7 +47,7 @@ def active_children(): ...@@ -47,7 +47,7 @@ def active_children():
''' '''
_cleanup() _cleanup()
return list(_current_process._children) return list(_current_process._children)
# #
# #
# #
...@@ -69,7 +69,7 @@ class Process(object): ...@@ -69,7 +69,7 @@ class Process(object):
The class is analagous to `threading.Thread` The class is analagous to `threading.Thread`
''' '''
_Popen = None _Popen = None
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
assert group is None, 'group argument must be None for now' assert group is None, 'group argument must be None for now'
count = _current_process._counter.next() count = _current_process._counter.next()
...@@ -91,7 +91,7 @@ class Process(object): ...@@ -91,7 +91,7 @@ class Process(object):
''' '''
if self._target: if self._target:
self._target(*self._args, **self._kwargs) self._target(*self._args, **self._kwargs)
def start(self): def start(self):
''' '''
Start child process Start child process
...@@ -114,7 +114,7 @@ class Process(object): ...@@ -114,7 +114,7 @@ class Process(object):
Terminate process; sends SIGTERM signal or uses TerminateProcess() Terminate process; sends SIGTERM signal or uses TerminateProcess()
''' '''
self._popen.terminate() self._popen.terminate()
def join(self, timeout=None): def join(self, timeout=None):
''' '''
Wait until child process terminates Wait until child process terminates
...@@ -217,11 +217,11 @@ class Process(object): ...@@ -217,11 +217,11 @@ class Process(object):
status, self._daemonic and ' daemon' or '') status, self._daemonic and ' daemon' or '')
## ##
def _bootstrap(self): def _bootstrap(self):
from . import util from . import util
global _current_process global _current_process
try: try:
self._children = set() self._children = set()
self._counter = itertools.count(1) self._counter = itertools.count(1)
......
...@@ -41,9 +41,9 @@ class Queue(object): ...@@ -41,9 +41,9 @@ class Queue(object):
else: else:
self._wlock = Lock() self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize) self._sem = BoundedSemaphore(maxsize)
self._after_fork() self._after_fork()
if sys.platform != 'win32': if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork) register_after_fork(self, Queue._after_fork)
...@@ -51,12 +51,12 @@ class Queue(object): ...@@ -51,12 +51,12 @@ class Queue(object):
assert_spawning(self) assert_spawning(self)
return (self._maxsize, self._reader, self._writer, return (self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) self._rlock, self._wlock, self._sem, self._opid)
def __setstate__(self, state): def __setstate__(self, state):
(self._maxsize, self._reader, self._writer, (self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork() self._after_fork()
def _after_fork(self): def _after_fork(self):
debug('Queue._after_fork()') debug('Queue._after_fork()')
self._notempty = threading.Condition(threading.Lock()) self._notempty = threading.Condition(threading.Lock())
...@@ -69,7 +69,7 @@ class Queue(object): ...@@ -69,7 +69,7 @@ class Queue(object):
self._send = self._writer.send self._send = self._writer.send
self._recv = self._reader.recv self._recv = self._reader.recv
self._poll = self._reader.poll self._poll = self._reader.poll
def put(self, obj, block=True, timeout=None): def put(self, obj, block=True, timeout=None):
assert not self._closed assert not self._closed
if not self._sem.acquire(block, timeout): if not self._sem.acquire(block, timeout):
...@@ -93,7 +93,7 @@ class Queue(object): ...@@ -93,7 +93,7 @@ class Queue(object):
return res return res
finally: finally:
self._rlock.release() self._rlock.release()
else: else:
if block: if block:
deadline = time.time() + timeout deadline = time.time() + timeout
...@@ -135,7 +135,7 @@ class Queue(object): ...@@ -135,7 +135,7 @@ class Queue(object):
assert self._closed assert self._closed
if self._jointhread: if self._jointhread:
self._jointhread() self._jointhread()
def cancel_join_thread(self): def cancel_join_thread(self):
debug('Queue.cancel_join_thread()') debug('Queue.cancel_join_thread()')
self._joincancelled = True self._joincancelled = True
...@@ -146,7 +146,7 @@ class Queue(object): ...@@ -146,7 +146,7 @@ class Queue(object):
def _start_thread(self): def _start_thread(self):
debug('Queue._start_thread()') debug('Queue._start_thread()')
# Start thread which transfers data from buffer to pipe # Start thread which transfers data from buffer to pipe
self._buffer.clear() self._buffer.clear()
self._thread = threading.Thread( self._thread = threading.Thread(
...@@ -174,14 +174,14 @@ class Queue(object): ...@@ -174,14 +174,14 @@ class Queue(object):
[weakref.ref(self._thread)], [weakref.ref(self._thread)],
exitpriority=-5 exitpriority=-5
) )
# Send sentinel to the thread queue object when garbage collected # Send sentinel to the thread queue object when garbage collected
self._close = Finalize( self._close = Finalize(
self, Queue._finalize_close, self, Queue._finalize_close,
[self._buffer, self._notempty], [self._buffer, self._notempty],
exitpriority=10 exitpriority=10
) )
@staticmethod @staticmethod
def _finalize_join(twr): def _finalize_join(twr):
debug('joining queue thread') debug('joining queue thread')
...@@ -191,7 +191,7 @@ class Queue(object): ...@@ -191,7 +191,7 @@ class Queue(object):
debug('... queue thread joined') debug('... queue thread joined')
else: else:
debug('... queue thread already dead') debug('... queue thread already dead')
@staticmethod @staticmethod
def _finalize_close(buffer, notempty): def _finalize_close(buffer, notempty):
debug('telling queue thread to quit') debug('telling queue thread to quit')
...@@ -206,7 +206,7 @@ class Queue(object): ...@@ -206,7 +206,7 @@ class Queue(object):
def _feed(buffer, notempty, send, writelock, close): def _feed(buffer, notempty, send, writelock, close):
debug('starting thread to feed data to pipe') debug('starting thread to feed data to pipe')
from .util import is_exiting from .util import is_exiting
nacquire = notempty.acquire nacquire = notempty.acquire
nrelease = notempty.release nrelease = notempty.release
nwait = notempty.wait nwait = notempty.wait
...@@ -217,7 +217,7 @@ class Queue(object): ...@@ -217,7 +217,7 @@ class Queue(object):
wrelease = writelock.release wrelease = writelock.release
else: else:
wacquire = None wacquire = None
try: try:
while 1: while 1:
nacquire() nacquire()
...@@ -257,7 +257,7 @@ class Queue(object): ...@@ -257,7 +257,7 @@ class Queue(object):
traceback.print_exc() traceback.print_exc()
except Exception: except Exception:
pass pass
_sentinel = object() _sentinel = object()
# #
...@@ -274,7 +274,7 @@ class JoinableQueue(Queue): ...@@ -274,7 +274,7 @@ class JoinableQueue(Queue):
Queue.__init__(self, maxsize) Queue.__init__(self, maxsize)
self._unfinished_tasks = Semaphore(0) self._unfinished_tasks = Semaphore(0)
self._cond = Condition() self._cond = Condition()
def __getstate__(self): def __getstate__(self):
return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
...@@ -285,7 +285,7 @@ class JoinableQueue(Queue): ...@@ -285,7 +285,7 @@ class JoinableQueue(Queue):
def put(self, item, block=True, timeout=None): def put(self, item, block=True, timeout=None):
Queue.put(self, item, block, timeout) Queue.put(self, item, block, timeout)
self._unfinished_tasks.release() self._unfinished_tasks.release()
def task_done(self): def task_done(self):
self._cond.acquire() self._cond.acquire()
try: try:
...@@ -295,7 +295,7 @@ class JoinableQueue(Queue): ...@@ -295,7 +295,7 @@ class JoinableQueue(Queue):
self._cond.notify_all() self._cond.notify_all()
finally: finally:
self._cond.release() self._cond.release()
def join(self): def join(self):
self._cond.acquire() self._cond.acquire()
try: try:
......
...@@ -36,7 +36,7 @@ if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): ...@@ -36,7 +36,7 @@ if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
if sys.platform == 'win32': if sys.platform == 'win32':
import _subprocess import _subprocess
from ._multiprocessing import win32 from ._multiprocessing import win32
def send_handle(conn, handle, destination_pid): def send_handle(conn, handle, destination_pid):
process_handle = win32.OpenProcess( process_handle = win32.OpenProcess(
win32.PROCESS_ALL_ACCESS, False, destination_pid win32.PROCESS_ALL_ACCESS, False, destination_pid
...@@ -46,14 +46,14 @@ if sys.platform == 'win32': ...@@ -46,14 +46,14 @@ if sys.platform == 'win32':
conn.send(new_handle) conn.send(new_handle)
finally: finally:
close(process_handle) close(process_handle)
def recv_handle(conn): def recv_handle(conn):
return conn.recv() return conn.recv()
else: else:
def send_handle(conn, handle, destination_pid): def send_handle(conn, handle, destination_pid):
_multiprocessing.sendfd(conn.fileno(), handle) _multiprocessing.sendfd(conn.fileno(), handle)
def recv_handle(conn): def recv_handle(conn):
return _multiprocessing.recvfd(conn.fileno()) return _multiprocessing.recvfd(conn.fileno())
...@@ -93,7 +93,7 @@ def _get_listener(): ...@@ -93,7 +93,7 @@ def _get_listener():
def _serve(): def _serve():
from .util import is_exiting, sub_warning from .util import is_exiting, sub_warning
while 1: while 1:
try: try:
conn = _listener.accept() conn = _listener.accept()
...@@ -109,7 +109,7 @@ def _serve(): ...@@ -109,7 +109,7 @@ def _serve():
'thread for sharing handles raised exception :\n' + 'thread for sharing handles raised exception :\n' +
'-'*79 + '\n' + traceback.format_exc() + '-'*79 '-'*79 + '\n' + traceback.format_exc() + '-'*79
) )
# #
# Functions to be used for pickling/unpickling objects with handles # Functions to be used for pickling/unpickling objects with handles
# #
...@@ -176,15 +176,15 @@ copy_reg.pickle(socket.socket, reduce_socket) ...@@ -176,15 +176,15 @@ copy_reg.pickle(socket.socket, reduce_socket)
# #
if sys.platform == 'win32': if sys.platform == 'win32':
def reduce_pipe_connection(conn): def reduce_pipe_connection(conn):
rh = reduce_handle(conn.fileno()) rh = reduce_handle(conn.fileno())
return rebuild_pipe_connection, (rh, conn.readable, conn.writable) return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
def rebuild_pipe_connection(reduced_handle, readable, writable): def rebuild_pipe_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle) handle = rebuild_handle(reduced_handle)
return _multiprocessing.PipeConnection( return _multiprocessing.PipeConnection(
handle, readable=readable, writable=writable handle, readable=readable, writable=writable
) )
copy_reg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection) copy_reg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection)
...@@ -92,10 +92,10 @@ def copy(obj): ...@@ -92,10 +92,10 @@ def copy(obj):
new_obj = _new_value(type(obj)) new_obj = _new_value(type(obj))
ctypes.pointer(new_obj)[0] = obj ctypes.pointer(new_obj)[0] = obj
return new_obj return new_obj
def synchronized(obj, lock=None): def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized' assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData): if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock) return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array): elif isinstance(obj, ctypes.Array):
...@@ -123,7 +123,7 @@ def reduce_ctype(obj): ...@@ -123,7 +123,7 @@ def reduce_ctype(obj):
return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_) return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
else: else:
return rebuild_ctype, (type(obj), obj._wrapper, None) return rebuild_ctype, (type(obj), obj._wrapper, None)
def rebuild_ctype(type_, wrapper, length): def rebuild_ctype(type_, wrapper, length):
if length is not None: if length is not None:
type_ = type_ * length type_ = type_ * length
...@@ -170,7 +170,7 @@ class_cache = weakref.WeakKeyDictionary() ...@@ -170,7 +170,7 @@ class_cache = weakref.WeakKeyDictionary()
# #
class SynchronizedBase(object): class SynchronizedBase(object):
def __init__(self, obj, lock=None): def __init__(self, obj, lock=None):
self._obj = obj self._obj = obj
self._lock = lock or RLock() self._lock = lock or RLock()
...@@ -180,55 +180,55 @@ class SynchronizedBase(object): ...@@ -180,55 +180,55 @@ class SynchronizedBase(object):
def __reduce__(self): def __reduce__(self):
assert_spawning(self) assert_spawning(self)
return synchronized, (self._obj, self._lock) return synchronized, (self._obj, self._lock)
def get_obj(self): def get_obj(self):
return self._obj return self._obj
def get_lock(self): def get_lock(self):
return self._lock return self._lock
def __repr__(self): def __repr__(self):
return '<%s wrapper for %s>' % (type(self).__name__, self._obj) return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
class Synchronized(SynchronizedBase): class Synchronized(SynchronizedBase):
value = make_property('value') value = make_property('value')
class SynchronizedArray(SynchronizedBase): class SynchronizedArray(SynchronizedBase):
def __len__(self): def __len__(self):
return len(self._obj) return len(self._obj)
def __getitem__(self, i): def __getitem__(self, i):
self.acquire() self.acquire()
try: try:
return self._obj[i] return self._obj[i]
finally: finally:
self.release() self.release()
def __setitem__(self, i, value): def __setitem__(self, i, value):
self.acquire() self.acquire()
try: try:
self._obj[i] = value self._obj[i] = value
finally: finally:
self.release() self.release()
def __getslice__(self, start, stop): def __getslice__(self, start, stop):
self.acquire() self.acquire()
try: try:
return self._obj[start:stop] return self._obj[start:stop]
finally: finally:
self.release() self.release()
def __setslice__(self, start, stop, values): def __setslice__(self, start, stop, values):
self.acquire() self.acquire()
try: try:
self._obj[start:stop] = values self._obj[start:stop] = values
finally: finally:
self.release() self.release()
class SynchronizedString(SynchronizedArray): class SynchronizedString(SynchronizedArray):
value = make_property('value') value = make_property('value')
raw = make_property('raw') raw = make_property('raw')
...@@ -38,7 +38,7 @@ class SemLock(object): ...@@ -38,7 +38,7 @@ class SemLock(object):
sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
debug('created semlock with handle %s' % sl.handle) debug('created semlock with handle %s' % sl.handle)
self._make_methods() self._make_methods()
if sys.platform != 'win32': if sys.platform != 'win32':
def _after_fork(obj): def _after_fork(obj):
obj._semlock._after_fork() obj._semlock._after_fork()
...@@ -129,7 +129,7 @@ class RLock(SemLock): ...@@ -129,7 +129,7 @@ class RLock(SemLock):
def __init__(self): def __init__(self):
SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1) SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
def __repr__(self): def __repr__(self):
try: try:
if self._semlock._is_mine(): if self._semlock._is_mine():
...@@ -210,17 +210,17 @@ class Condition(object): ...@@ -210,17 +210,17 @@ class Condition(object):
def notify(self): def notify(self):
assert self._lock._semlock._is_mine(), 'lock is not owned' assert self._lock._semlock._is_mine(), 'lock is not owned'
assert not self._wait_semaphore.acquire(False) assert not self._wait_semaphore.acquire(False)
# to take account of timeouts since last notify() we subtract # to take account of timeouts since last notify() we subtract
# woken_count from sleeping_count and rezero woken_count # woken_count from sleeping_count and rezero woken_count
while self._woken_count.acquire(False): while self._woken_count.acquire(False):
res = self._sleeping_count.acquire(False) res = self._sleeping_count.acquire(False)
assert res assert res
if self._sleeping_count.acquire(False): # try grabbing a sleeper if self._sleeping_count.acquire(False): # try grabbing a sleeper
self._wait_semaphore.release() # wake up one sleeper self._wait_semaphore.release() # wake up one sleeper
self._woken_count.acquire() # wait for the sleeper to wake self._woken_count.acquire() # wait for the sleeper to wake
# rezero _wait_semaphore in case a timeout just happened # rezero _wait_semaphore in case a timeout just happened
self._wait_semaphore.acquire(False) self._wait_semaphore.acquire(False)
...@@ -233,7 +233,7 @@ class Condition(object): ...@@ -233,7 +233,7 @@ class Condition(object):
while self._woken_count.acquire(False): while self._woken_count.acquire(False):
res = self._sleeping_count.acquire(False) res = self._sleeping_count.acquire(False)
assert res assert res
sleepers = 0 sleepers = 0
while self._sleeping_count.acquire(False): while self._sleeping_count.acquire(False):
self._wait_semaphore.release() # wake up one sleeper self._wait_semaphore.release() # wake up one sleeper
...@@ -266,7 +266,7 @@ class Event(object): ...@@ -266,7 +266,7 @@ class Event(object):
return False return False
finally: finally:
self._cond.release() self._cond.release()
def set(self): def set(self):
self._cond.acquire() self._cond.acquire()
try: try:
......
...@@ -83,7 +83,7 @@ def _check_logger_class(): ...@@ -83,7 +83,7 @@ def _check_logger_class():
import logging import logging
if hasattr(logging, 'multiprocessing'): if hasattr(logging, 'multiprocessing'):
return return
logging._acquireLock() logging._acquireLock()
try: try:
OldLoggerClass = logging.getLoggerClass() OldLoggerClass = logging.getLoggerClass()
......
#ifndef MULTIPROCESSING_H #ifndef MULTIPROCESSING_H
#define MULTIPROCESSING_H #define MULTIPROCESSING_H
#define PY_SSIZE_T_CLEAN #define PY_SSIZE_T_CLEAN
#include "Python.h" #include "Python.h"
#include "structmember.h" #include "structmember.h"
#include "pythread.h" #include "pythread.h"
/* /*
* Platform includes and definitions * Platform includes and definitions
*/ */
#ifdef MS_WINDOWS #ifdef MS_WINDOWS
# define WIN32_LEAN_AND_MEAN # define WIN32_LEAN_AND_MEAN
# include <windows.h> # include <windows.h>
# include <winsock2.h> # include <winsock2.h>
# include <process.h> /* getpid() */ # include <process.h> /* getpid() */
# define SEM_HANDLE HANDLE # define SEM_HANDLE HANDLE
# define SEM_VALUE_MAX LONG_MAX # define SEM_VALUE_MAX LONG_MAX
#else #else
# include <fcntl.h> /* O_CREAT and O_EXCL */ # include <fcntl.h> /* O_CREAT and O_EXCL */
# include <sys/socket.h> # include <sys/socket.h>
# include <arpa/inet.h> /* htonl() and ntohl() */ # include <arpa/inet.h> /* htonl() and ntohl() */
# if HAVE_SEM_OPEN # if HAVE_SEM_OPEN
# include <semaphore.h> # include <semaphore.h>
typedef sem_t *SEM_HANDLE; typedef sem_t *SEM_HANDLE;
# endif # endif
# define HANDLE int # define HANDLE int
# define SOCKET int # define SOCKET int
# define BOOL int # define BOOL int
# define UINT32 uint32_t # define UINT32 uint32_t
# define INT32 int32_t # define INT32 int32_t
# define TRUE 1 # define TRUE 1
# define FALSE 0 # define FALSE 0
# define INVALID_HANDLE_VALUE (-1) # define INVALID_HANDLE_VALUE (-1)
#endif #endif
/* /*
* Make sure Py_ssize_t available * Make sure Py_ssize_t available
*/ */
#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN) #if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
typedef int Py_ssize_t; typedef int Py_ssize_t;
# define PY_SSIZE_T_MAX INT_MAX # define PY_SSIZE_T_MAX INT_MAX
# define PY_SSIZE_T_MIN INT_MIN # define PY_SSIZE_T_MIN INT_MIN
# define F_PY_SSIZE_T "i" # define F_PY_SSIZE_T "i"
# define PY_FORMAT_SIZE_T "" # define PY_FORMAT_SIZE_T ""
# define PyInt_FromSsize_t(n) PyInt_FromLong((long)n) # define PyInt_FromSsize_t(n) PyInt_FromLong((long)n)
#else #else
# define F_PY_SSIZE_T "n" # define F_PY_SSIZE_T "n"
#endif #endif
/* /*
* Format codes * Format codes
*/ */
#if SIZEOF_VOID_P == SIZEOF_LONG #if SIZEOF_VOID_P == SIZEOF_LONG
# define F_POINTER "k" # define F_POINTER "k"
# define T_POINTER T_ULONG # define T_POINTER T_ULONG
#elif defined(HAVE_LONG_LONG) && (SIZEOF_VOID_P == SIZEOF_LONG_LONG) #elif defined(HAVE_LONG_LONG) && (SIZEOF_VOID_P == SIZEOF_LONG_LONG)
# define F_POINTER "K" # define F_POINTER "K"
# define T_POINTER T_ULONGLONG # define T_POINTER T_ULONGLONG
#else #else
# error "can't find format code for unsigned integer of same size as void*" # error "can't find format code for unsigned integer of same size as void*"
#endif #endif
#ifdef MS_WINDOWS #ifdef MS_WINDOWS
# define F_HANDLE F_POINTER # define F_HANDLE F_POINTER
# define T_HANDLE T_POINTER # define T_HANDLE T_POINTER
# define F_SEM_HANDLE F_HANDLE # define F_SEM_HANDLE F_HANDLE
# define T_SEM_HANDLE T_HANDLE # define T_SEM_HANDLE T_HANDLE
# define F_DWORD "k" # define F_DWORD "k"
# define T_DWORD T_ULONG # define T_DWORD T_ULONG
#else #else
# define F_HANDLE "i" # define F_HANDLE "i"
# define T_HANDLE T_INT # define T_HANDLE T_INT
# define F_SEM_HANDLE F_POINTER # define F_SEM_HANDLE F_POINTER
# define T_SEM_HANDLE T_POINTER # define T_SEM_HANDLE T_POINTER
#endif #endif
#if PY_VERSION_HEX >= 0x03000000 #if PY_VERSION_HEX >= 0x03000000
# define F_RBUFFER "y" # define F_RBUFFER "y"
#else #else
# define F_RBUFFER "s" # define F_RBUFFER "s"
#endif #endif
/* /*
* Error codes which can be returned by functions called without GIL * Error codes which can be returned by functions called without GIL
*/ */
#define MP_SUCCESS (0) #define MP_SUCCESS (0)
#define MP_STANDARD_ERROR (-1) #define MP_STANDARD_ERROR (-1)
#define MP_MEMORY_ERROR (-1001) #define MP_MEMORY_ERROR (-1001)
#define MP_END_OF_FILE (-1002) #define MP_END_OF_FILE (-1002)
#define MP_EARLY_END_OF_FILE (-1003) #define MP_EARLY_END_OF_FILE (-1003)
#define MP_BAD_MESSAGE_LENGTH (-1004) #define MP_BAD_MESSAGE_LENGTH (-1004)
#define MP_SOCKET_ERROR (-1005) #define MP_SOCKET_ERROR (-1005)
#define MP_EXCEPTION_HAS_BEEN_SET (-1006) #define MP_EXCEPTION_HAS_BEEN_SET (-1006)
PyObject *mp_SetError(PyObject *Type, int num); PyObject *mp_SetError(PyObject *Type, int num);
/* /*
* Externs - not all will really exist on all platforms * Externs - not all will really exist on all platforms
*/ */
extern PyObject *pickle_dumps; extern PyObject *pickle_dumps;
extern PyObject *pickle_loads; extern PyObject *pickle_loads;
extern PyObject *pickle_protocol; extern PyObject *pickle_protocol;
extern PyObject *BufferTooShort; extern PyObject *BufferTooShort;
extern PyTypeObject SemLockType; extern PyTypeObject SemLockType;
extern PyTypeObject ConnectionType; extern PyTypeObject ConnectionType;
extern PyTypeObject PipeConnectionType; extern PyTypeObject PipeConnectionType;
extern HANDLE sigint_event; extern HANDLE sigint_event;
/* /*
* Py3k compatibility * Py3k compatibility
*/ */
#if PY_VERSION_HEX >= 0x03000000 #if PY_VERSION_HEX >= 0x03000000
# define PICKLE_MODULE "pickle" # define PICKLE_MODULE "pickle"
# define FROM_FORMAT PyUnicode_FromFormat # define FROM_FORMAT PyUnicode_FromFormat
# define PyInt_FromLong PyLong_FromLong # define PyInt_FromLong PyLong_FromLong
# define PyInt_FromSsize_t PyLong_FromSsize_t # define PyInt_FromSsize_t PyLong_FromSsize_t
#else #else
# define PICKLE_MODULE "cPickle" # define PICKLE_MODULE "cPickle"
# define FROM_FORMAT PyString_FromFormat # define FROM_FORMAT PyString_FromFormat
#endif #endif
#ifndef PyVarObject_HEAD_INIT #ifndef PyVarObject_HEAD_INIT
# define PyVarObject_HEAD_INIT(type, size) PyObject_HEAD_INIT(type) size, # define PyVarObject_HEAD_INIT(type, size) PyObject_HEAD_INIT(type) size,
#endif #endif
#ifndef Py_TPFLAGS_HAVE_WEAKREFS #ifndef Py_TPFLAGS_HAVE_WEAKREFS
# define Py_TPFLAGS_HAVE_WEAKREFS 0 # define Py_TPFLAGS_HAVE_WEAKREFS 0
#endif #endif
/* /*
* Connection definition * Connection definition
*/ */
#define CONNECTION_BUFFER_SIZE 1024 #define CONNECTION_BUFFER_SIZE 1024
typedef struct { typedef struct {
PyObject_HEAD PyObject_HEAD
HANDLE handle; HANDLE handle;
int flags; int flags;
PyObject *weakreflist; PyObject *weakreflist;
char buffer[CONNECTION_BUFFER_SIZE]; char buffer[CONNECTION_BUFFER_SIZE];
} ConnectionObject; } ConnectionObject;
/* /*
* Miscellaneous * Miscellaneous
*/ */
#define MAX_MESSAGE_LENGTH 0x7fffffff #define MAX_MESSAGE_LENGTH 0x7fffffff
#ifndef MIN #ifndef MIN
# define MIN(x, y) ((x) < (y) ? x : y) # define MIN(x, y) ((x) < (y) ? x : y)
# define MAX(x, y) ((x) > (y) ? x : y) # define MAX(x, y) ((x) > (y) ? x : y)
#endif #endif
#endif /* MULTIPROCESSING_H */ #endif /* MULTIPROCESSING_H */
/* /*
* A type which wraps a pipe handle in message oriented mode * A type which wraps a pipe handle in message oriented mode
* *
* pipe_connection.c * pipe_connection.c
* *
* Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/ */
#include "multiprocessing.h" #include "multiprocessing.h"
#define CLOSE(h) CloseHandle(h) #define CLOSE(h) CloseHandle(h)
/* /*
* Send string to the pipe; assumes in message oriented mode * Send string to the pipe; assumes in message oriented mode
*/ */
static Py_ssize_t static Py_ssize_t
conn_send_string(ConnectionObject *conn, char *string, size_t length) conn_send_string(ConnectionObject *conn, char *string, size_t length)
{ {
DWORD amount_written; DWORD amount_written;
return WriteFile(conn->handle, string, length, &amount_written, NULL) return WriteFile(conn->handle, string, length, &amount_written, NULL)
? MP_SUCCESS : MP_STANDARD_ERROR; ? MP_SUCCESS : MP_STANDARD_ERROR;
} }
/* /*
* Attempts to read into buffer, or if buffer too small into *newbuffer. * Attempts to read into buffer, or if buffer too small into *newbuffer.
* *
* Returns number of bytes read. Assumes in message oriented mode. * Returns number of bytes read. Assumes in message oriented mode.
*/ */
static Py_ssize_t static Py_ssize_t
conn_recv_string(ConnectionObject *conn, char *buffer, conn_recv_string(ConnectionObject *conn, char *buffer,
size_t buflength, char **newbuffer, size_t maxlength) size_t buflength, char **newbuffer, size_t maxlength)
{ {
DWORD left, length, full_length, err; DWORD left, length, full_length, err;
*newbuffer = NULL; *newbuffer = NULL;
if (ReadFile(conn->handle, buffer, MIN(buflength, maxlength), if (ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
&length, NULL)) &length, NULL))
return length; return length;
err = GetLastError(); err = GetLastError();
if (err != ERROR_MORE_DATA) { if (err != ERROR_MORE_DATA) {
if (err == ERROR_BROKEN_PIPE) if (err == ERROR_BROKEN_PIPE)
return MP_END_OF_FILE; return MP_END_OF_FILE;
return MP_STANDARD_ERROR; return MP_STANDARD_ERROR;
} }
if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left)) if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
return MP_STANDARD_ERROR; return MP_STANDARD_ERROR;
full_length = length + left; full_length = length + left;
if (full_length > maxlength) if (full_length > maxlength)
return MP_BAD_MESSAGE_LENGTH; return MP_BAD_MESSAGE_LENGTH;
*newbuffer = PyMem_Malloc(full_length); *newbuffer = PyMem_Malloc(full_length);
if (*newbuffer == NULL) if (*newbuffer == NULL)
return MP_MEMORY_ERROR; return MP_MEMORY_ERROR;
memcpy(*newbuffer, buffer, length); memcpy(*newbuffer, buffer, length);
if (ReadFile(conn->handle, *newbuffer+length, left, &length, NULL)) { if (ReadFile(conn->handle, *newbuffer+length, left, &length, NULL)) {
assert(length == left); assert(length == left);
return full_length; return full_length;
} else { } else {
PyMem_Free(*newbuffer); PyMem_Free(*newbuffer);
return MP_STANDARD_ERROR; return MP_STANDARD_ERROR;
} }
} }
/* /*
* Check whether any data is available for reading * Check whether any data is available for reading
*/ */
#define conn_poll(conn, timeout) conn_poll_save(conn, timeout, _save) #define conn_poll(conn, timeout) conn_poll_save(conn, timeout, _save)
static int static int
conn_poll_save(ConnectionObject *conn, double timeout, PyThreadState *_save) conn_poll_save(ConnectionObject *conn, double timeout, PyThreadState *_save)
{ {
DWORD bytes, deadline, delay; DWORD bytes, deadline, delay;
int difference, res; int difference, res;
BOOL block = FALSE; BOOL block = FALSE;
if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL)) if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
return MP_STANDARD_ERROR; return MP_STANDARD_ERROR;
if (timeout == 0.0) if (timeout == 0.0)
return bytes > 0; return bytes > 0;
if (timeout < 0.0) if (timeout < 0.0)
block = TRUE; block = TRUE;
else else
/* XXX does not check for overflow */ /* XXX does not check for overflow */
deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5); deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
Sleep(0); Sleep(0);
for (delay = 1 ; ; delay += 1) { for (delay = 1 ; ; delay += 1) {
if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL)) if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
return MP_STANDARD_ERROR; return MP_STANDARD_ERROR;
else if (bytes > 0) else if (bytes > 0)
return TRUE; return TRUE;
if (!block) { if (!block) {
difference = deadline - GetTickCount(); difference = deadline - GetTickCount();
if (difference < 0) if (difference < 0)
return FALSE; return FALSE;
if ((int)delay > difference) if ((int)delay > difference)
delay = difference; delay = difference;
} }
if (delay > 20) if (delay > 20)
delay = 20; delay = 20;
Sleep(delay); Sleep(delay);
/* check for signals */ /* check for signals */
Py_BLOCK_THREADS Py_BLOCK_THREADS
res = PyErr_CheckSignals(); res = PyErr_CheckSignals();
Py_UNBLOCK_THREADS Py_UNBLOCK_THREADS
if (res) if (res)
return MP_EXCEPTION_HAS_BEEN_SET; return MP_EXCEPTION_HAS_BEEN_SET;
} }
} }
/* /*
* "connection.h" defines the PipeConnection type using the definitions above * "connection.h" defines the PipeConnection type using the definitions above
*/ */
#define CONNECTION_NAME "PipeConnection" #define CONNECTION_NAME "PipeConnection"
#define CONNECTION_TYPE PipeConnectionType #define CONNECTION_TYPE PipeConnectionType
#include "connection.h" #include "connection.h"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment