I have a job that calls a start_measure function, which runs a while loop as a separate process to read a counter and store its value in a list.
Meanwhile, I run a workload in parallel for some time.
How can I run a Python function start_measure() independently and stop its while loop with a call to stop_measure()?
def run_job():
    start_measure()
    run_workload()
    stop_measure()

def start_measure():
    while 1:
        # Read the counter & store in list
        pass

def stop_measure():
    # stop while loop & store the list in file
    pass

def run_workload():
    # do something for random time duration
    pass
CodePudding user response:
As suggested by Tim Roberts, you can implement a producer/consumer pattern using a queue.
Here is an example implementation based on ThreadPoolExecutor, from the concurrent.futures module:
import concurrent.futures
import queue
import random
import threading
import time
import logging

def get_measurements(queue_in, event):
    """Pretend retrieving measurements from somewhere and put them in a queue."""
    logger = logging.getLogger('get_measurements')
    i = -1
    while not event.is_set():
        i += 1
        message = (i, random.randint(1, 101))
        logger.info('Received measurement: %s', message)
        queue_in.put(message)
        # Add some sleep time at the end of each batch of 5 measurements to avoid a deadlock
        if (i % 5) == 4:
            time.sleep(0.1)
    logger.info('End of acquisition')

def process_measurements(queue_in, queue_out, event, max_seconds):
    """Process the measurements and put the result in another queue."""
    logger = logging.getLogger('process_measurements')
    i = 0
    start_time = time.time()
    while not event.is_set() and (time.time() - start_time) < max_seconds:
        # wait for the queue to have something
        if queue_in.empty():
            continue
        # Get an element from the measurement queue and process it
        measurement = queue_in.get()
        logger.info('Process measurement: %s', measurement)
        processed_item = (measurement[0], measurement[0] + measurement[1] * 2)
        logger.info('Result: %s', processed_item)
        # Add the result to the queue of the processed items
        queue_out.put(processed_item)
        # Add some sleep time to avoid a deadlock
        if (i % 5) == 4:
            time.sleep(0.1)
        i += 1
    logger.info('End processing')

def stop_acquisition_and_process(queue_out, event, filepath, max_seconds, max_items):
    """Stop the acquisition and processing loops & store the list in a file."""
    logger = logging.getLogger('stop_acquisition_and_process')
    start_time = time.time()
    output_list = []
    while len(output_list) < max_items and (time.time() - start_time) < max_seconds:
        # wait for the queue to have something
        if queue_out.empty():
            continue
        # Get a processed item from the queue
        processed_item = queue_out.get()
        logger.info('Processed item to save: %s', processed_item)
        # Append the result to the list of the processed items to be saved
        output_list.append(processed_item)
        # Add some sleep time to avoid a deadlock
        if (len(output_list) % 5) == 0:
            time.sleep(0.1)
    # Send the termination event
    logger.info('Setting the termination event...')
    event.set()
    # Save the file with the processed items (one per row)
    with open(filepath, 'w') as f:
        f.writelines(['\t'.join(list(map(str, a))) + '\n' for a in output_list])
    return output_list

def run_job():
    """Main program"""
    logging.basicConfig(format='%(name)s - %(asctime)s: %(message)s', level=logging.INFO)
    logger = logging.getLogger('run_job')
    # filepath of the file with the list
    fp = 'saved_list.txt'
    # number of processed items to be saved in the file. Only the first num_its will be saved, all the others
    # will be discarded
    num_its = 10
    # maximum time to collect num_its items; when this time expires, the items collected so far will be saved
    max_secs = 5
    # Queue for the measurements
    measure_queue = queue.Queue(maxsize=num_its)
    # Queue for the results of the processing
    result_queue = queue.Queue(maxsize=num_its)
    # The following event is used to stop the acquisition and the processing when the required number of items has been
    # collected or the maximum allowed time has expired
    ev = threading.Event()
    # Use a ThreadPoolExecutor to manage the threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # create all the threads here
        executor.submit(get_measurements, measure_queue, ev)
        executor.submit(process_measurements, measure_queue, result_queue, ev, max_secs)
        f3 = executor.submit(stop_acquisition_and_process, result_queue, ev, fp, max_secs, num_its)
        # Get the list of processed items also as a result
        res = f3.result()
    # print the number of elements in the list
    logger.info("The processed data has been saved in %s and it contains %s rows", fp, len(res))

The function get_measurements pretends to collect some measurements, while the function process_measurements processes them. The two functions share a queue called measure_queue: get_measurements produces values and puts them on this queue, while process_measurements consumes values from it and writes the results to a second queue, called result_queue. Both functions keep working on their queues until a termination event is set.
A third function, stop_acquisition_and_process, reads the processed measurements from result_queue and appends them to the result list. As soon as enough processed items have been collected or the time allowed for the computation has expired, it sets the termination event, which stops the acquisition and processing threads, and saves the processed items to a file.
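The loops above poll with `queue.empty()` followed by `continue`, which keeps a CPU core busy while waiting. As a possible alternative, here is a minimal sketch that blocks on `Queue.get` with a timeout and checks the event only when the queue is empty. The function name `drain` and its `timeout` parameter are hypothetical and not part of the answer's code.

```python
import queue
import threading


def drain(q, stop_event, out, timeout=0.05):
    """Hypothetical helper: collect items from q into out.

    Returns once stop_event is set AND the queue has been emptied.
    """
    while True:
        try:
            # Block for up to `timeout` seconds instead of spinning on q.empty()
            item = q.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived in time: stop only if termination was requested
            if stop_event.is_set():
                break
            continue
        out.append(item)
```

Because the event is checked only after a timed-out `get`, items already on the queue when the event is set are still collected before the loop exits.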
The output of the run_job will be like this:
get_measurements - 2022-01-22 01:14:56,668: Received measurement: (0, 70)
get_measurements - 2022-01-22 01:14:56,668: Received measurement: (1, 37)
get_measurements - 2022-01-22 01:14:56,668: Received measurement: (2, 19)
get_measurements - 2022-01-22 01:14:56,668: Received measurement: (3, 34)
get_measurements - 2022-01-22 01:14:56,668: Received measurement: (4, 23)
process_measurements - 2022-01-22 01:14:56,668: Process measurement: (0, 70)
process_measurements - 2022-01-22 01:14:56,668: Result: (0, 140)
process_measurements - 2022-01-22 01:14:56,668: Process measurement: (1, 37)
process_measurements - 2022-01-22 01:14:56,668: Result: (1, 75)
process_measurements - 2022-01-22 01:14:56,669: Process measurement: (2, 19)
process_measurements - 2022-01-22 01:14:56,669: Result: (2, 40)
process_measurements - 2022-01-22 01:14:56,669: Process measurement: (3, 34)
process_measurements - 2022-01-22 01:14:56,669: Result: (3, 71)
process_measurements - 2022-01-22 01:14:56,669: Process measurement: (4, 23)
process_measurements - 2022-01-22 01:14:56,669: Result: (4, 50)
stop_acquisition_and_process - 2022-01-22 01:14:56,669: Processed item to save: (0, 140)
stop_acquisition_and_process - 2022-01-22 01:14:56,669: Processed item to save: (1, 75)
stop_acquisition_and_process - 2022-01-22 01:14:56,669: Processed item to save: (2, 40)
stop_acquisition_and_process - 2022-01-22 01:14:56,669: Processed item to save: (3, 71)
stop_acquisition_and_process - 2022-01-22 01:14:56,669: Processed item to save: (4, 50)
get_measurements - 2022-01-22 01:14:56,772: Received measurement: (5, 18)
get_measurements - 2022-01-22 01:14:56,773: Received measurement: (6, 20)
get_measurements - 2022-01-22 01:14:56,773: Received measurement: (7, 85)
get_measurements - 2022-01-22 01:14:56,773: Received measurement: (8, 12)
get_measurements - 2022-01-22 01:14:56,773: Received measurement: (9, 76)
process_measurements - 2022-01-22 01:14:56,774: Process measurement: (5, 18)
process_measurements - 2022-01-22 01:14:56,774: Result: (5, 41)
process_measurements - 2022-01-22 01:14:56,775: Process measurement: (6, 20)
process_measurements - 2022-01-22 01:14:56,775: Result: (6, 46)
process_measurements - 2022-01-22 01:14:56,775: Process measurement: (7, 85)
process_measurements - 2022-01-22 01:14:56,775: Result: (7, 177)
process_measurements - 2022-01-22 01:14:56,775: Process measurement: (8, 12)
process_measurements - 2022-01-22 01:14:56,775: Result: (8, 32)
process_measurements - 2022-01-22 01:14:56,775: Process measurement: (9, 76)
process_measurements - 2022-01-22 01:14:56,775: Result: (9, 161)
stop_acquisition_and_process - 2022-01-22 01:14:56,775: Processed item to save: (5, 41)
stop_acquisition_and_process - 2022-01-22 01:14:56,775: Processed item to save: (6, 46)
stop_acquisition_and_process - 2022-01-22 01:14:56,776: Processed item to save: (7, 177)
stop_acquisition_and_process - 2022-01-22 01:14:56,776: Processed item to save: (8, 32)
stop_acquisition_and_process - 2022-01-22 01:14:56,776: Processed item to save: (9, 161)
get_measurements - 2022-01-22 01:14:56,875: Received measurement: (10, 2)
get_measurements - 2022-01-22 01:14:56,875: Received measurement: (11, 51)
get_measurements - 2022-01-22 01:14:56,875: Received measurement: (12, 23)
get_measurements - 2022-01-22 01:14:56,876: Received measurement: (13, 87)
get_measurements - 2022-01-22 01:14:56,876: Received measurement: (14, 14)
process_measurements - 2022-01-22 01:14:56,879: Process measurement: (10, 2)
process_measurements - 2022-01-22 01:14:56,879: Result: (10, 14)
process_measurements - 2022-01-22 01:14:56,879: Process measurement: (11, 51)
process_measurements - 2022-01-22 01:14:56,879: Result: (11, 113)
process_measurements - 2022-01-22 01:14:56,879: Process measurement: (12, 23)
process_measurements - 2022-01-22 01:14:56,879: Result: (12, 58)
process_measurements - 2022-01-22 01:14:56,880: Process measurement: (13, 87)
process_measurements - 2022-01-22 01:14:56,880: Result: (13, 187)
process_measurements - 2022-01-22 01:14:56,880: Process measurement: (14, 14)
process_measurements - 2022-01-22 01:14:56,880: Result: (14, 42)
stop_acquisition_and_process - 2022-01-22 01:14:56,880: Setting the termination event...
get_measurements - 2022-01-22 01:14:56,978: End of acquisition
process_measurements - 2022-01-22 01:14:56,983: End processing
run_job - 2022-01-22 01:14:56,984: The processed data has been saved in saved_list.txt and it contains 10 rows
The output helps you follow the interactions between the threads and the flow of the data. As you can see, the order of arrival of the measurements is preserved and no measurement is lost; this is another advantage of using a queue to communicate between threads.
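To illustrate the ordering guarantee in isolation, here is a small sketch with one producer and one consumer sharing a `queue.Queue`. The names `producer`, `consumer`, and `demo`, and the use of `None` as an end-of-stream sentinel, are assumptions made for this example only.

```python
import queue
import threading


def producer(q, n):
    """Put the integers 0..n-1 on the queue, then a sentinel."""
    for i in range(n):
        q.put(i)
    q.put(None)  # sentinel (an assumption of this sketch): no more items


def consumer(q, out):
    """Append items to out in the order they are taken from the queue."""
    while True:
        item = q.get()
        if item is None:
            break
        out.append(item)


def demo(n=100):
    q = queue.Queue(maxsize=10)  # bounded: the producer blocks when it is full
    received = []
    t1 = threading.Thread(target=producer, args=(q, n))
    t2 = threading.Thread(target=consumer, args=(q, received))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return received
```

With a single producer and a single consumer, `demo(n)` returns the items in exactly the order they were produced, because `queue.Queue` is first-in, first-out.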
You can find a good introduction to the producer/consumer implementation in Python using threads here (the example in this answer is based on that page).
CodePudding user response:
An asyncio-based version may also be informative:
import asyncio
import os
import random

start_measure_flag = True

async def randsleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def start_measure():
    global start_measure_flag
    while start_measure_flag:
        await randsleep(caller="start_measure")
        i = await makeitem()
        print(f"created {i}")

async def stop_measure():
    global start_measure_flag
    for n in range(3):
        await randsleep(caller="stop_measure")
        print(f"stop {n}")
    start_measure_flag = False

async def run_workload():
    global start_measure_flag
    while start_measure_flag:
        await randsleep(caller="run_workload")
    print("run_workload-done")

async def main():
    await asyncio.gather(start_measure(), run_workload(), stop_measure())

if __name__ == "__main__":
    asyncio.run(main())
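A variation on the same idea, sketched under assumptions: instead of a global flag, the measuring coroutine can be started as a task and stopped with an `asyncio.Event`. The parameters `read_counter`, `interval`, and `workload_seconds` are hypothetical, and `asyncio.sleep` stands in for a real awaitable workload.

```python
import asyncio
import itertools


async def start_measure(stop, samples, read_counter, interval=0.01):
    """Append counter readings to samples until the stop event is set."""
    while not stop.is_set():
        samples.append(read_counter())
        await asyncio.sleep(interval)  # yield control to other tasks


async def run_job(workload_seconds=0.05):
    stop = asyncio.Event()
    samples = []
    counter = itertools.count()  # stand-in for the real counter (assumption)
    task = asyncio.create_task(
        start_measure(stop, samples, lambda: next(counter))
    )
    await asyncio.sleep(workload_seconds)  # stand-in for the workload
    stop.set()   # plays the role of stop_measure()
    await task   # wait for the measuring loop to finish
    return samples
```

Note that this only works if the workload itself awaits regularly; a blocking, CPU-bound workload would starve the measuring task.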
CodePudding user response:
Yes, you can use threads to do that:
from threading import Thread
import time

start_measure_flag = True

# Your function, declared here
def start_measure():
    global start_measure_flag
    while start_measure_flag:
        print("Hello, world!")
        # Read the counter & store in list

# The function that stops the loop above
def stop_measure():
    global start_measure_flag
    start_measure_flag = False
    # stop while loop & store the list in file

# A thread to run "start_measure" in parallel
thr1 = Thread(target=start_measure)
# Start the thread
thr1.start()
# Sleep the main thread for 3 seconds
time.sleep(3)
# Call the function that stops "start_measure"
stop_measure()
# Done :)
print("Done!")
It's simple in Python, and it lets the measurement loop run concurrently with the rest of your code.
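The same approach can be wrapped in a small class so that stopping also waits for the thread and writes the list to a file, as the question asks. This is only a sketch: the class name `Measurer`, the `read_counter` callable, and the `interval` parameter are hypothetical, and a `threading.Event` replaces the global flag.

```python
import threading


class Measurer:
    """Hypothetical wrapper: sample a counter in a background thread."""

    def __init__(self, read_counter, filepath, interval=0.01):
        self._read_counter = read_counter  # callable returning one reading
        self._filepath = filepath
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self.samples = []

    def _run(self):
        while not self._stop.is_set():
            self.samples.append(self._read_counter())
            # Event.wait acts as a sleep that ends early when stop() is called
            self._stop.wait(self._interval)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()  # make sure the loop has really finished
        with open(self._filepath, "w") as f:
            f.writelines(f"{s}\n" for s in self.samples)
        return self.samples
```

In the question's `run_job`, this would amount to calling `start()` before `run_workload()` and `stop()` after it.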
