# Queue.py
# A multi-producer, multi-consumer queue.

class Empty(Exception):
    """Exception raised by Queue.get_nowait() when no item can be returned."""
    # A class (rather than the former string exception) so that
    # ``raise Empty`` / ``except Empty`` work on interpreters that no
    # longer support string exceptions.
    pass

class Queue:
    """A multi-producer, multi-consumer queue.

    Three low-level locks coordinate the threads:

    * ``mutex`` guards every access to the queue representation.
    * ``esema`` is held while the queue is empty (or while a consumer is
      in the middle of removing an item); consumers block on it.
    * ``fsema`` is held while the queue is full (or while a producer is
      in the middle of adding an item); producers block on it.

    Subclasses can change the queue organization by overriding the
    ``_init``, ``_qsize``, ``_empty``, ``_full``, ``_put`` and ``_get``
    hooks near the end of the class.
    """

    def __init__(self, maxsize):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0, the maximum size is infinite.
        """
        # The low-level lock module is named ``thread`` on old
        # interpreters and ``_thread`` on Python 3.
        try:
            import thread
        except ImportError:
            import _thread as thread
        self._init(maxsize)
        self.mutex = thread.allocate_lock()
        self.esema = thread.allocate_lock()
        # The queue starts out empty, so esema starts out held.
        self.esema.acquire()
        self.fsema = thread.allocate_lock()

    def qsize(self):
        """Return an approximation of the queue size (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._qsize()
        finally:
            self.mutex.release()

    def empty(self):
        """Return whether the queue is empty (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Return whether the queue is full (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    def put(self, item):
        """Put a new item into the queue, blocking while the queue is full."""
        self.fsema.acquire()
        self.mutex.acquire()
        try:
            try:
                was_empty = self._empty()
                self._put(item)
            finally:
                # Runs even if _put() raised, so that a failed put does
                # not leave fsema held and block every later producer.
                if not self._full():
                    self.fsema.release()
            if was_empty:
                # First item in an empty queue: wake up a consumer.
                self.esema.release()
        finally:
            self.mutex.release()

    def get(self):
        """Remove and return an item from the queue.

        Blocks if necessary until an item is available.
        """
        self.esema.acquire()
        self.mutex.acquire()
        try:
            return self._get_and_signal()
        finally:
            self.mutex.release()

    def get_nowait(self):
        """Remove and return an item if one is immediately available.

        Raises Empty if the queue is empty or temporarily unavailable
        (another consumer currently holds esema).
        """
        have_esema = self.esema.acquire(0)
        self.mutex.acquire()
        try:
            if self._empty():
                # The queue is empty -- we can't have esema
                raise Empty
            if not have_esema and not self.esema.acquire(0):
                # Somebody else has esema but we have mutex --
                # get out of their way
                raise Empty
            return self._get_and_signal()
        finally:
            self.mutex.release()

    # XXX Need to define put_nowait() as well.

    def _get_and_signal(self):
        """Remove one item and update esema/fsema accordingly.

        Private helper shared by get() and get_nowait(); the caller must
        hold both esema and mutex.
        """
        try:
            was_full = self._full()
            item = self._get()
        finally:
            # Runs even if _get() raised, so that a failed get does not
            # leave esema held while items remain in the queue.
            if not self._empty():
                self.esema.release()
        if was_full:
            # A slot was freed in a full queue: wake up a producer.
            self.fsema.release()
        return item

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    def _init(self, maxsize):
        """Initialize the queue representation."""
        self.maxsize = maxsize
        self.queue = []

    def _qsize(self):
        """Return the number of items currently in the queue."""
        return len(self.queue)

    def _empty(self):
        """Check whether the queue is empty."""
        return not self.queue

    def _full(self):
        """Check whether the queue is full (never, if maxsize <= 0)."""
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    def _put(self, item):
        """Put a new item in the queue."""
        self.queue.append(item)

    def _get(self):
        """Remove and return the oldest item from the queue."""
        return self.queue.pop(0)