process.py 10.3 KB
Newer Older
1 2 3 4 5
#
# Module providing the `Process` class which emulates `threading.Thread`
#
# multiprocessing/process.py
#
6
# Copyright (c) 2006-2008, R Oudkerk
7
# Licensed to PSF under a Contributor Agreement.
8 9
#

10
__all__ = ['BaseProcess', 'current_process', 'active_children']
11 12 13 14 15 16 17 18 19

#
# Imports
#

import os
import sys
import signal
import itertools
20
import threading
21
from _weakrefset import WeakSet
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46

#
#
#

try:
    ORIGINAL_DIR = os.path.abspath(os.getcwd())
except OSError:
    ORIGINAL_DIR = None

#
# Public functions
#

def current_process():
    '''
    Return process object representing the current process
    '''
    return _current_process

def active_children():
    '''
    Return list of process objects corresponding to live child processes
    '''
    _cleanup()
47
    return list(_children)
48 49 50 51 52 53 54

#
#
#

def _cleanup():
    # check for processes which have finished
55
    for p in list(_children):
56
        if p._popen.poll() is not None:
57
            _children.discard(p)
58 59 60 61 62

#
# The `Process` class
#

63
class BaseProcess(object):
64 65 66
    '''
    Process objects represent activity that is run in a separate process

67
    The class is analogous to `threading.Thread`
68
    '''
69 70
    def _Popen(self):
        raise NotImplementedError
71

72 73
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
                 *, daemon=None):
74
        assert group is None, 'group argument must be None for now'
75
        count = next(_process_counter)
76
        self._identity = _current_process._identity + (count,)
77
        self._config = _current_process._config.copy()
78 79
        self._parent_pid = os.getpid()
        self._popen = None
80
        self._closed = False
81 82 83 84 85
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = name or type(self).__name__ + '-' + \
                     ':'.join(str(i) for i in self._identity)
86 87
        if daemon is not None:
            self.daemon = daemon
88
        _dangling.add(self)
89

90 91 92 93
    def _check_closed(self):
        if self._closed:
            raise ValueError("process object is closed")

94 95 96 97 98 99 100 101 102 103 104
    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)

    def start(self):
        '''
        Start child process
        '''
105
        self._check_closed()
106 107 108
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
               'can only start a process object created by current process'
109
        assert not _current_process._config.get('daemon'), \
110 111
               'daemonic processes are not allowed to have children'
        _cleanup()
112
        self._popen = self._Popen(self)
113
        self._sentinel = self._popen.sentinel
114 115 116
        # Avoid a refcycle if the target function holds an indirect
        # reference to the process object (see bpo-30775)
        del self._target, self._args, self._kwargs
117
        _children.add(self)
118 119 120 121 122

    def terminate(self):
        '''
        Terminate process; sends SIGTERM signal or uses TerminateProcess()
        '''
123
        self._check_closed()
124 125
        self._popen.terminate()

126 127 128 129 130 131 132
    def kill(self):
        '''
        Terminate process; sends SIGKILL signal or uses TerminateProcess()
        '''
        self._check_closed()
        self._popen.kill()

133 134 135 136
    def join(self, timeout=None):
        '''
        Wait until child process terminates
        '''
137
        self._check_closed()
138 139 140 141
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
142
            _children.discard(self)
143 144 145 146 147

    def is_alive(self):
        '''
        Return whether process is alive
        '''
148
        self._check_closed()
149 150 151
        if self is _current_process:
            return True
        assert self._parent_pid == os.getpid(), 'can only test a child process'
152

153 154
        if self._popen is None:
            return False
155 156 157 158 159 160 161

        returncode = self._popen.poll()
        if returncode is None:
            return True
        else:
            _children.discard(self)
            return False
162

163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
    def close(self):
        '''
        Close the Process object.

        This method releases resources held by the Process object.  It is
        an error to call this method if the child process is still running.
        '''
        if self._popen is not None:
            if self._popen.poll() is None:
                raise ValueError("Cannot close a process while it is still running. "
                                 "You should first call join() or terminate().")
            self._popen.close()
            self._popen = None
            del self._sentinel
            _children.discard(self)
        self._closed = True

180 181
    @property
    def name(self):
182 183
        return self._name

184 185
    @name.setter
    def name(self, name):
186 187 188
        assert isinstance(name, str), 'name must be a string'
        self._name = name

189 190
    @property
    def daemon(self):
191 192 193
        '''
        Return whether process is a daemon
        '''
194
        return self._config.get('daemon', False)
195

196 197
    @daemon.setter
    def daemon(self, daemonic):
198 199 200 201
        '''
        Set whether process is a daemon
        '''
        assert self._popen is None, 'process has already started'
202
        self._config['daemon'] = daemonic
203

204 205
    @property
    def authkey(self):
206
        return self._config['authkey']
207

208 209
    @authkey.setter
    def authkey(self, authkey):
