"""Thread module emulating a subset of Java's threading model."""

import sys as _sys

# If the low-level thread module is unavailable this module cannot work;
# remove the half-initialised entry from sys.modules before re-raising.
try:
    import thread
except ImportError:
    del _sys.modules[__name__]
    raise

import warnings

from functools import wraps
from time import time as _time, sleep as _sleep
from traceback import format_exc as _format_exc
from collections import deque

# Note regarding PEP 8 compliant aliases
# This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
# language. While those names are not in any imminent danger of being
# deprecated, starting with Python 2.6 the module provides a
# PEP 8 compliant alias for every such function and method name.
# Using the new PEP 8 compliant names also facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java-inspired names.


# Rename some stuff so "from threading import *" is safe
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
           'current_thread', 'enumerate', 'Event',
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Timer', 'setprofile', 'settrace', 'local', 'stack_size']

# Private aliases for the low-level primitives; the module object itself
# is deleted so that it is not exposed as threading.thread.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread


# sys.exc_clear is used to work around the fact that except blocks
# don't fully clear the exception until 3.0.
warnings.filterwarnings('ignore', category=DeprecationWarning,
                        module='threading', message='sys.exc_clear')

# Debug support (adapted from ihooks.py).
# All the major classes here derive from _Verbose.  We force that to
# be a new-style class so that all the major classes here are new-style.
# This helps debugging (type(instance) is more revealing for instances
# of new-style classes).

# Module-wide default used when a class is created with verbose=None.
_VERBOSE = False

if __debug__:

    class _Verbose(object):
        """Base class providing the _note() debug-trace helper."""

        def __init__(self, verbose=None):
            # Fall back to the module-wide default when not specified.
            if verbose is None:
                verbose = _VERBOSE
            self.__verbose = verbose

        def _note(self, format, *args):
            """Write ``format % args`` to sys.stderr if verbosity is enabled.

            The message is prefixed with the current thread's name and
            terminated with a newline.
            """
            if self.__verbose:
                text = format % args
                _sys.stderr.write("%s: %s\n" % (current_thread().name, text))

