# A multi-producer, multi-consumer queue.

# Define Empty as a class so that it is compatible with Python 1.5's
# class-based exceptions.  When the interpreter runs with the -X option
# the class statement below raises TypeError, and Empty falls back to an
# old-style string exception instead.
try:
    class Empty(Exception):
        # Exception raised by Queue.get_nowait()
        pass
except TypeError:
    # string based exceptions
    Empty = 'Queue.Empty'               # Exception raised by get_nowait()

class Queue:
    """A multi-producer, multi-consumer FIFO queue.

    Three locks coordinate access:

    mutex -- held while the queue representation is read or modified.
    esema -- kept locked while the queue is empty; get() blocks on it.
    fsema -- kept locked while the queue is full; put() blocks on it.

    Subclasses may override the _init/_qsize/_empty/_full/_put/_get
    hooks to implement other queue organizations.
    """

    def __init__(self, maxsize):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0, the queue size is infinite.
        """
        import thread
        self._init(maxsize)
        self.mutex = thread.allocate_lock()
        # The queue starts out empty, so esema starts out locked.
        self.esema = thread.allocate_lock()
        self.esema.acquire()
        self.fsema = thread.allocate_lock()

    def qsize(self):
        """Returns the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._qsize()
        finally:
            # Release even if an overridden hook raises.
            self.mutex.release()

    def empty(self):
        """Returns 1 if the queue is empty, 0 otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Returns 1 if the queue is full, 0 otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    def put(self, item):
        """Put an item into the queue.

        If the queue is full, block until a free slot is available.
        """
        self.fsema.acquire()
        self.mutex.acquire()
        # Until the item has really been stored the queue cannot be full
        # (we hold fsema), so a failure must hand fsema back.
        release_fsema = 1
        try:
            was_empty = self._empty()
            self._put(item)
            # If we fail before here the empty state has not changed,
            # so esema must stay as it is.
            if was_empty:
                self.esema.release()
            release_fsema = not self._full()
        finally:
            if release_fsema:
                self.fsema.release()
            self.mutex.release()

    def get(self):
        """Gets and returns an item from the queue.

        This method blocks if necessary until an item is available.
        """
        self.esema.acquire()
        self.mutex.acquire()
        # Until an item has really been removed the queue cannot be
        # empty (we hold esema), so a failure must hand esema back.
        release_esema = 1
        try:
            was_full = self._full()
            item = self._get()
            # If we fail before here the full state has not changed,
            # so fsema must stay as it is.
            if was_full:
                self.fsema.release()
            release_esema = not self._empty()
        finally:
            if release_esema:
                self.esema.release()
            self.mutex.release()
        return item

    # Get an item from the queue if one is immediately available,
    # raise Empty if the queue is empty or temporarily unavailable
    def get_nowait(self):
        """Gets and returns an item from the queue.

        Only gets an item if one is immediately available.  Otherwise
        this raises the Empty exception if the queue is empty or
        temporarily unavailable.
        """
        holding = self.esema.acquire(0)
        self.mutex.acquire()
        # esema is handed back on exit only if we actually hold it and
        # the queue still has items.
        release_esema = holding
        try:
            if self._empty():
                # The queue is empty -- we can't have esema, and it
                # must stay locked.
                release_esema = 0
                raise Empty
            if not holding:
                # Try once more now that we hold mutex.
                if not self.esema.acquire(0):
                    # Somebody else has esema
                    # but we have mutex --
                    # go out of their way
                    raise Empty
                release_esema = 1
            was_full = self._full()
            item = self._get()
            if was_full:
                self.fsema.release()
            release_esema = not self._empty()
        finally:
            if release_esema:
                self.esema.release()
            self.mutex.release()
        return item

    # XXX Need to define put_nowait() as well.


    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = []

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full
    def _full(self):
        # A maxsize <= 0 means "unbounded", so such a queue is never full.
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        item = self.queue[0]
        del self.queue[0]
        return item