semaphore_tracker.py 4.25 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# On Unix we run a server process which keeps track of unlinked
# semaphores. The server ignores SIGINT and SIGTERM and reads from a
# pipe.  Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
# Then the server process unlinks any remaining semaphore names.
#
# This is important because the system only supports a limited number
# of named semaphores, and they will not be automatically removed till
# the next reboot.  Without this semaphore tracker process, "killall
# python" would probably leave unlinked semaphores.
#

import errno
import os
import signal
import sys
import threading
import warnings
import _multiprocessing

from . import spawn
from . import util
from . import current_process

__all__ = ['ensure_running', 'register', 'unregister']


29
_semaphore_tracker_fd = None
30 31 32 33 34 35 36 37
_lock = threading.Lock()


def ensure_running():
    '''Make sure that semaphore tracker process is running.

    This can be run from any process.  Usually a child process will use
    the semaphore created by its parent.'''
38
    global _semaphore_tracker_fd
39
    with _lock:
40
        if _semaphore_tracker_fd is not None:
41 42 43 44 45 46 47
            return
        fds_to_pass = []
        try:
            fds_to_pass.append(sys.stderr.fileno())
        except Exception:
            pass
        cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
48
        r, w = os.pipe()
49 50 51 52 53 54 55 56
        try:
            fds_to_pass.append(r)
            # process will out live us, so no need to wait on pid
            exe = spawn.get_executable()
            args = [exe] + util._args_from_interpreter_flags()
            args += ['-c', cmd % r]
            util.spawnv_passfds(exe, args, fds_to_pass)
        except:
57
            os.close(w)
58 59
            raise
        else:
60
            _semaphore_tracker_fd = w
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
        finally:
            os.close(r)


def register(name):
    '''Register name of semaphore with semaphore tracker.'''
    _send('REGISTER', name)


def unregister(name):
    '''Unregister name of semaphore with semaphore tracker.'''
    _send('UNREGISTER', name)


def _send(cmd, name):
    msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
    if len(name) > 512:
        # posix guarantees that writes to a pipe of less than PIPE_BUF
        # bytes are atomic, and that PIPE_BUF >= 512
        raise ValueError('name too long')
81
    nbytes = os.write(_semaphore_tracker_fd, msg)
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
    assert nbytes == len(msg)


def main(fd):
    '''Run semaphore tracker.'''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = set()
    try:
        # keep track of registered/unregistered semaphores
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    cmd, name = line.strip().split(b':')
                    if cmd == b'REGISTER':
                        cache.add(name)
                    elif cmd == b'UNREGISTER':
                        cache.remove(name)
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    try:
                        sys.excepthook(*sys.exc_info())
                    except:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining semaphores
        if cache:
            try:
                warnings.warn('semaphore_tracker: There appear to be %d '
                              'leaked semaphores to clean up at shutdown' %
                              len(cache))
            except Exception:
                pass
        for name in cache:
            # For some reason the process which created and registered this
            # semaphore has failed to unregister it. Presumably it has died.
            # We therefore unlink it.
            try:
                name = name.decode('ascii')
                try:
                    _multiprocessing.sem_unlink(name)
                except Exception as e:
                    warnings.warn('semaphore_tracker: %r: %s' % (name, e))
            finally:
                pass