210 211 212
        '''
        Set authorization key of process
        '''
213
        self._config['authkey'] = AuthenticationString(authkey)
214

215 216
    @property
    def exitcode(self):
217 218 219
        '''
        Return exit code of process or `None` if it has yet to stop
        '''
220
        self._check_closed()
221 222 223 224
        if self._popen is None:
            return self._popen
        return self._popen.poll()

225 226
    @property
    def ident(self):
227
        '''
228
        Return identifier (PID) of process or `None` if it has yet to start
229
        '''
230
        self._check_closed()
231 232 233 234 235
        if self is _current_process:
            return os.getpid()
        else:
            return self._popen and self._popen.pid

236
    pid = ident
237

238 239 240 241 242 243
    @property
    def sentinel(self):
        '''
        Return a file descriptor (Unix) or handle (Windows) suitable for
        waiting for process termination.
        '''
244
        self._check_closed()
245 246 247
        try:
            return self._sentinel
        except AttributeError:
248
            raise ValueError("process not started") from None
249

250 251 252
    def __repr__(self):
        if self is _current_process:
            status = 'started'
253 254
        elif self._closed:
            status = 'closed'
255 256 257 258 259 260
        elif self._parent_pid != os.getpid():
            status = 'unknown'
        elif self._popen is None:
            status = 'initial'
        else:
            if self._popen.poll() is not None:
261
                status = self.exitcode
262 263 264 265 266 267 268 269 270 271
            else:
                status = 'started'

        if type(status) is int:
            if status == 0:
                status = 'stopped'
            else:
                status = 'stopped[%s]' % _exitcode_to_name.get(status, status)

        return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
272
                                   status, self.daemon and ' daemon' or '')
273 274 275 276

    ##

    def _bootstrap(self):
277
        from . import util, context
278
        global _current_process, _process_counter, _children
279 280

        try:
281 282
            if self._start_method is not None:
                context._force_start_method(self._start_method)
283 284
            _process_counter = itertools.count(1)
            _children = set()
Victor Stinner's avatar
Victor Stinner committed
285
            util._close_stdin()
286
            old_process = _current_process
287
            _current_process = self
288 289 290 291 292 293 294
            try:
                util._finalizer_registry.clear()
                util._run_after_forkers()
            finally:
                # delay finalization of the old process object until after
                # _run_after_forkers() is executed
                del old_process
295 296 297 298 299 300 301 302 303
            util.info('child process calling self.run()')
            try:
                self.run()
                exitcode = 0
            finally:
                util._exit_function()
        except SystemExit as e:
            if not e.args:
                exitcode = 1
304
            elif isinstance(e.args[0], int):
305 306
                exitcode = e.args[0]
            else:
307
                sys.stderr.write(str(e.args[0]) + '\n')
308
                exitcode = 1
309 310 311
        except:
            exitcode = 1
            import traceback
312
            sys.stderr.write('Process %s:\n' % self.name)
313
            traceback.print_exc()
314
        finally:
315
            threading._shutdown()
316
            util.info('process exiting with exitcode %d' % exitcode)
317
            util._flush_std_streams()
318 319 320 321 322 323 324 325 326

        return exitcode

#
# We subclass bytes to avoid accidental transmission of auth keys over network
#

class AuthenticationString(bytes):
    def __reduce__(self):
327
        from .context import get_spawning_popen
328
        if get_spawning_popen() is None:
329 330 331 332 333 334 335 336 337 338
            raise TypeError(
                'Pickling an AuthenticationString object is '
                'disallowed for security reasons'
                )
        return AuthenticationString, (bytes(self),)

#
# Create object representing the main process
#

339
class _MainProcess(BaseProcess):
340 341 342 343 344 345

    def __init__(self):
        self._identity = ()
        self._name = 'MainProcess'
        self._parent_pid = None
        self._popen = None
346
        self._closed = False
347
        self._config = {'authkey': AuthenticationString(os.urandom(32)),
348
                        'semprefix': '/mp'}
349
        # Note that some versions of FreeBSD only allow named
350
        # semaphores to have names of up to 14 characters.  Therefore
351
        # we choose a short prefix.
352 353 354 355 356 357
        #
        # On MacOSX in a sandbox it may be necessary to use a
        # different prefix -- see #19478.
        #
        # Everything in self._config will be inherited by descendant
        # processes.
358

359 360 361
    def close(self):
        pass

362 363

_current_process = _MainProcess()
364 365
_process_counter = itertools.count(1)
_children = set()
366 367 368 369 370 371 372 373 374 375 376
del _MainProcess

#
# Give names to some return codes
#

_exitcode_to_name = {}

for name, signum in list(signal.__dict__.items()):
    if name[:3]=='SIG' and '_' not in name:
        _exitcode_to_name[-signum] = name
377 378 379

# For debug and leak testing
_dangling = WeakSet()