Should I use events, semaphores, locks, conditions, or a combination thereof to manage the safe exit of my multithreaded Python program?
I’m writing a multithreaded python program where the main thread and the other threads it spawns run as daemons (but not Thread.daemon=True) that look for specific files in specific directories and perform operations on them when they exist. An error may occur in one/any thread that requires the entire program to exit. However, I need other threads to finish their current work before exiting.
As I understand it, if I set myThread.daemon=True for the generated threads, they will automatically exit as soon as the main thread exits. However, I expect other threads to finish their current work before exiting (unless the error is some kind of catastrophic failure, in which case I would probably quit everything anyway, safe or not). So I didn’t set the thread’s daemon property to True.
Looking at the threading module documentation and the various objects available to me, such as events, semaphores, conditions, and locks, I’m not sure the best way to handle my case. Also, I’m not sure how to handle this situation when the program needs to be terminated due to SIGTERM/SIGINT signals.
Some code illustrates a simplified version of my program structure:
import threading
import signals
import glob
import time
class MyThread1( threading.thread ):
def __init__( self, name='MyThread1' ):
threading. Thread.__init__( self )
self.name = name
return
def run( self ):
while True:
filePathList = glob.glob( thisThreadDir + '/*.txt' )
for file in filePathList:
try:
doSomeProcessing( file )
# Then move the file to another thread's dir
# or potentially create a new file that will
# be picked up by another thread
except:
# Need to potentially tell all other threads
# to finish task and exit depending on error
# I assume this would be the place to check for some kind of
# flag or other indication to terminate the thread?
time.sleep( 30 )
# Now imagine a few more custom threads with the same basic structure,
# except that what is happening in doSomeProcessing() will obviously vary
# Main Thread/Script
def sigintHandler( SIGINT, frame ):
# How do I handle telling all threads to finish their current loop
# and then exit safely when I encounter this signal?
sys.exit( 1 )
def sigtermHandler( SIGTERM, frame ):
# Same question for this signal handler
sys.exit( 1 )
signal.signal( signal. SIGINT, sigintHandler )
signal.signal( signal. SIGTERM, sigtermHandler )
myOtherThread1 = MyThread1()
myOtherThreadN = MyThreadN()
myOtherThread1.start()
myOtherThreadN.start()
while True:
filePathList = glob.glob( mainDir + '/*.txt' )
for file in filePathList:
try:
doMainProcessing( file )
# Move file or write a new one in another thread's dir
except:
# Again, potentially need to exit the whole program, but want
# the other threads to finish their current loop first
# Check if another thread told us we need to exit?
time.sleep( 30 )
Solution
I’ll use events
to signal to the thread that it should quit :
- Create an event in
the __init__
- Use the wait() event in
run()
tosleep
and check when to exit - Set the event externally to stop the thread
To handle exceptions in a thread, I would surround everything it does with a try
/except
block. When something is caught, store the exception (and/or any other information you need), clean up and exit the thread.
Externally, in the main thread, check all threads for stored exceptions and notify all threads to exit if any exceptions are found.
To handle exceptions in the main thread (including SIGINT
), there is a try
/except
blocking there and signaling a stop to all threads.
All of this, plus dummy exceptions and debug print:
import threading
import time
class MyThread(threading. Thread):
def __init__(self):
super().__init__()
self.stop_requested = threading. Event()
self.exception = None
def run(self):
try:
# sleep for 1 second, or until stop is requested, stop if stop is requested
while not self.stop_requested.wait(1):
# do your thread thing here
print('in thread {}'.format(self))
# simulate a random exception:
import random
if random.randint(0, 50) == 42:
1 / 0
except Exception as e:
self.exception = e
# clean up here
print('clean up thread {}'.format(self))
def stop(self):
# set the event to signal stop
self.stop_requested.set()
# create and start some threads
threads = [MyThread(), MyThread(), MyThread(), MyThread()]
for t in threads:
t.start()
# main thread looks at the status of all threads
try:
while True:
for t in threads:
if t.exception:
# there was an error in a thread - raise it in main thread too
# this will stop the loop
raise t.exception
time.sleep(0.2)
except Exception as e:
# handle exceptions any way you like, or don't
# This includes exceptions in main thread as well as those in other threads
# (because of "raise t.exception" above)
print(e)
finally:
print('clan up everything')
for t in threads:
# threads will know how to clean up when stopped
t.stop()