queue.py 9.03 KB
Newer Older
1
"""A multi-producer, multi-consumer queue."""
2

3
from time import time as _time
4 5 6 7
try:
    import threading as _threading
except ImportError:
    import dummy_threading as _threading
8
from collections import deque
9
import heapq
10

11
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
Brett Cannon's avatar
Brett Cannon committed
12

13 14 15 16 17 18 19
class Empty(Exception):
    """Raised by Queue.get(block=False) and Queue.get_nowait().

    Signals that no item was available to return.
    """

class Full(Exception):
    """Raised by Queue.put(block=False) and Queue.put_nowait().

    Signals that no free slot was available for the item.
    """
20 21

class Queue:
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.

    Items are stored through the overridable hooks _init(), _qsize(),
    _put() and _get(); the default implementation is a FIFO backed by a
    deque.  Every public method takes the shared mutex (directly or via
    one of the condition variables) before touching the storage.
    """

    def __init__(self, maxsize=0):
        """Set up the storage, the shared mutex and its three conditions."""
        self.maxsize = maxsize
        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = _threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = _threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = _threading.Condition(self.mutex)
        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = _threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before storing, so the counter never goes
                    # negative.
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        with self.all_tasks_done:
            # Re-check after every wake-up: the counter may have been
            # raised again by a put() before this thread got the mutex.
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        # 'with' guarantees the mutex is released even if an overridden
        # _qsize() raises; otherwise the queue would deadlock afterwards.
        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.

        """
        with self.mutex:
            return not self._qsize()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.

        """
        with self.mutex:
            # An unbounded queue (maxsize <= 0) is never full.
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises a ValueError if the queue is bounded, 'block' is true and
        'timeout' is negative.
        """
        with self.not_full:
            # Only a bounded queue can ever make a producer wait.
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Work against an absolute deadline so that repeated
                    # wake-ups do not extend the total time waited.
                    endtime = _time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises a ValueError if 'block' is true and 'timeout' is negative.
        """
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # Absolute deadline, as in put().
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    # Report the number of stored items
    def _qsize(self, len=len):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261


class PriorityQueue(Queue):
    """Queue variant that hands entries back in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    """

    # Storage is a plain list kept in heap order by the heapq functions.
    def _init(self, maxsize):
        self.queue = list()

    # Number of entries currently on the heap.
    def _qsize(self, len=len):
        heap = self.queue
        return len(heap)

    # Push the new entry onto the heap.
    def _put(self, item, heappush=heapq.heappush):
        heap = self.queue
        heappush(heap, item)

    # Pop and return the smallest entry from the heap.
    def _get(self, heappop=heapq.heappop):
        smallest = heappop(self.queue)
        return smallest


class LifoQueue(Queue):
    """Queue variant that hands back the most recently added entry first."""

    # Storage is a plain list used as a stack: the end of the list is the top.
    def _init(self, maxsize):
        self.queue = list()

    # Number of entries currently on the stack.
    def _qsize(self, len=len):
        stack = self.queue
        return len(stack)

    # Push the new entry on top of the stack.
    def _put(self, item):
        stack = self.queue
        stack.append(item)

    # Pop and return the entry on top of the stack.
    def _get(self):
        newest = self.queue.pop()
        return newest