else:
    # Disable this when using "python -O"
    class _Verbose(object):
        """No-op stand-in used when __debug__ is false."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass

# Support for profile and trace hooks

_profile_hook = None
_trace_hook = None

def setprofile(func):
    """Record `func` as the profile hook for threads started by this module.

    The hook is stored in the module-level _profile_hook; nothing is
    installed in the calling thread.
    """
    global _profile_hook
    _profile_hook = func

def settrace(func):
    """Record `func` as the trace hook for threads started by this module.

    The hook is stored in the module-level _trace_hook; nothing is
    installed in the calling thread.
    """
    global _trace_hook
    _trace_hook = func

# Synchronization classes

Lock = _allocate_lock

def RLock(*args, **kwargs):
    """Factory function returning a new _RLock instance.

    All positional and keyword arguments are forwarded unchanged.
    """
    return _RLock(*args, **kwargs)

class _RLock(_Verbose):
    """Reentrant lock built on a primitive lock.

    The lock remembers its owner (the object returned by current_thread())
    and a recursion count.  The underlying primitive lock is released only
    when the count drops back to zero.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()   # underlying primitive lock
        self.__owner = None               # thread object currently owning it
        self.__count = 0                  # recursion level of the owner

    def __repr__(self):
        owner = self.__owner
        return "<%s(%s, %d)>" % (
                self.__class__.__name__,
                owner and owner.name,
                self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock, or bump the recursion level if already owned.

        Returns 1 for a recursive acquisition by the owner; otherwise
        returns the result of acquire(blocking) on the underlying lock.
        """
        me = current_thread()
        if self.__owner is me:
            self.__count = self.__count + 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note("%s.acquire(%s): initial success", self, blocking)
        else:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
        return rc

    __enter__ = acquire

    def release(self):
        """Decrement the recursion level, releasing the lock when it hits zero.

        Raises RuntimeError if the calling thread does not own the lock.
        """
        if self.__owner is not current_thread():
            raise RuntimeError("cannot release un-acquired lock")
        self.__count = count = self.__count - 1
        if not count:
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note("%s.release(): final release", self)
        else:
            if __debug__:
                self._note("%s.release(): non-final release", self)

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        """Re-acquire the lock and restore a (count, owner) pair.

        The pair is expected to be one returned by _release_save().
        """
        count, owner = count_owner
        self.__block.acquire()
        self.__count = count
        self.__owner = owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        """Fully release the lock and return its (count, owner) state."""
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        """Return True if the calling thread is the recorded owner."""
        return self.__owner is current_thread()


def Condition(*args, **kwargs):
    """Factory function returning a new _Condition instance.

    All positional and keyword arguments are forwarded unchanged.
    """
    return _Condition(*args, **kwargs)

class _Condition(_Verbose):
    """Condition variable associated with a lock.

    If no lock is supplied a new RLock is created.  Each waiter blocks on
    its own freshly allocated lock, which notify() releases.
    """

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self.__waiters = []     # one private lock per waiting thread

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if __lock doesn't have _is_owned().
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Release the lock, wait to be notified, then re-acquire the lock.

        With timeout=None the call blocks until notify() releases this
        waiter.  Otherwise it polls for at most `timeout` seconds.

        Raises RuntimeError if the lock is not held on entry.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    # notify() may already have removed the waiter.
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up at most `n` waiting threads.

        Raises RuntimeError if the lock is not held on entry.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self.__waiters
        to_wake = all_waiters[:n]
        if not to_wake:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        if __debug__:
            self._note("%s.notify(): notifying %d waiter%s", self, n,
                       "s" if n != 1 else "")
        for waiter in to_wake:
            waiter.release()
            # A timed-out wait() may have removed the waiter concurrently.
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up every thread currently waiting on this condition."""
        self.notify(len(self.__waiters))

    notify_all = notifyAll


def Semaphore(*args, **kwargs):
    """Factory function returning a new _Semaphore instance.

    All positional and keyword arguments are forwarded unchanged.
    """
    return _Semaphore(*args, **kwargs)

class _Semaphore(_Verbose):
    """Counting semaphore guarded by a Condition over a plain Lock."""

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter, waiting while it is zero.

        Returns True when the counter was decremented.  Returns False
        without waiting when `blocking` is false and the counter is zero.
        """
        rc = False
        # "with" guarantees the condition's lock is released even if
        # wait() is interrupted by an exception (e.g. KeyboardInterrupt).
        with self.__cond:
            while self.__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            else:
                # Loop ended without "break": the counter is non-zero.
                self.__value = self.__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                               self, self.__value)
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake up one waiting thread."""
        with self.__cond:
            self.__value = self.__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()

    def __exit__(self, t, v, tb):
        self.release()

def BoundedSemaphore(*args, **kwargs):
    """Factory function returning a new _BoundedSemaphore instance.

    All positional and keyword arguments are forwarded unchanged.
    """
    return _BoundedSemaphore(*args, **kwargs)

class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""
    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        """Increment the counter, refusing to exceed the initial value.

        Raises ValueError if the counter is already at its initial value.
        """
        # The bound check and the increment happen under the same lock so
        # that two concurrent releases cannot both pass the check.  The
        # lock is not reentrant, so _Semaphore.release() (which takes it
        # again) cannot be called from here; its body is repeated instead.
        with self._Semaphore__cond:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value = self._Semaphore__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self._Semaphore__value)
            self._Semaphore__cond.notify()


def Event(*args, **kwargs):
    """Factory function returning a new _Event instance.

    All positional and keyword arguments are forwarded unchanged.
    """
    return _Event(*args, **kwargs)

class _Event(_Verbose):
    """Boolean flag that threads can wait on until it becomes true."""

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def isSet(self):
        """Return the current value of the flag."""
        return self.__flag

    is_set = isSet

    def set(self):
        """Set the flag to True and wake up all waiting threads."""
        with self.__cond:
            self.__flag = True
            self.__cond.notify_all()

    def clear(self):
        """Reset the flag to False."""
        with self.__cond:
            self.__flag = False

    def wait(self, timeout=None):
        """Block until the flag is true or `timeout` seconds have passed.

        Returns the value of the flag on exit, so a caller that passed a
        timeout can tell whether the event was set or the wait timed out.
        """
        with self.__cond:
            if not self.__flag:
                self.__cond.wait(timeout)
            return self.__flag

