util.py 11 KB
Newer Older
1 2 3 4 5
#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
6
# Copyright (c) 2006-2008, R Oudkerk
7
# Licensed to PSF under a Contributor Agreement.
8 9
#

10
import os
11
import itertools
Victor Stinner's avatar
Victor Stinner committed
12
import sys
13 14 15 16
import weakref
import atexit
import threading        # we want threading to install it's
                        # cleanup function before multiprocessing does
17
from subprocess import _args_from_interpreter_flags
18

19
from . import process
20 21 22 23

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
Jesse Noller's avatar
Jesse Noller committed
24
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
25
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
65
    import logging
66

Jesse Noller's avatar
Jesse Noller committed
67 68 69
    logging._acquireLock()
    try:
        if not _logger:
70

Jesse Noller's avatar
Jesse Noller committed
71 72
            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0
73

Jesse Noller's avatar
Jesse Noller committed
74 75 76 77 78 79 80 81 82 83
            # XXX multiprocessing should cleanup before logging
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()
84 85 86 87 88 89 90 91 92

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging
Jesse Noller's avatar
Jesse Noller committed
93

94 95 96 97 98
    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
Jesse Noller's avatar
Jesse Noller committed
99 100

    if level:
101 102
        logger.setLevel(level)
    _log_to_stderr = True
Jesse Noller's avatar
Jesse Noller committed
103
    return _logger
104 105 106 107 108 109 110

#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
111 112
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
113 114 115 116
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
117 118
        process.current_process()._config['tempdir'] = tempdir
    return tempdir
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162

#
# Support for reinitialization of objects when bootstrapping a child process
#

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
163
        self._pid = os.getpid()
164 165 166

        _finalizer_registry[self._key] = self

167 168 169 170
    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
171
                 sub_debug=sub_debug, getpid=os.getpid):
172 173 174 175 176 177 178 179
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
180
            if self._pid != getpid():
181 182 183 184 185 186
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
216
            return '<%s object, dead>' % self.__class__.__name__
217

218 219 220
        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitprority=' + str(self._key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
237 238 239 240 241
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return
Alexander Belopolsky's avatar
Alexander Belopolsky committed
242

243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
    if minpriority is None:
        f = lambda p : p[0][0] is not None
    else:
        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority

    items = [x for x in list(_finalizer_registry.items()) if f(x)]
    items.sort(reverse=True)

    for key, finalizer in items:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

274
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
275 276
                   active_children=process.active_children,
                   current_process=process.current_process):
277 278 279 280
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

281 282
    global _exiting

283 284
    if not _exiting:
        _exiting = True
285

286 287 288
        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)
289

290 291
        if current_process() is not None:
            # We check if the current process is None here because if
292
            # it's None, any call to ``active_children()`` will raise
293 294 295 296 297 298 299 300 301 302
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and
            # #9207.
303 304

            for p in active_children():
305
                if p.daemon:
306 307 308 309 310 311
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()
312 313 314

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()
315 316 317 318 319 320 321 322 323

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    def __init__(self):
324 325 326 327
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
328 329 330 331
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

332 333 334 335 336 337 338
    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


339 340 341 342 343
class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        return type(self), ()
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for i in range(len(fds) - 1):
        os.closerange(fds[i]+1, fds[i+1])
Victor Stinner's avatar
Victor Stinner committed
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            sys.stdin = open(fd, closefd=False)
        except:
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass
382 383 384 385 386 387

#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
388
    import _posixsubprocess
389
    passfds = tuple(sorted(map(int, passfds)))
390
    errpipe_read, errpipe_write = os.pipe()
391 392 393 394 395 396 397 398
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)