Python with multiprocessing and timeout

Nowadays a CPU has many cores, but we cannot make good use of them unless our programs run on more than one core.

Say we have thousands of SEC filings to process every year, and we write a function to collect data from them. For most filings, it works just fine. But for a small group of filings (8 out of 8000+ in 2005 in my data set, as an example), it never finishes: 10 hours later, the process for such a filing is still stuck somewhere. As Obama once famously (and maybe unfairly) said, “Eight is enough.” (It was unfair, according to McCain, because Obama was using Bush as a pet name for him.) We can set a timeout to end such a run.

Let’s first make a troublesome function, one that sometimes runs overtime.

from time import sleep

def f(x):
    """x can be any integer.
    When x is an even number, it takes x seconds to finish;
    when x is an odd number, it takes one second.
    When x=2, the division 1 / (x-2) raises an exception, which is caught.
    """
    # Even x sleeps for x seconds; odd x sleeps for 1 second.
    sl = x if (-1)**x > 0 else 1
    print("Start running with x={} and sleep={}".format(x, sl))
    sleep(sl)
    try:
        print("  finished with x={} and sleep={}, result={}".format(x, sl, 1 / (x-2)))
        return x
    except Exception as e:
        # This runs in a worker process (not a thread) when called through the pool.
        print('\n\nCaught exception {} in worker process (x = {:d})'.format(e, x))
        return None
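
To get a feel for the workload, here is a small sketch of the sleep schedule that f implies for x in range(20), the range used later in this post. The helper name sleep_seconds is made up for illustration and is not part of the original script; it uses x % 2 == 0 as an equivalent way to test for even numbers.

```python
def sleep_seconds(x):
    """Hypothetical helper: the sleep time that f(x) would use."""
    return x if x % 2 == 0 else 1


# Sleep time for each input, and the total if the calls ran one after another.
schedule = {x: sleep_seconds(x) for x in range(20)}
total_sequential = sum(schedule.values())

print(schedule)
print("Sequential sleep time: {} seconds".format(total_sequential))
```

Run one after another, the 20 calls would sleep for 100 seconds in total, most of it spent on the larger even values.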

Now let’s put a 2-core CPU to work with Python’s multiprocessing. We run f(x) in parallel and collect the results in a list, waiting at most 5 seconds for each result. (The 5-second clock starts when get() is called, so a task that started earlier can run somewhat longer in total.) In the end, we collect the results and do some analysis (that’s not the job of this post, though).

import multiprocessing

if __name__ == '__main__':
    # Leaving the with-block terminates the pool, including workers that are still busy.
    with multiprocessing.Pool(2) as pool:
        async_results = [pool.apply_async(f, (i,)) for i in range(20)]
        results_collection = []
        for async_res in async_results:
            try:
                # Wait at most 5 seconds for this result.
                this_res = async_res.get(timeout=5)
                results_collection.append(this_res)
            except Exception as e:
                # A timeout raises multiprocessing.TimeoutError with an empty message,
                # so print repr(e) to show the exception type.
                print("Exception: {!r}".format(e))
                results_collection.append(None)
        print(results_collection)
        # Remove the unsuccessful ones
        results_collection = [r for r in results_collection if r is not None]
        print(results_collection)

The last two lines of the output:

[0, 1, None, 3, 4, 5, 6, 7, None, 9, None, 11, None, 13, None, 15, None, 17, None, 19]
[0, 1, 3, 4, 5, 6, 7, 9, 11, 13, 15, 17, 19]

The first line of output has 20 elements: the return values of f(x) for x in range(20). Some are None (x=2, 8, 10, 12, 14, 16, 18), either because f hit an error (x=2) or because the wait timed out (the rest).
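
If you also want to know which inputs failed, you can pair each input with its result. Below is a small sketch using the first output line copied from above; the variable names are my own. Note the test is "is None" rather than truthiness, because 0 is a valid result here and a plain "if r" would drop it.

```python
inputs = list(range(20))
# The first output line from this post, copied verbatim.
results = [0, 1, None, 3, 4, 5, 6, 7, None, 9, None, 11, None, 13,
           None, 15, None, 17, None, 19]

# Inputs whose result is missing (error or timeout).
failed_inputs = [x for x, r in zip(inputs, results) if r is None]
# Results that came back; 0 is kept because we compare against None.
succeeded = [r for r in results if r is not None]

print(failed_inputs)
print(succeeded)
```

The failed inputs are the ones you would rerun or inspect by hand later.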

The second line is the filtered result, ready for the next analysis step: [0, 1, 3, 4, 5, 6, 7, 9, 11, 13, 15, 17, 19]
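
One caveat: get(timeout=5) limits how long the parent waits for a result. It does not stop the worker, which keeps running until the pool is terminated. That is why x=6, a 6-second job, still shows up in the results. If you need a strict limit per job, one option is to give each job its own process and terminate it on timeout. The following is a minimal sketch of that idea, not the code from this post; sleepy, _worker, run_with_hard_timeout, and the start_method parameter are names I made up for illustration.

```python
import multiprocessing
import queue
from time import sleep


def sleepy(x):
    """Stand-in for a job that may hang: sleep x seconds, then return x."""
    sleep(x)
    return x


def _worker(func, arg, out_queue):
    """Run func(arg) in the child process and send back (ok, value)."""
    try:
        out_queue.put((True, func(arg)))
    except Exception as exc:
        out_queue.put((False, repr(exc)))


def run_with_hard_timeout(func, arg, timeout, start_method=None):
    """Return func(arg), or None if it raises or takes longer than `timeout` seconds.

    start_method=None uses the platform's default way of starting processes.
    The timeout includes the time needed to start the child process.
    """
    ctx = multiprocessing.get_context(start_method)
    out_queue = ctx.Queue()
    proc = ctx.Process(target=_worker, args=(func, arg, out_queue))
    proc.start()
    try:
        ok, value = out_queue.get(timeout=timeout)
    except queue.Empty:
        ok, value = False, "timed out"
    finally:
        # Kill the child if it is still running, then reap it.
        if proc.is_alive():
            proc.terminate()
        proc.join()
    if not ok:
        print("arg={!r} failed: {}".format(arg, value))
        return None
    return value
```

For example, run_with_hard_timeout(sleepy, 30, timeout=5) should give up after about 5 seconds and return None, with the child process terminated rather than left running. The price is one process start per job and no pooling, so for thousands of filings you would still want to cap how many of these run at once. On platforms that start processes with spawn, the calling code needs the usual if __name__ == '__main__': guard.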

How it runs live:

The code on GitHub:

https://github.com/reeyarn/trytimeout/blob/main/trytimeout.py

The code looks too simple. Why would I bother writing a blog post about it? Well, yesterday it was not this simple for me. I tried a few other ways, and they didn’t work. In the worst case, my workstation froze, and I could not run any command, even in bash. There was an error message from the OS: “-bash: fork: retry: Resource temporarily unavailable.” And I googled a way out: “exec killall python3”