I'd like to know how to properly implement threading in my application and avoid race conditions without hurting the script's performance.
I have omitted and simplified some code for easier reading, but the structure is similar to what is shown below.
run.py
from lib.DropSender import DropSender
drop_sender = DropSender( options )
drop_sender.start()
DropSender.py
from lib.Connect import Connection
import threading
import json
class DropSender:
    def __init__( self , options = {} ):
        self.system_online = True

    # This is a Web Socket connection delivering messages
    def on_message(self, message):
        js = json.loads( message )
        symbol = js[6]
        connections = Connection( self, symbol )
        connections.start()
Connect.py
import threading
import requests
import mysql.connector
from threading import Thread, Lock
class Connection( threading.Thread ):
    def __init__( self , drop_sender, symbol ):
        threading.Thread.__init__( self )
        self.symbol = symbol
        self.lock = Lock()

    def run( self ):
        self.users_list = self.getUsers( self.symbol )
        if self.users_list["count"] > 0:
            for u in self.users_list["data"]:
                self.user_id = u["user_id"] # Example 1122
                self.amount = u["amount"] # 923.40
                t = Thread(target=self.makePurchase, args=(self.symbol, self.user_id, self.amount, ))
                t.start()
                # t.join()
I know that calling join() right after start() avoids the race conditions, but it also costs the script its speed, because each thread then waits for the previous one to finish.
    def getUsers( self, symbol ):
        # MySQL call to get the list of users who like this 'symbol'
        my_users_arr = { "data" : data, "count" : count }
        return my_users_arr

    def makePurchase( self, symbol, user_id, amount ):
        # Lock it up
        self.lock.acquire()
        # -All sorts of race conditions happening here, even with the locks acquired-
        # User ID = 1122
        # Amount 884.00 (1122's User ID mixing up with another user's amount, race condition)
        # Release Lock
        self.lock.release()
CodePudding user response:
Your use of self.user_id and self.amount on the Connection object looks suspicious. Each Connection object has only one copy of these two fields, and they are overwritten on every pass through the loop, while threads started earlier may still be running.
You do not show your code for makePurchase(), but it should rely on its user_id and amount arguments and not expect these two fields of self to be correct.
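A minimal sketch of that idea, assuming the purchase logic only needs the values passed to it. The names make_purchase, run_purchases, purchases and purchases_lock are hypothetical stand-ins, not part of your code. Each thread gets its own arguments, nothing per-user is stored on a shared object, and the one lock is shared by every thread:

```python
import threading

# Hypothetical stand-in for the real purchase bookkeeping.
purchases = []
purchases_lock = threading.Lock()  # a single lock shared by all threads


def make_purchase(symbol, user_id, amount):
    # Only arguments are used here, so each thread sees its own values.
    with purchases_lock:
        purchases.append((symbol, user_id, amount))


def run_purchases(symbol, users):
    threads = []
    for u in users:
        # Pass the values directly instead of storing them on self first.
        t = threading.Thread(
            target=make_purchase, args=(symbol, u["user_id"], u["amount"])
        )
        t.start()
        threads.append(t)
    # Join only after every thread has been started, so they still overlap.
    for t in threads:
        t.join()


users = [
    {"user_id": 1122, "amount": 923.40},
    {"user_id": 3344, "amount": 884.00},
]
run_purchases("ABC", users)
```

Joining after the loop, not inside it, keeps the threads concurrent while still letting the caller know when all purchases are done.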
As an aside, I highly recommend the use of a thread pool.
from multiprocessing.pool import ThreadPool
with ThreadPool() as pool:
    for ....:
        pool.apply_async(func, args)
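By default this limits the number of worker threads to the number of CPUs on your machine, and the with block makes sure the pool is cleaned up when done. Note that leaving the with block calls terminate() on the pool, so wait for the results before the block ends, for example by calling get() on each returned AsyncResult, or close() followed by join().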
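A runnable version of the pool pattern might look like the sketch below. The make_purchase function and the users list are hypothetical placeholders for your own code. The AsyncResult objects are collected and get() is called inside the with block, so every task finishes before the pool is torn down:

```python
from multiprocessing.pool import ThreadPool


def make_purchase(symbol, user_id, amount):
    # Hypothetical placeholder: the real version would perform the purchase.
    return (symbol, user_id, amount)


users = [
    {"user_id": 1122, "amount": 923.40},
    {"user_id": 3344, "amount": 884.00},
]

with ThreadPool() as pool:
    # Submit one task per user; each task receives its own arguments.
    pending = [
        pool.apply_async(make_purchase, ("ABC", u["user_id"], u["amount"]))
        for u in users
    ]
    # get() blocks until that task is done and re-raises any exception it hit.
    results = [p.get() for p in pending]
```

Calling get() also surfaces exceptions raised in a worker, which would otherwise go unnoticed with apply_async.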
