#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

# Extra log levels used by multiprocessing: SUBDEBUG is chattier than
# DEBUG, SUBWARNING sits between INFO and WARNING.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# Module-level logging state: _logger is created lazily by get_logger();
# _log_to_stderr records whether log_to_stderr() has been called.
_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    # No-op until get_logger()/log_to_stderr() has configured a logger.
    if _logger is not None:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    # No-op until get_logger()/log_to_stderr() has configured a logger.
    if _logger is not None:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    # No-op until get_logger()/log_to_stderr() has configured a logger.
    if _logger is not None:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    # No-op until get_logger()/log_to_stderr() has configured a logger.
    if _logger is not None:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    # Use the logging module's own lock to serialize concurrent first calls.
    logging._acquireLock()
    try:
        if _logger is None:
            _logger = logging.getLogger(LOGGER_NAME)
            # Do not propagate records to the root logger's handlers.
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            # Re-register _exit_function so that it runs before logging's
            # own atexit handler shuts the handlers down.
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))
    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(DEFAULT_LOGGING_FORMAT))
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    # get_logger() returned the module-level logger, so this is the same
    # object the original code returned via the global.
    return logger

#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    # Name of a per-process temp directory, created on first use and
    # removed at exit by a low-priority (-100, i.e. late) finalizer.
    config = process.current_process()._config
    tempdir = config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
        config['tempdir'] = tempdir
    return tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

# Maps (registration-order counter, id(obj), callback) -> obj.  Values are
# held weakly, so an object that dies before a fork is silently skipped.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    # Snapshot and sort so callbacks run in registration order (the
    # counter is the first element of each key).
    for (index, ident, func), obj in sorted(_afterfork_registry.items()):
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    # The monotonically increasing counter makes the key unique and makes
    # callbacks fire in registration order after a fork.
    key = (next(_afterfork_counter), id(obj), func)
    _afterfork_registry[key] = obj

#
# Finalization using weakrefs
#

# Maps (exitpriority, creation-order counter) -> Finalize instance.
_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs.

    An instance registers itself in _finalizer_registry and invokes
    *callback* at most once: either when *obj* is garbage collected (via
    the weakref callback) or when _run_finalizers() walks the registry at
    shutdown, whichever comes first.
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        # exitpriority orders shutdown processing (higher runs first); it
        # must be an int so registry keys remain mutually comparable.
        if (exitpriority is not None) and not isinstance(exitpriority, int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            # Using self as the weakref callback fires the finalizer when
            # obj dies, without keeping obj alive.
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        # Unique, sortable key: ties on priority break by creation order.
        self._key = (exitpriority, next(_finalizer_counter))
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                # Finalizer was inherited across a fork; only the process
                # that registered it may run the callback.
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop all references: marks the finalizer as spent and breaks
            # potential reference cycles.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            # No weakref (obj was None) or already spent.
            obj = None

        if obj is None:
            return '<%s object, dead>' % self.__class__.__name__

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            # Bug fix: this label was misspelled 'exitprority'.
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This module's globals may already have been destroyed at
        # interpreter shutdown; see _exit_function for details.
        return

    if minpriority is None:
        keep = lambda key: key[0] is not None
    else:
        keep = lambda key: key[0] is not None and key[0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).
    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    pending = sorted((key for key in list(_finalizer_registry) if keep(key)),
                     reverse=True)

    for key in pending:
        finalizer = _finalizer_registry.get(key)
        if finalizer is None:
            # Removed from the registry since the snapshot was taken.
            continue
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    # _exiting becomes None once module globals have been torn down, which
    # also counts as "shutting down".
    return _exiting is None or _exiting

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if _exiting:
        return
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # We check if the current process is None here because if
        # it's None, any call to ``active_children()`` will raise
        # an AttributeError (active_children winds up trying to
        # get attributes from util._current_process).  One
        # situation where this can happen is if someone has
        # manipulated sys.modules, causing this module to be
        # garbage collected.  The destructor for the module type
        # then replaces all values in the module dict with None.
        # For instance, after setuptools runs a test it replaces
        # sys.modules with a copy created earlier.  See issues
        # #9775 and #15881.  Also related: #4106, #9205, and
        # #9207.

        # Terminate daemon children first, then join everything.
        for p in active_children():
            if p.daemon:
                info('calling terminate() for daemon %s', p.name)
                p._popen.terminate()

        for p in active_children():
            info('calling join() for process %s', p.name)
            p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    '''
    A lock that is replaced by a fresh (unlocked) threading.Lock in the
    child after a fork, so a lock held at fork time cannot deadlock it.
    '''
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        # Rebind acquire/release so references taken earlier keep working.
        fresh = threading.Lock()
        self._lock = fresh
        self.acquire = fresh.acquire
        self.release = fresh.release

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


class ForkAwareLocal(threading.local):
    '''
    Thread-local storage whose contents are wiped in the child after a
    fork, and which pickles to a fresh empty instance.
    '''
    def __init__(self):
        register_after_fork(self, lambda obj: obj.__dict__.clear())

    def __reduce__(self):
        return type(self), ()

#
# Close fds except those specified
#

# Largest file descriptor number the platform supports, used as the upper
# bound by close_all_fds_except(); fall back to 256 if sysconf is missing
# or fails.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    # Close every fd except those listed in *fds*.  The sentinels -1 and
    # MAXFD bracket the sorted list so the gaps between consecutive kept
    # fds cover the whole [0, MAXFD) range.
    keep = sorted(list(fds) + [-1, MAXFD])
    assert keep[-1] == MAXFD, 'fd too large'
    for lo, hi in zip(keep, keep[1:]):
        os.closerange(lo + 1, hi)
#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    '''Close sys.stdin and replace it with a stream on os.devnull.'''
    if sys.stdin is None:
        return

    # Best effort: stdin may already be closed or detached.
    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        devnull_fd = os.open(os.devnull, os.O_RDONLY)
        try:
            # NOTE(review): closefd=False means the file object will not
            # close this fd -- presumably intentional; confirm upstream.
            sys.stdin = open(devnull_fd, closefd=False)
        except:
            # Don't leak the fd if the wrapper could not be built.
            os.close(devnull_fd)
            raise
    except (OSError, ValueError):
        pass

#
# Flush standard streams, if any
#

def _flush_std_streams():
    try:
        sys.stdout.flush()
    except (AttributeError, ValueError):
        pass
    try:
        sys.stderr.flush()
    except (AttributeError, ValueError):
        pass

#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    '''
    Fork and exec *path* with *args*, keeping only the fds in *passfds*
    (plus the internal error pipe) open in the child.
    '''
    import _posixsubprocess
    passfds = tuple(sorted(map(int, passfds)))
    # The pipe lets the child report an exec failure back to the parent.
    err_read, err_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, err_read, err_write,
            False, False, None)
    finally:
        os.close(err_read)
        os.close(err_write)