I'm trying to implement a queue. This is old code which was either taken from some kind of tutorial that I did some time ago or from some kind of experimentation that I did reading the docs, or a mix of the two. Thing is I'm not sure if the code is mine or not, but I'm trying to use it as an example to learn from. The script has a producer that produces numbers in a list and 2 consumers competing for grabbing those numbers and adding them up, the one with the highest sum wins.
So, here's my question: in the following code in the "consume_numbers" function I have a time.sleep(0.01) line which makes the code run. Without it, the code hangs, with it it runs smoothly. Can someone explain why this happens and how I could implement a queue without this issue?
import concurrent.futures
import time
import random
import threading
import queue
class MyQueue(queue.Queue):
def __init__(self, maxsize=10):
super().__init__()
self.maxsize = maxsize
self.numbers = []
def set_number(self, number):
self.put(number)
self.numbers.append(number)
def get_number(self):
return self.get()
def produce_random_numbers(q: MyQueue, maxcount: int, evnt: threading.Event):
count = 0
while not evnt.is_set():
num = random.randint(1, 5)
q.set_number(num)
count = 1
if count > maxcount:
event.set()
def consume_numbers(q: MyQueue, consumed: list, evnt: threading.Event):
while not q.empty() or not evnt.is_set():
num = q.get_number()
time.sleep(0.01)
consumed.append(num)
if __name__ == "__main__":
q = MyQueue(maxsize=10)
event = threading.Event()
cons1 = []
cons2 = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
ex.submit(produce_random_numbers, q, 50, event)
ex.submit(consume_numbers, q, cons1, event)
ex.submit(consume_numbers, q, cons2, event)
event.set()
print(f'Generated Numbers: {q.numbers}')
print(f'Numbers Consumed by Thread1 which summed up to {sum(cons1)} are: {cons1}')
print(f'Numbers Consumed by Thread2 which summed up to {sum(cons2)} are: {cons2}')
if sum(cons1) > sum(cons2):
print("Thread1 Wins!")
elif sum(cons1) < sum(cons2):
print("Thread2 Wins!")
else:
print("It's a tie!")
Thanks!
CodePudding user response:
The code does not implement a queue from scratch, but extends queue.Queue to add memory. There is an event object that is used to signal to the consumers that the producer thread has finished. There is are hidden race conditions in the consumers when there is only one item on the queue.
The check not q.empty() or not evnt.is_set() will run the loop code either if there is something in the queue or the event has not been set. It could happen that:
- One thread sees that the queue is not empty and enters the loop
- A thread switch happens, and the other thread consumes the last item
- A switch happens to the first thread, which calls
get_number()and blocks
A similar race condition happens with the evnt.is_set() check:
- The last item is added to the queue by the producer, and a thread switch happens
- One thread consumes the last item, a switch
- A thread switch happens, the consumer gets the last item and goes back to the loop condition. As the event has not been set the loop is executed and
get_number()blocks
Having the threads wait minimizes the chance of these conditions happening. Without waiting, it is very likely that a single consumer thread will consume all the queue items, while the other one is still entering its loop.
Using timeouts is cumbersome. A useful idiom that avoids using events is to use iter and use an impossible value as a sentinel:
# --- snip ---
def produce_random_numbers(q: MyQueue, maxcount: int, n_consumers: int):
for _ in range(maxcount):
num = random.randint(1, 5)
q.set_number(num)
for _ in range(n_consumers):
q.put(None) # <--- I use put to put one sentinel per consumer
def consume_numbers(q: MyQueue, consumed: list):
for num in iter(q.get_number, None):
consumed.append(num)
if __name__ == "__main__":
q = MyQueue(maxsize=10)
cons1 = []
cons2 = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
ex.submit(produce_random_numbers, q, 500000, 2)
ex.submit(consume_numbers, q, cons1)
ex.submit(consume_numbers, q, cons2)
print(f'Generated Numbers: {q.numbers}')
# --- snip ---
There are some other issues and things I would have done differently:
- The
event.set()after thewith...block is useless: the event has already been set by the producer - There is a typo in the producer and the global
eventvariable is used instead of the localevntparameter. Fortunately those refer to the same object. - As there is only one producer, there will be no problem. Otherwise the order of MyQueue.numbers could be different from the order in which the items were added to the queue:
putis called on one thread- a thread switch happens
- a
putappendhappens in the new thread - a thread switch happens, and the first value is
appended
- Instead of defining
MyQueue.set_numberI would have overridedput