# Helper to generate new thread names
_counter = 0
def _newname(template="Thread-%d"):
    """Return `template` filled in with the next value of a global counter.

    The module-level _counter is incremented on every call, so successive
    names are numbered 1, 2, 3, ... regardless of the template used.
    """
    global _counter
    _counter += 1
    return template % (_counter,)

# Active thread administration
_active_limbo_lock = _allocate_lock()   # guards both dicts below
_active = {}    # maps thread id to Thread object
_limbo = {}     # Thread objects that start() has launched but which have
                # not yet registered themselves in _active


# Main class for threads

class Thread(_Verbose):
    """A thread of control.

    The activity is either the `target` callable given to the constructor
    or an overridden run() method.  start() launches it in a new low-level
    thread; join() waits for it to finish.
    """

    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info
    # Keep sys.exc_clear too to clear the exception just before
    # allowing .join() to return.
    __exc_clear = _sys.exc_clear

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Initialise the thread without starting it.

        `group` must be None.  `target` is called by run() with `args` and
        `kwargs`.  `name` defaults to a generated "Thread-N" name.
        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return current_thread().daemon

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started.is_set():
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status += " daemon"
        if self.__ident is not None:
            status += " %s" % self.__ident
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Start the thread's activity and wait until it has begun running.

        Raises RuntimeError if __init__() was not called or if the thread
        has already been started.
        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("thread already started")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            # The low-level thread never ran, so nothing will ever move
            # this object out of _limbo; undo the registration.
            with _active_limbo_lock:
                del _limbo[self]
            raise
        self.__started.wait()

    def run(self):
        """Invoke the target callable, if any, with the stored arguments."""
        try:
            if self.__target:
                self.__target(*self.__args, **self.__kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self.__target, self.__args, self.__kwargs

    def __bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # __bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # __bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self.__bootstrap_inner()
        except:
            if self.__daemonic and _sys is None:
                return
            raise

    def __bootstrap_inner(self):
        try:
            self.__ident = _get_ident()
            self.__started.set()
            # "with" releases the registry lock even if the limbo entry
            # is unexpectedly missing.
            with _active_limbo_lock:
                _active[self.__ident] = self
                del _limbo[self]
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering trace hook",
                               self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering profile hook",
                               self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys:
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                      (self.name, _format_exc()))
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
            finally:
                # Prevent a race in
                # test_threading.test_no_refcycle_through_target when
                # the exception keeps the target alive past when we
                # assert that it's dead.
                self.__exc_clear()
        finally:
            with _active_limbo_lock:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    # Deliberately best-effort: this cleanup must never
                    # raise out of the thread's final "finally".
                    pass

    def __stop(self):
        # Mark the thread as stopped and wake up everybody in join().
        with self.__block:
            self.__stopped = True
            self.__block.notify_all()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        try:
            with _active_limbo_lock:
                del _active[_get_ident()]
                # There must not be any python code between the previous line
                # and after the lock is released.  Otherwise a tracing function
                # could try to acquire the lock again in the same thread, (in
                # current_thread()), and would block.
        except KeyError:
            if 'dummy_threading' not in _sys.modules:
                raise

    def join(self, timeout=None):
        """Wait until the thread stops or `timeout` seconds have elapsed.

        Raises RuntimeError if __init__() was not called, if the thread has
        not been started, or if a thread tries to join itself.
        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        with self.__block:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)

    @property
    def name(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    @name.setter
    def name(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    @property
    def ident(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__ident

    def isAlive(self):
        """Return True if the thread has started and has not yet stopped."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started.is_set() and not self.__stopped

    is_alive = isAlive

    @property
    def daemon(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    @daemon.setter
    def daemon(self, daemonic):
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("cannot set daemon status of active thread")
        self.__daemonic = daemonic

    # Old-style accessors kept alongside the properties above.

    def isDaemon(self):
        return self.daemon

    def setDaemon(self, daemonic):
        self.daemon = daemonic

    def getName(self):
        return self.name

    def setName(self, name):
        self.name = name

# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    """Factory function returning a new _Timer instance.

    All positional and keyword arguments are forwarded unchanged.
    """
    timer = _Timer(*args, **kwargs)
    return timer

class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Fresh containers per instance: with literal [] / {} defaults,
        # every timer created without arguments would share (and could
        # mutate) the same list and dict.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait `interval` seconds, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):
    """Thread object for the thread that imports this module."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        # The thread is already running: mark it started and register it
        # directly instead of going through start().
        self._Thread__started.set()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return False

    def _exitfunc(self):
        """Mark this thread stopped, then join every live non-daemon thread."""
        self._Thread__stop()
        t = _pickSomeNonDaemonThread()
        if t:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        while t:
            t.join()
            t = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()

def _pickSomeNonDaemonThread():
    """Return a live non-daemon thread from enumerate(), or None if none."""
    for t in enumerate():
        if not t.daemon and t.is_alive():
            return t
    return None


# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls current_thread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conforming to previous semantics).

class _DummyThread(Thread):
    """Placeholder Thread object returned by current_thread() for threads
    that have no entry in _active."""

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        self._Thread__started.set()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return True

    def join(self, timeout=None):
        # NOTE(review): assert statements are stripped under "python -O",
        # in which case this method silently returns None.
        assert False, "cannot join a dummy thread"


# Global API functions

def currentThread():
    """Return the Thread object registered for the calling thread.

    If the calling thread has no entry in _active, a new _DummyThread is
    created (which registers itself) and returned.
    """
    try:
        return _active[_get_ident()]
    except KeyError:
        return _DummyThread()

current_thread = currentThread

def activeCount():
    """Return the number of Thread objects in _active and _limbo."""
    with _active_limbo_lock:
        return len(_active) + len(_limbo)

active_count = activeCount

def enumerate():
    """Return a list of all Thread objects in _active and _limbo.

    The list is a snapshot taken while holding _active_limbo_lock.
    """
    with _active_limbo_lock:
        # list() makes the copies explicit, so the result is a real list
        # whether values() returns a list or a view.
        return list(_active.values()) + list(_limbo.values())

from thread import stack_size

# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.

_shutdown = _MainThread()._exitfunc

# get thread-local implementation, either from the thread
# module, or from the python fallback

try:
    from thread import _local as local
except ImportError:
    from _threading_local import local


def _after_fork():
    """Reset module state in the child process after a fork.

    Replaces _active_limbo_lock with a fresh lock, keeps only the current
    thread in _active (re-keyed by its current ident), marks every other
    registered thread as stopped, and empties _limbo.
    """
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
    # that should not exist after a fork.

    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}
    current = current_thread()
    with _active_limbo_lock:
        for thread in _active.itervalues():
            if thread is current:
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
                ident = _get_ident()
                thread._Thread__ident = ident
                new_active[ident] = thread
            else:
                # All the others are already stopped.
                # We don't call _Thread__stop() because it tries to acquire
                # thread._Thread__block which could also have been held while
                # we forked.
                thread._Thread__stopped = True

        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1


