When one worker thread fails, how to abort remaining workers?

I have a program which spawns multiple threads, each of which executes a long-running task. The main thread then waits for all worker threads to join, collects results, and exits.

If an error occurs in one of the workers, I want the remaining workers to stop gracefully, so that the main thread can exit shortly afterwards.

My question is how best to do this, when the implementation of the long-running task is provided by a library whose code I cannot modify.

Here is a simple sketch of the system, with no error handling:

void threadFunc()
{
    // Do long-running stuff
}

void mainFunc()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 3; ++i) {
        threads.push_back(std::thread(&threadFunc));
    }

    for (auto &t : threads) {
        t.join();
    }
}

If the long-running function executes a loop and I have access to the code, then execution can be aborted simply by checking a shared "keep on running" flag at the top of each iteration.

std::mutex mutex;
bool error;

void threadFunc()
{
    try {
        for (...) {
            {
                std::unique_lock<std::mutex> lock(mutex);
                if (error) {
                    break;
                }
            }
        }
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}
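
As an aside, the same check could probably be written with a std::atomic<bool> instead of a mutex-protected bool. The following is only a sketch under that assumption: errorFlag, worker and demoStopsEarly are names made up for illustration, and the simulated failure stands in for whatever the real task does.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <stdexcept>
#include <thread>
#include <vector>

// Shared "an error has occurred" flag; an atomic needs no mutex.
std::atomic<bool> errorFlag{false};

// Hypothetical worker: runs up to maxIter iterations and throws at
// iteration failAt (pass a negative failAt for a worker that never fails).
// Returns the number of iterations it completed.
int worker(int maxIter, int failAt)
{
    assert(maxIter >= 0);
    int done = 0;
    try {
        for (int i = 0; i < maxIter; ++i) {
            if (errorFlag.load()) {
                break; // another worker failed, so stop early
            }
            if (i == failAt) {
                throw std::runtime_error("simulated failure");
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            ++done;
        }
    } catch (const std::exception &) {
        errorFlag.store(true); // tell the other workers to stop
    }
    return done;
}

// Runs one failing worker alongside two healthy ones and reports
// whether the healthy ones stopped before finishing all iterations.
bool demoStopsEarly()
{
    errorFlag.store(false);
    const int maxIter = 1000;
    std::vector<int> done(3, 0);
    std::vector<std::thread> threads;
    for (int t = 0; t < 3; ++t) {
        threads.emplace_back([&done, t, maxIter] {
            done[t] = worker(maxIter, t == 1 ? 5 : -1);
        });
    }
    for (auto &th : threads) {
        th.join();
    }
    return errorFlag.load() && done[0] < maxIter && done[2] < maxIter;
}
```

Like the mutex version, this only helps when the loop itself can be edited, which is exactly what the library case below rules out.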

Now consider the case when the long-running operation is provided by a library:

std::mutex mutex;
bool error;

class Task
{
public:
    // Blocks until completion, error, or stop() is called
    void run();

    void stop();
};

void threadFunc(Task &task)
{
    try {
        task.run();
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}

In this case, the main thread has to handle the error, and call stop() on the still-running tasks. As such, it cannot simply wait for each worker to join() as in the original implementation.

The approach I have used so far is to share the following structure between the main thread and each worker:

struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;
};

When a worker finishes, whether successfully or not, it decrements the running count; if it caught an exception, it also sets the error flag. In both cases, it then calls condVar.notify_one().

The main thread then waits on the condition variable, waking up if either error is set or running reaches zero. On waking up, the main thread calls stop() on all tasks if error has been set.

This approach works, but I feel there should be a cleaner solution using some of the higher-level primitives in the standard concurrency library. Can anyone suggest an improved implementation?

Here is the complete code for my current solution:

// main.cpp

#include <chrono>
#include <condition_variable>
#include <exception>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "utils.h"

// Class which encapsulates long-running task, and provides a mechanism for aborting it
class Task
{
public:
    Task(int tidx, bool fail)
    :   tidx(tidx)
    ,   fail(fail)
    ,   m_run(true)
    {

    }

    void run()
    {
        static const int NUM_ITERATIONS = 10;

        for (int iter = 0; iter < NUM_ITERATIONS; ++iter) {
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                if (!m_run) {
                    out() << "thread " << tidx << " aborting";
                    break;
                }
            }

            out() << "thread " << tidx << " iter " << iter;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            if (fail) {
                throw std::exception();
            }
        }
    }

    void stop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_run = false;
    }

    const int tidx;
    const bool fail;

private:
    std::mutex m_mutex;
    bool m_run;
};

// Data shared between all threads
struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;

    SharedData(int count)
    :   error(false)
    ,   running(count)
    {

    }
};

void threadFunc(Task &task, SharedData &shared)
{
    try {
        out() << "thread " << task.tidx << " starting";

        task.run(); // Blocks until task completes or is aborted by main thread

        out() << "thread " << task.tidx << " ended";
    } catch (std::exception &) {
        out() << "thread " << task.tidx << " failed";

        std::unique_lock<std::mutex> lock(shared.mutex);
        shared.error = true;
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);
        --shared.running;
    }

    shared.condVar.notify_one();
}

int main(int argc, char **argv)
{
    static const int NUM_THREADS = 3;

    std::vector<std::unique_ptr<Task>> tasks(NUM_THREADS);
    std::vector<std::thread> threads(NUM_THREADS);

    SharedData shared(NUM_THREADS);

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        const bool fail = (tidx == 1);
        tasks[tidx] = std::make_unique<Task>(tidx, fail);
        threads[tidx] = std::thread(&threadFunc, std::ref(*tasks[tidx]), std::ref(shared));
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);

        // Wake up when either all tasks have completed, or any one has failed
        shared.condVar.wait(lock, [&shared](){
            return shared.error || !shared.running;
        });

        if (shared.error) {
            out() << "error occurred - terminating remaining tasks";
            for (auto &t : tasks) {
                t->stop();
            }
        }
    }

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        out() << "waiting for thread " << tidx << " to join";
        threads[tidx].join();
        out() << "thread " << tidx << " joined";
    }

    out() << "program complete";

    return 0;
}

Some utility functions are defined here:

// utils.h

#include <iostream>
#include <mutex>
#include <thread>

#ifndef UTILS_H
#define UTILS_H

#if __cplusplus <= 201103L
// Backport std::make_unique from C++14
#include <memory>
namespace std {

template<typename T, typename ...Args>
std::unique_ptr<T> make_unique(
            Args&& ...args)
{
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

} // namespace std
#endif // __cplusplus <= 201103L

// Thread-safe wrapper around std::cout
class ThreadSafeStdOut
{
public:
    ThreadSafeStdOut()
    :   m_lock(m_mutex)
    {

    }

    ~ThreadSafeStdOut()
    {
        std::cout << std::endl;
    }

    template <typename T>
    ThreadSafeStdOut &operator<<(const T &obj)
    {
        std::cout << obj;
        return *this;
    }

private:
    static std::mutex m_mutex;
    std::unique_lock<std::mutex> m_lock;
};

std::mutex ThreadSafeStdOut::m_mutex;

// Convenience function for performing thread-safe output
ThreadSafeStdOut out()
{
    return ThreadSafeStdOut();
}

#endif // UTILS_H

I've been thinking about your situation for some time, and this may be of some help to you. There are two or three methods you could try to achieve your goal, used separately or in combination. I will show at least the first option, since I'm still learning and trying to master template specialization and lambdas.

  • Using a Manager Class
  • Using Template Specialization Encapsulation
  • Using Lambdas

    Pseudocode for a Manager Class would look something like this:

    #include <list>
    #include <map>
    #include <memory>
    #include <queue>

    // MainThread and WorkerThread are placeholder types for this pseudocode.
    class ThreadManager {
    private:
        std::unique_ptr<MainThread> mainThread_;
        std::list<std::shared_ptr<WorkerThread>> lWorkers_;  // List to hold finished workers
        std::queue<std::shared_ptr<WorkerThread>> qWorkers_; // Queue to hold inactive and waiting threads
        std::map<unsigned, std::shared_ptr<WorkerThread>> mThreadIds_; // Map to associate a WorkerThread with an ID value
        std::map<unsigned, bool> mFinishedThreads_; // Map to keep track of finished and unfinished threads

        bool threadError_; // Not needed if using exception handling
    public:
        explicit ThreadManager( const MainThread& main_thread );

        void shutdownThread( const unsigned& threadId );
        void shutdownAllThreads();

        void addWorker( const WorkerThread& worker_thread );
        bool isThreadDone( const unsigned& threadId );

        void spawnMainThread() const; // Method to start main thread's work

        void spawnWorkerThread( unsigned threadId, bool& error );

        bool getThreadError( unsigned& threadId ); // Returns true if a thread encountered an error, and passes back the ID of that thread

    };
    

    I used a bool to indicate whether a thread failed only to keep the demonstration simple. You can of course substitute whatever you prefer, such as exceptions or invalid unsigned values.

    Note that a class of this type would be better as a Singleton, since you wouldn't want more than one manager when you are working with shared pointers. Using a class of this sort would look something like this:

    SomeClass::SomeClass( ... ) {
        // This class could contain a private static smart pointer of this Manager Class
        // Initialize the smart pointer giving it new memory for the Manager Class and by passing it a pointer of the Main Thread object

        threadManager_ = new ThreadManager( main_thread ); // Wouldn't actually use raw pointers here unless you had a need to, but just shown for simplicity
    }

    void SomeClass::addThreads( ... ) {
        for ( unsigned u = 1; u <= threadCount; u++ ) {
            threadManager_->addWorker( some_worker_thread );
        }
    }

    void SomeClass::someFunctionThatSpawnsThreads( ... ) {
        threadManager_->spawnMainThread();

        bool error = false;
        for ( unsigned u = 1; u <= threadCount; u++ ) {
            threadManager_->spawnWorkerThread( u, error );

            if ( error ) { // This thread failed to start, shut down all threads
                threadManager_->shutdownAllThreads();
            }
        }

        // If all threads spawn successfully we can do a while loop here to listen if one fails.
        unsigned threadId;
        while ( threadManager_->getThreadError( threadId ) ) {
            // If the function passed to this while loop returns true and we end up here, it will pass the id value of the failed thread.
            // We can now go through a for loop and stop all active threads.
            for ( unsigned u = threadId + 1; u <= threadCount; u++ ) {
                threadManager_->shutdownThread( u );
            }

            // We have successfully shut down all threads
            break;
        }
    }
    

    I like the manager-class design. I have used it in other projects, and it comes in handy when a code base holds many resources, such as a game engine with sprites, textures, audio files, maps, game items and so on. A manager class keeps track of all of those assets and maintains them. The same concept can be applied to managing active, inactive and waiting threads: the manager knows how to handle and shut down all threads properly.

    If your code base and libraries support exceptions and thread-safe exception handling, I would recommend using an ExceptionHandler instead of passing bools around for errors. A Logger class is also useful: it can write to a log file and/or a console window, giving an explicit message that says which function threw the exception and what caused it. A log message might look like this:

    Exception Thrown: someFunctionNamedThis in ThisFile on Line# (x)
        threadID 021342 failed to execute.
    

    This way you can look at the log file and quickly find out which thread caused the exception, instead of passing bool variables around.
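
    One possible way to carry the exception itself back to the main thread, rather than a bool, is std::exception_ptr. The sketch below is only an illustration of that idea: ErrorSlot and runAll are hypothetical names, not part of the manager class above, and only the first exception is kept.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: remembers the first exception thrown by any worker.
class ErrorSlot
{
public:
    // Must be called from inside a catch block in a worker thread.
    void capture()
    {
        std::exception_ptr current = std::current_exception();
        assert(current && "capture() must be called from a catch block");
        std::lock_guard<std::mutex> lock(m_mutex);
        if (!m_first) {
            m_first = current;
        }
    }

    // Called by the main thread after joining; rethrows the stored exception.
    void rethrowIfSet()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_first) {
            std::rethrow_exception(m_first);
        }
    }

private:
    std::mutex m_mutex;
    std::exception_ptr m_first;
};

// Runs every job on its own thread and returns the message of the first
// failure that was captured, or an empty string if all jobs succeeded.
std::string runAll(const std::vector<std::function<void()>> &jobs)
{
    ErrorSlot slot;
    std::vector<std::thread> threads;
    for (const auto &job : jobs) {
        threads.emplace_back([&slot, &job] {
            try {
                job();
            } catch (...) {
                slot.capture();
            }
        });
    }
    for (auto &t : threads) {
        t.join();
    }

    try {
        slot.rethrowIfSet();
    } catch (const std::exception &e) {
        return e.what(); // a logger could record this message
    }
    return "";
}
```

    The message returned by runAll is the kind of text a Logger could write out, so the log names the real cause rather than just "a thread failed".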


    "The implementation of the long-running task is provided by a library whose code I cannot modify."

    That means you have no way to synchronize the work done by the worker threads.

    "If an error occurs in one of the workers,"

    Let's suppose that you really can detect worker errors. Some of them are easily detected if the library reports them; others are not, e.g.:

  • the library code loops forever.
  • the library code exits prematurely with an uncaught exception.

    "I want the remaining workers to stop **gracefully**"

    That's just not possible.

    The best you can do is write a thread manager that checks the status of the worker threads and, if an error condition is detected, (ungracefully) "kills" all the worker threads and exits.

    You should also consider detecting a worker thread that is stuck in a loop (by timeout) and offering the user the option to kill it or to continue waiting for it to finish.
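
    The timeout check could be sketched with std::future::wait_for, as below. This is only one possible shape: WorkerState, watch and demo are hypothetical names, the sleeping lambda stands in for the library call, and the sketch always joins the thread at the end instead of killing it.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Possible outcomes when watching a worker.
enum class WorkerState { Finished, TimedOut };

// Waits up to `timeout` for the worker's future to become ready.
WorkerState watch(std::future<void> &done, std::chrono::milliseconds timeout)
{
    return done.wait_for(timeout) == std::future_status::ready
        ? WorkerState::Finished
        : WorkerState::TimedOut;
}

// Hypothetical demo: a worker that takes workMs, watched for timeoutMs.
WorkerState demo(int workMs, int timeoutMs)
{
    assert(workMs >= 0 && timeoutMs >= 0);

    // The packaged_task makes the future ready when the lambda returns.
    std::packaged_task<void()> task([workMs] {
        std::this_thread::sleep_for(std::chrono::milliseconds(workMs));
    });
    std::future<void> done = task.get_future();
    std::thread worker(std::move(task));

    const WorkerState state = watch(done, std::chrono::milliseconds(timeoutMs));

    // A real program would ask the user here whether to keep waiting;
    // this sketch always waits so that the thread can be joined.
    worker.join();
    return state;
}
```

    For example, demo(10, 2000) should report Finished, while demo(300, 20) should report TimedOut before the worker is eventually joined.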


    Your problem is that the long running function is not your code, and you say you cannot modify it. Consequently you cannot make it pay any attention whatsoever to any kind of external synchronisation primitive (condition variables, semaphores, mutexes, pipes, etc), unless the library developer has done that for you.

    Therefore your only option is to do something that wrestles control away from any code no matter what it's doing. This is what signals do. For that, you're going to have to use pthread_kill(), or whatever the equivalent is these days.

    The pattern would be:

  • The thread that detects an error needs to communicate that error back to the main thread in some manner.
  • The main thread then needs to call pthread_kill() for all the other remaining threads. Don't be confused by the name: pthread_kill() is simply a way of delivering an arbitrary signal to a thread. Note that a signal whose action is to stop, continue or terminate (SIGSTOP, SIGCONT and, by default, SIGTERM, for example) affects the whole process even when raised with pthread_kill(), not just one thread, so don't use those.
  • In each of those threads you'll need a signal handler. On delivery of the signal to a thread the execution path in that thread will jump to the handler no matter what the long running function was doing.
  • You are now back in (limited) control, and can (probably, well, maybe) do some limited cleanup and terminate the thread.
  • In the meantime the main thread will have been calling pthread_join() on all the threads it's signaled, and those will now return.
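
    The delivery part of this pattern can be sketched as follows, assuming a POSIX system. All names here (onUsr1, longRunning, demoSignalDelivery and the three flags) are made up for illustration, and nanosleep() stands in for the library call. The handler only records that it ran; terminating the thread from inside the handler is deliberately left out, since pthread_exit() is not on the POSIX list of async-signal-safe functions.

```cpp
#include <atomic>
#include <cassert>
#include <cerrno>
#include <pthread.h>
#include <signal.h>
#include <time.h>

std::atomic<bool> gotSignal{false};        // set by the signal handler
std::atomic<bool> workerStarted{false};    // set by the worker before it blocks
std::atomic<bool> sleepInterrupted{false}; // set if the blocking call was cut short

// Handler: only touches an atomic flag (assumed lock-free on this platform).
extern "C" void onUsr1(int)
{
    gotSignal.store(true);
}

// Stand-in for the long-running library call: sleeps for up to 10 seconds.
extern "C" void *longRunning(void *)
{
    timespec ts{};
    ts.tv_sec = 10;
    workerStarted.store(true);
    if (nanosleep(&ts, nullptr) == -1 && errno == EINTR) {
        sleepInterrupted.store(true);
    }
    return nullptr;
}

static void sleepMs(long ms)
{
    timespec ts{};
    ts.tv_nsec = ms * 1000L * 1000L;
    nanosleep(&ts, nullptr);
}

// Sends SIGUSR1 to one worker thread and reports whether the handler ran
// and the worker's blocking call returned early.
bool demoSignalDelivery()
{
    struct sigaction sa{};
    sa.sa_handler = onUsr1; // no SA_RESTART, so blocking calls can fail with EINTR
    sigemptyset(&sa.sa_mask);
    int rc = sigaction(SIGUSR1, &sa, nullptr);
    assert(rc == 0);
    (void)rc;

    pthread_t worker;
    rc = pthread_create(&worker, nullptr, longRunning, nullptr);
    assert(rc == 0);

    while (!workerStarted.load()) {
        sleepMs(1);
    }
    sleepMs(50); // give the worker time to actually enter nanosleep()

    pthread_kill(worker, SIGUSR1); // delivered to this thread only
    pthread_join(worker, nullptr);
    return gotSignal.load() && sleepInterrupted.load();
}
```

    The 50 ms pause is a crude way of making sure the worker is already blocked when the signal arrives; a signal that lands just before the blocking call starts would run the handler but not interrupt the call.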
    My thoughts:

  • This is a really ugly way of doing it (and signals / pthreads are notoriously difficult to get right and I'm no expert), but I don't really see what other choice you have.
  • It'll be a long way from looking 'graceful' in source code, though the end user experience will be OK.
  • You will be aborting execution part way through running that library function, so if there's any clean up it would normally do (eg freeing up memory it has allocated) that won't get done and you'll have a memory leak. Running under something like valgrind is a way of working out if this is happening.
  • The only way of getting the library function to clean up (if it needs to) will be for your signal handler to return control to the function and let it run to completion, which is just what you don't want to do.
  • And of course, this won't work on Windows (no pthreads, at least none worth speaking of, though there may be an equivalent mechanism).
  • Really the best way is going to be to re-implement (if at all possible) that library function.
