Queue.py 5.74 KB
Newer Older
1
"""A multi-producer, multi-consumer queue."""
2

3
from time import time as _time
4
from collections import deque
5

Brett Cannon's avatar
Brett Cannon committed
6 7
# Public names exported by a star-import of this module.
__all__ = [
    "Empty",
    "Full",
    "Queue",
]

8 9 10 11 12 13 14
class Empty(Exception):
    """Signal that Queue.get()/get_nowait() found no item to return."""

class Full(Exception):
    """Signal that Queue.put()/put_nowait() found no free slot."""
15 16

class Queue:
    """A FIFO queue that can be shared between producer and consumer threads.

    All access to the underlying container goes through a single lock
    (``mutex``) and two condition variables built on it (``not_empty`` and
    ``not_full``).  Subclasses can change the queue organization by
    overriding the ``_init``, ``_qsize``, ``_empty``, ``_full``, ``_put``
    and ``_get`` hooks, which are only called with the lock held.
    """

    def __init__(self, maxsize=0):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0, the queue size is infinite.
        """
        # Fall back to the single-threaded stand-in when real threads are
        # not available in this interpreter build.
        try:
            import threading
        except ImportError:
            import dummy_threading as threading
        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        try:
            # try/finally so a failing _qsize override cannot leave the
            # mutex held and deadlock every other user of the queue.
            return self._qsize()
        finally:
            self.mutex.release()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        self.not_full.acquire()
        try:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                # Re-check after every wake-up: another producer may have
                # taken the slot first.
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            # Wake one consumer that may be waiting for an item.
            self.not_empty.notify()
        finally:
            self.not_full.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        self.not_empty.acquire()
        try:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                # Re-check after every wake-up: another consumer may have
                # taken the item first.
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._empty():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            # Wake one producer that may be waiting for a free slot.
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full
    def _full(self):
        # ">=" rather than "==": a queue that already holds more than
        # maxsize items must still count as full.  Going through _qsize()
        # keeps this consistent with subclasses that override the size hook.
        return 0 < self.maxsize <= self._qsize()

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()