Python – Should I use events, semaphores, locks, conditions, or a combination thereof to manage the safe exit of my multithreaded Python program?

I’m writing a multithreaded Python program in which the main thread and the threads it spawns run continuously, like daemons (though without setting Thread.daemon=True), looking for specific files in specific directories and processing them when they exist. An error in any one thread may require the entire program to exit. However, I need the other threads to finish their current work before exiting.

As I understand it, if I set myThread.daemon=True for the spawned threads, they will be stopped automatically as soon as the main thread exits. However, I want the other threads to finish their current work before exiting (unless the error is some kind of catastrophic failure, in which case I would probably quit everything anyway, safe or not). So I didn’t set the threads’ daemon property to True.

Looking at the threading module documentation and the various objects available to me, such as events, semaphores, conditions, and locks, I’m not sure of the best way to handle my case. I’m also not sure how to handle the case where the program needs to terminate because of a SIGTERM or SIGINT signal.

Some code illustrates a simplified version of my program structure:

import threading
import signal
import sys
import glob
import time

class MyThread1( threading.Thread ):
    def __init__( self, name='MyThread1' ):
        threading.Thread.__init__( self )
        self.name = name
        return
    def run( self ):
        while True:
            filePathList = glob.glob( thisThreadDir + '/*.txt' )
            for file in filePathList:
                try:
                    doSomeProcessing( file )
                    # Then move the file to another thread's dir
                    # or potentially create a new file that will 
                    # be picked up by another thread
                except:
                    # Need to potentially tell all other threads
                    # to finish task and exit depending on error
                    pass

            # I assume this would be the place to check for some kind of
            # flag or other indication to terminate the thread?
            time.sleep( 30 )

# Now imagine a few more custom threads with the same basic structure, 
# except that what is happening in doSomeProcessing() will obviously vary

# Main Thread/Script
def sigintHandler( SIGINT, frame ):
    # How do I handle telling all threads to finish their current loop
    # and then exit safely when I encounter this signal?
    sys.exit( 1 )

def sigtermHandler( SIGTERM, frame ):
    # Same question for this signal handler
    sys.exit( 1 )

signal.signal( signal.SIGINT, sigintHandler )
signal.signal( signal.SIGTERM, sigtermHandler )

myOtherThread1 = MyThread1()
myOtherThreadN = MyThreadN()

myOtherThread1.start()
myOtherThreadN.start()

while True:
    filePathList = glob.glob( mainDir + '/*.txt' )
    for file in filePathList:
        try:
            doMainProcessing( file )
            # Move file or write a new one in another thread's dir
        except:
            # Again, potentially need to exit the whole program, but want
            # the other threads to finish their current loop first
            pass

    # Check if another thread told us we need to exit?
    time.sleep( 30 )

Solution

I would use events to signal to each thread that it should quit:

  • Create an event in the __init__
  • Use the event’s wait() method in run() both to sleep and to check when to exit
  • Set the event externally to stop the thread
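
The second bullet relies on the return value of Event.wait(timeout): it returns False if the timeout elapsed without the event being set, and True as soon as the event is set, even if that happens in the middle of the wait. A minimal sketch of that behaviour follows; the names stop_requested and interruptible_sleep are only illustrative and not part of any library.

```python
import threading
import time

stop_requested = threading.Event()  # illustrative name

def interruptible_sleep(seconds):
    """Sleep up to `seconds`; return True if a stop was requested meanwhile."""
    return stop_requested.wait(seconds)

# Nothing has set the event yet, so this waits the full timeout
# and reports that no stop was requested.
first_result = interruptible_sleep(0.05)

# Set the event from another thread shortly after we start waiting.
threading.Timer(0.05, stop_requested.set).start()
start = time.monotonic()
second_result = interruptible_sleep(5)  # wakes up long before 5 seconds
elapsed = time.monotonic() - start
```

Compared with time.sleep(30), a thread that sleeps this way reacts to a stop request almost immediately instead of finishing its full sleep interval first.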

To handle exceptions in a thread, I would surround everything it does with a try/except block. When something is caught, store the exception (and/or any other information you need), clean up and exit the thread.

Externally, in the main thread, check all threads for stored exceptions and notify all threads to exit if any exceptions are found.

To handle exceptions in the main thread (including the KeyboardInterrupt that SIGINT raises), wrap the main loop in a try/except/finally block; the finally clause signals all threads to stop.
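
SIGTERM is not covered by this automatically: by default it terminates the process without raising anything in the main thread, so a finally clause would not run. One possible approach, sketched here under the assumption that the main loop is wrapped in try/finally, is to install a handler that calls sys.exit(), which raises SystemExit in the main thread. The names sigterm_handler, install_sigterm_handler and run_main_loop are hypothetical.

```python
import signal
import sys

def sigterm_handler(signum, frame):
    # sys.exit() raises SystemExit in the main thread, so any
    # surrounding try/finally block still gets to run.
    sys.exit(1)

def install_sigterm_handler():
    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGTERM, sigterm_handler)

def run_main_loop(loop_body, stop_all_threads):
    """Run loop_body(); always call stop_all_threads() on the way out."""
    try:
        loop_body()
    finally:
        # Reached for ordinary exceptions, KeyboardInterrupt and SystemExit alike.
        stop_all_threads()
```

Note that KeyboardInterrupt and SystemExit do not derive from Exception, so an "except Exception" clause is skipped for them; it is the finally clause that guarantees the threads are told to stop.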

All of this together, plus a dummy exception and debug prints:

import threading
import time

class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()
        self.stop_requested = threading.Event()
        self.exception = None

    def run(self):
        try:
            # sleep for 1 second, or until stop is requested, stop if stop is requested
            while not self.stop_requested.wait(1):
                # do your thread thing here
                print('in thread {}'.format(self))

                # simulate a random exception:
                import random
                if random.randint(0, 50) == 42:
                    1 / 0
        except Exception as e:
            self.exception = e

        # clean up here
        print('clean up thread {}'.format(self))

    def stop(self):
        # set the event to signal stop
        self.stop_requested.set()

# create and start some threads
threads = [MyThread(), MyThread(), MyThread(), MyThread()]
for t in threads:
    t.start()

# main thread looks at the status of all threads
try:
    while True:
        for t in threads:
            if t.exception:
                # there was an error in a thread - raise it in main thread too
                # this will stop the loop
                raise t.exception
        time.sleep(0.2)

except Exception as e:
    # handle exceptions any way you like, or don't
    # This includes exceptions in main thread as well as those in other threads
    # (because of "raise t.exception" above)
    print(e)

finally:
    print('clean up everything')
    for t in threads:
        # threads will know how to clean up when stopped
        t.stop()
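
Because the threads are not daemon threads, the interpreter waits for them to finish before the process exits. If the main thread needs to do its own final cleanup only after every worker has finished its current iteration, it can also join them explicitly. The following is a rough sketch of that idea; Worker is a hypothetical stand-in for MyThread, and stop_and_join is an illustrative helper, not part of the threading module.

```python
import threading

class Worker(threading.Thread):  # hypothetical stand-in for MyThread
    def __init__(self):
        super().__init__()
        self.stop_requested = threading.Event()
        self.iterations = 0

    def run(self):
        # Each pass through the loop is one unit of "current work".
        while not self.stop_requested.wait(0.01):
            self.iterations += 1

    def stop(self):
        self.stop_requested.set()

def stop_and_join(threads, timeout=5.0):
    """Ask every thread to stop, then wait for each one.

    Returns the threads that are still alive after the timeout.
    """
    for t in threads:
        t.stop()          # signal all threads first so they wind down in parallel
    for t in threads:
        t.join(timeout)   # then wait for each to finish its current iteration
    return [t for t in threads if t.is_alive()]

workers = [Worker(), Worker()]
for w in workers:
    w.start()
still_running = stop_and_join(workers)
```

Signalling every thread before joining any of them keeps the total shutdown time close to the slowest single thread rather than the sum of all of them.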