# Self-test code

def _test():
    """Self-test: producer threads feed one consumer via a bounded queue."""

    class BoundedQueue(_Verbose):
        """Fixed-capacity FIFO guarded by one RLock and two Conditions."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            self.rc = Condition(self.mon)   # notified by put(), awaited by get()
            self.wc = Condition(self.mon)   # notified by get(), awaited by put()
            self.limit = limit
            self.queue = deque()

        def put(self, item):
            # "with" releases the monitor even if wait() is interrupted.
            with self.mon:
                while len(self.queue) >= self.limit:
                    self._note("put(%s): queue full", item)
                    self.wc.wait()
                self.queue.append(item)
                self._note("put(%s): appended, length now %d",
                           item, len(self.queue))
                self.rc.notify()

        def get(self):
            with self.mon:
                while not self.queue:
                    self._note("get(): queue empty")
                    self.rc.wait()
                item = self.queue.popleft()
                self._note("get(): got %s, %d left", item, len(self.queue))
                self.wc.notify()
                return item

    class ProducerThread(Thread):

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter = counter + 1
                self.queue.put("%s.%d" % (self.name, counter))
                _sleep(random() * 0.00001)


    class ConsumerThread(Thread):

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                # Parenthesised single-argument form parses both as a
                # print statement and as a function call.
                print(item)
                self.count = self.count - 1

    NP = 3      # number of producer threads
    QL = 4      # queue capacity
    NI = 5      # items put by each producer

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.name = ("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()

if __name__ == '__main__':
    _test()