Thread Pool


本文原创

实现了一个简单的 POSIX 线程池,并且将线程分别绑定到了 CPU 的各个核心上。

抽象类 ThreadPool.hxx:

#ifndef _ThreadPool_HXX
#define _ThreadPool_HXX
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>

#ifdef __cplusplus
typedef bool TPBOOL;
#define TPTRUE  true
#define TPFALSE false
#else
typedef int TPBOOL;
#define TPTRUE  1
#define TPFALSE 0
#endif

#ifndef TPDEBUG
#define TPDEBUG 0
#endif

#if TPDEBUG
#define DBG_OUT(args...) \
    do { \
        char b__[256]; \
        sprintf(b__, args); \
        fprintf(stdout, "<%s:%d>\t%s", __FUNCTION__, __LINE__, b__); \
    } while(0)
#else
#define DBG_OUT(args...) 
#endif

#define BUSY_THRESHOLD       20    // (busy thread)/(all thread threshold)
#define LIMIT_THRESHOLD      80    // (busy thread)/(all thread threshold)
#define MANAGER_INTERVAL      3    // manager thread sleep interval

#define ThreadPool_Status_Normal     0
#define ThreadPool_Status_Idle       1
#define ThreadPool_Status_Busy       2
#define ThreadPool_Status_ReachLimit 3

typedef void* WorkParameter;
typedef void SocketWorkFunction(void *, int);
typedef void DelayTaskWorkFunction(void *);

///////////////////////////////////////
//////////    Thread Pool    //////////
///////////////////////////////////////
class ThreadPool
{
public:
    virtual ~ThreadPool();

    virtual TPBOOL initialize();
    virtual TPBOOL close();
    virtual void processWork(SocketWorkFunction* func, WorkParameter clientData, int mask);
    virtual void processWork(DelayTaskWorkFunction* func, WorkParameter clientData);

protected:
    ThreadPool(int min_num, int max_num);

    virtual TPBOOL AddWorkerThread(TPBOOL isBusy) = 0;
    virtual TPBOOL DeleteWorkerThread() = 0;
    virtual int GetWorkerThreadIdx(pthread_t id) = 0;
    virtual int GetThreadPoolStatus() = 0;

    static void HandleThreadQuit(int signo);

    static void* OriginalManagerThread(void *clientData);

    int minThreadNum;    // min thread number in the pool
    int curThreadNum;    // current thread number in the pool
    int maxThreadNum;    // max thread number in the pool
    pthread_t threadPool_t;    // manage thread id num
    pthread_mutex_t threadPool_lock;

private:
    ThreadPool(const ThreadPool&);
    ThreadPool& operator=(const ThreadPool&);
};

#endif

ThreadPool.cxx:

#include "ThreadPool.hxx"

using namespace std;

///////////////////////////////////////
//////////    Thread Pool    //////////
///////////////////////////////////////
ThreadPool::ThreadPool(int min_num, int max_num) : minThreadNum(min_num), curThreadNum(min_num), maxThreadNum(max_num)
{
}

ThreadPool::~ThreadPool()
{
}

/**
  * member function reality. thread pool init function.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  *     true: successful; false: failed
  */
TPBOOL ThreadPool::initialize()
{
}

/**
  * member function reality. thread pool entirely close function.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  */
TPBOOL ThreadPool::close()
{
}

void ThreadPool::processWork(SocketWorkFunction* func, WorkParameter clientData, int mask)
{
    (*func)(clientData, mask);
}

void ThreadPool::processWork(DelayTaskWorkFunction* func, WorkParameter clientData)
{
    (*func)(clientData);
}

void ThreadPool::HandleThreadQuit(int signo)
{
    DBG_OUT("[Handle sig %d, thread %ld exit]\n", signo, pthread_self());
    pthread_exit(NULL);
}

/**
  * internal interface. manage thread pool to delete idle thread.
  * para:
  *     pthread: thread pool struct ponter
  * return:
  */
void* ThreadPool::OriginalManagerThread(void *clientData)
{
    ThreadPool *threadpool = (ThreadPool*)clientData; // main thread pool struct instance

    signal(SIGQUIT, HandleThreadQuit);

    do {
        sleep(MANAGER_INTERVAL);
        switch (threadpool->GetThreadPoolStatus())
        {
            case ThreadPool_Status_Idle:
                do {
                    if (!threadpool->DeleteWorkerThread())
                        continue;
                } while(0);
                break;
            case ThreadPool_Status_Busy:
                break;
            case ThreadPool_Status_ReachLimit:
                if (threadpool->curThreadNum < threadpool->maxThreadNum)
                {
                    pthread_mutex_lock(&threadpool->threadPool_lock);
                    TPBOOL res = threadpool->AddWorkerThread(TPFALSE);
                    pthread_mutex_unlock(&threadpool->threadPool_lock);
                }
                break;
            default:
                break;
        }
    } while(TPTRUE);
}

实现类 BasicThreadPool.hxx:

#ifndef _BasicThreadPool_HXX
#define _BasicThreadPool_HXX

#include "ThreadPool.hxx"
#include <sched.h>

///////////////////////////////////////////////
//////////    Basic Worker Thread    //////////
///////////////////////////////////////////////
typedef struct {
    int threadID;
    pthread_t workerThread_t;    // thread id num
    pthread_cond_t workerThread_cond;
    pthread_mutex_t    workerThread_lock;

    TPBOOL isBusy;    // thread status: true-busy; false-idle
    TPBOOL isExit;
    TPBOOL isWait;    // CAUTION:在未调用pthread_cond_wait时通过pthread_cond_signal发送信号会造成信号丢失!

    WorkParameter clientData; // void *
    SocketWorkFunction* socketWorkFunc; // void (*)(void *, int);
    DelayTaskWorkFunction* delayTaskWorkFunc; // void (*)(void *);
    int socketMask;

    cpu_set_t CPUSet;
} BasicWorkerThread;

/////////////////////////////////////////////
//////////    Basic Thread Pool    //////////
/////////////////////////////////////////////
class BasicThreadPool : public ThreadPool
{
public:
    static BasicThreadPool* createNew(int min_num, int max_num);

    virtual ~BasicThreadPool();

    virtual TPBOOL initialize();
    virtual TPBOOL close();
    virtual void processWork(SocketWorkFunction* func, WorkParameter clientData, int mask);
    virtual void processWork(DelayTaskWorkFunction* func, WorkParameter clientData);

protected:
    BasicThreadPool(int min_num, int max_num);

    virtual TPBOOL AddWorkerThread(TPBOOL isBusy);
    virtual TPBOOL DeleteWorkerThread();
    virtual int GetWorkerThreadIdx(pthread_t id);
    virtual int GetThreadPoolStatus();

    static void* WorkerHandleThread(void *clientData);

private:
    BasicWorkerThread *basicWorkerThreadList; // work thread relative thread info
};

#endif

BasicThreadPool.cxx:

#include "BasicThreadPool.hxx"

using namespace std;

namespace conf {
int CPUCount = sysconf(_SC_NPROCESSORS_CONF);
}

//////////////////////////////////////////
//////////    Basic Thread Pool    //////////
//////////////////////////////////////////
BasicThreadPool* BasicThreadPool::createNew(int min_num, int max_num)
{
    return new BasicThreadPool(min_num, max_num);
}

BasicThreadPool::BasicThreadPool(int min_num, int max_num) : ThreadPool(min_num, max_num)
{
    // malloc mem for num thread info struct
    basicWorkerThreadList = new BasicWorkerThread[maxThreadNum];
    for (int i = 0; i < maxThreadNum; ++i)
        basicWorkerThreadList[i].threadID = i;
}

BasicThreadPool::~BasicThreadPool()
{
    if (basicWorkerThreadList != NULL)
    {
        delete[] basicWorkerThreadList;
        basicWorkerThreadList = NULL;
    }
}

/**
  * member function reality. thread pool init function.
  * para:
  *     this: thread pool struct instance ponter
  * return:
  *     true: successful; false: failed
  */
TPBOOL BasicThreadPool::initialize()
{
    int i, err;
    printf("[Handle Message] : [Thread Pool Initialize]\n");

    // create work thread and init work thread info
    for (i = 0; i < minThreadNum; ++i)
    {
        pthread_cond_init(&basicWorkerThreadList[i].workerThread_cond, NULL);
        pthread_mutex_init(&basicWorkerThreadList[i].workerThread_lock, NULL);
        err = pthread_create(&basicWorkerThreadList[i].workerThread_t, NULL, WorkerHandleThread, &basicWorkerThreadList[i]);
        if (err != 0)
        {
            printf("\t\t\t[initialize: create worker thread failed]\n");
            return TPFALSE;
        }
        printf("\t\t\t[initialize: create worker thread %ld]\n", basicWorkerThreadList[i].workerThread_t);
    }

    pthread_mutex_init(&threadPool_lock, NULL);
    // create manage thread
    err = pthread_create(&threadPool_t, NULL, OriginalManagerThread, this);
    if (err != 0)
    {
        printf("\t\t\t[initialize: create manager thread failed]\n");
        return TPFALSE;
    }
    printf("\t\t\t[initialize: create manager thread %ld]\n", threadPool_t);

    return TPTRUE;
}

/**
  * member function reality. thread pool entirely close function.
  * para:
  *     this: thread pool struct instance ponter
  * return:
  */
TPBOOL BasicThreadPool::close()
{
    void *status;
    printf("[Handle Message] : [Thread Pool Close]\n");

    // close work thread
    for (int i = 0; i < curThreadNum; ++i)
    {
        if (pthread_kill(basicWorkerThreadList[i].workerThread_t, 0) != ESRCH) // ESRCH: thread isn't exists
        {
            pthread_kill(basicWorkerThreadList[i].workerThread_t, SIGQUIT);
            pthread_join(basicWorkerThreadList[i].workerThread_t, &status);
            pthread_mutex_destroy(&basicWorkerThreadList[i].workerThread_lock);
            pthread_cond_destroy(&basicWorkerThreadList[i].workerThread_cond);
            printf("\t\t\t[close: join worker thread %ld]\n", basicWorkerThreadList[i].workerThread_t);
        }
    }
    // close manage thread
    if (pthread_kill(threadPool_t, 0) != ESRCH)
    {
        pthread_kill(threadPool_t, SIGQUIT);
        pthread_join(threadPool_t, &status);
        pthread_mutex_destroy(&threadPool_lock);
        printf("\t\t\t[close: join manager thread %ld]\n", threadPool_t);
    }
}

// sleep 10 us to ensure when pthread_cond_signal() executes, worker thread is waiting at pthread_cond_wait()
#define CHECK_IS_WAIT_FOR_COND_SIGNAL(idx) \
    while (!basicWorkerThreadList[idx].isWait) \
    { \
        pthread_mutex_lock(&basicWorkerThreadList[idx].workerThread_lock); \
        usleep(10); \
        pthread_mutex_unlock(&basicWorkerThreadList[idx].workerThread_lock); \
    } \
    usleep(10); \
    pthread_cond_signal(&basicWorkerThreadList[idx].workerThread_cond);

/**
  * member function reality. main interface opened. 
  * after getting own worker and job, user may use the function to process the task.
  * para:
  *     this: thread pool struct instance ponter
  *    worker: user task reality.
  *    job: user task para
  * return:
  */
void BasicThreadPool::processWork(SocketWorkFunction* func, WorkParameter clientData, int mask)
{
    int i, tmpid;
    TPBOOL res;

    // fill this->basicWorkerThreadList's relative work key
    for (i = 0; i < curThreadNum; i++)
    {
        pthread_mutex_lock(&basicWorkerThreadList[i].workerThread_lock);
        if (basicWorkerThreadList[i].isBusy == TPFALSE)
        {
            DBG_OUT("processWork: thread %d idle, thread id %ld\n", i, basicWorkerThreadList[i].workerThread_t);
            // thread state be set busy before work
            basicWorkerThreadList[i].isBusy = TPTRUE;
            pthread_mutex_unlock(&basicWorkerThreadList[i].workerThread_lock);

            basicWorkerThreadList[i].socketWorkFunc = func;
            basicWorkerThreadList[i].clientData = clientData;
            basicWorkerThreadList[i].socketMask = mask;
            basicWorkerThreadList[i].delayTaskWorkFunc = NULL;

            DBG_OUT("processWork: thread %d turn to busy, thread id %ld\n", i, basicWorkerThreadList[i].workerThread_t);
            
            CHECK_IS_WAIT_FOR_COND_SIGNAL(i)

            return;
        }
        else
        {
            pthread_mutex_unlock(&basicWorkerThreadList[i].workerThread_lock);        
        }
    } // end for

    // if all current thread are busy, new thread is created here
    pthread_mutex_lock(&threadPool_lock);
    if (res = AddWorkerThread(TPTRUE))
    {
        i = curThreadNum - 1;
        basicWorkerThreadList[i].socketWorkFunc = func;
        basicWorkerThreadList[i].clientData = clientData;
        basicWorkerThreadList[i].socketMask = mask;
        basicWorkerThreadList[i].delayTaskWorkFunc = NULL;
    }
    pthread_mutex_unlock(&threadPool_lock);

    if (res)
    {
        CHECK_IS_WAIT_FOR_COND_SIGNAL(i)
    }
}

void BasicThreadPool::processWork(DelayTaskWorkFunction* func, WorkParameter clientData)
{
    int i, tmpid;
    TPBOOL res;

    // fill this->basicWorkerThreadList's relative work key
    for (i = 0; i < curThreadNum; i++) 
    {
        pthread_mutex_lock(&basicWorkerThreadList[i].workerThread_lock);
        if (basicWorkerThreadList[i].isBusy == false)
        {
            DBG_OUT("processWork: threadID %d idle, thread %ld\n", i, basicWorkerThreadList[i].workerThread_t);
            // thread state be set busy before work
            basicWorkerThreadList[i].isBusy = TPTRUE;
            pthread_mutex_unlock(&basicWorkerThreadList[i].workerThread_lock);

            basicWorkerThreadList[i].delayTaskWorkFunc = func;
            basicWorkerThreadList[i].clientData = clientData;
            basicWorkerThreadList[i].socketWorkFunc = NULL;
            basicWorkerThreadList[i].socketMask = -1;

            DBG_OUT("processWork: threadID %d turn to busy, thread %ld\n", i, basicWorkerThreadList[i].workerThread_t);
            
            CHECK_IS_WAIT_FOR_COND_SIGNAL(i)

            return;
        }
        else
        {
            pthread_mutex_unlock(&basicWorkerThreadList[i].workerThread_lock);        
        }
    } //end for

    // if all current thread are busy, new thread is created here
    pthread_mutex_lock(&threadPool_lock);
    if (res = AddWorkerThread(TPTRUE))
    {
        i = curThreadNum - 1;
        basicWorkerThreadList[i].delayTaskWorkFunc = func;
        basicWorkerThreadList[i].clientData = clientData;
        basicWorkerThreadList[i].socketWorkFunc = NULL;
        basicWorkerThreadList[i].socketMask = -1;
    }
    pthread_mutex_unlock(&threadPool_lock);

    if (res)
    {
        CHECK_IS_WAIT_FOR_COND_SIGNAL(i)
    }
}

/**
  * member function reality. add new thread into the pool.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  *     true: successful; false: failed
  */
TPBOOL BasicThreadPool::AddWorkerThread(TPBOOL isBusy)
{
    int err;
    
    if (curThreadNum >= maxThreadNum)
    {
        printf("!!!!!!!!!!    Thread pool full    !!!!!!!!!!\n");
        return TPFALSE;
    }

    // malloc new thread info struct
    BasicWorkerThread *new_thread = (BasicWorkerThread*)(&basicWorkerThreadList[curThreadNum]);

    // init new thread's cond & mutex
    pthread_cond_init(&new_thread->workerThread_cond, NULL);
    pthread_mutex_init(&new_thread->workerThread_lock, NULL);

    // init status is busy
    new_thread->isBusy = isBusy;
    new_thread->isExit = TPFALSE;
    new_thread->isWait = TPFALSE;
    new_thread->delayTaskWorkFunc = NULL;
    new_thread->clientData = NULL;
    new_thread->socketWorkFunc = NULL;
    new_thread->socketMask = -1;

    err = pthread_create(&new_thread->workerThread_t, NULL, WorkerHandleThread, new_thread);
    if (err != 0)
    {
        pthread_mutex_destroy(&new_thread->workerThread_lock);
        pthread_cond_destroy(&new_thread->workerThread_cond);
        new_thread->isBusy = TPFALSE;
        DBG_OUT("ERROR: pthread_create() failed.\n");
        return TPFALSE;
    }

    // add current thread number in the pool.
    ++curThreadNum;

    printf("\t\t\t[Add worker thread %ld, its threadID = %d]\n", basicWorkerThreadList[curThreadNum-1].workerThread_t, curThreadNum-1);

    return TPTRUE;
}

/**
  * member function reality. delete idle thread in the pool.
  * only delete last idle thread in the pool.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  *     true: successful; false: failed
  */
TPBOOL BasicThreadPool::DeleteWorkerThread()
{
    void* status;
    int idx = curThreadNum - 1;
    TPBOOL res;

    // current thread num can't < min thread num
    if (curThreadNum <= minThreadNum)
    {
        DBG_OUT("current thread num can't < min thread num\n");
        return TPFALSE;
    }

    BasicWorkerThread *last_thread = (BasicWorkerThread*)(&basicWorkerThreadList[idx]);
    // check thread status
    pthread_mutex_lock(&last_thread->workerThread_lock);
    if (last_thread->isBusy) // do nothing
    {
        if (last_thread->socketWorkFunc != NULL)
        {
            DBG_OUT("last thread is busy, do nothing. workFunc = %p, clientData = %p, socketMask = %d\n", 
                last_thread->socketWorkFunc, 
                last_thread->clientData, 
                last_thread->socketMask);
        }
        else if (last_thread->delayTaskWorkFunc != NULL)
        {
            DBG_OUT("last thread is busy, do nothing. workFunc = %p, clientData = %p\n", 
                last_thread->delayTaskWorkFunc, 
                last_thread->clientData);
        }
        res = TPFALSE;
        pthread_mutex_unlock(&last_thread->workerThread_lock);
    }
    else // delete last thread
    {
        last_thread->isBusy = TPTRUE;
        pthread_mutex_unlock(&last_thread->workerThread_lock);

        // lock threadPoolLocker to avoid variable 'curThreadNum' error
        pthread_mutex_lock(&threadPool_lock);
        // kill the idle thread and free info struct
        last_thread->isExit = TPTRUE;
        printf("\t\t\t[Delete worker thread %ld, its threadID = %d]\n", last_thread->workerThread_t, idx);
        pthread_cond_signal(&last_thread->workerThread_cond);
        pthread_join(last_thread->workerThread_t, &status);

        pthread_mutex_destroy(&last_thread->workerThread_lock);
        pthread_cond_destroy(&last_thread->workerThread_cond);

        // after deleting idle thread, current thread num minus 1
        --curThreadNum;
        
        pthread_mutex_unlock(&threadPool_lock);
        res = TPTRUE;
    }

    return res;
}

/**
  * member function reality. get real thread by thread id num.
  * para:
  *     this: thread pool struct instance ponter
  *    id: thread id num
  * return:
  *     seq num in thread info struct array
  */
int BasicThreadPool::GetWorkerThreadIdx(pthread_t id)
{
    for (int i = 0; i < curThreadNum; i++)
        if (id == basicWorkerThreadList[i].workerThread_t)
            return i;

    return -1;
}

/**
  * member function reality. get current thread pool status: idle, normal, busy, .etc.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  *     0: idle; 1: normal or busy(don't process)
  */
int BasicThreadPool::GetThreadPoolStatus()
{
    int busy_num = 0;

    // get busy thread number
    for (int i = 0; i < curThreadNum; i++)
        if (basicWorkerThreadList[i].isBusy)
            ++busy_num;

    // 0.2? or other num?
    int busy_ratio = busy_num*100 / curThreadNum;
    printf("\t\t\tThread pool load ratio = %d%%, busy num = %d, total num = %d\n", busy_ratio, busy_num, curThreadNum);

    if (busy_ratio <= BUSY_THRESHOLD) // idle status or normal status
    {
        if (curThreadNum == minThreadNum)
            return ThreadPool_Status_Normal;
        else
            return ThreadPool_Status_Idle;
    }
    else if (busy_ratio <= LIMIT_THRESHOLD) // busy status
    {
        return ThreadPool_Status_Busy;
    }
    else // overload status
    {
        return ThreadPool_Status_ReachLimit;
    }
}

/**
  * internal interface. real work thread.
  * para:
  *     pthread: thread pool struct ponter
  * return:
  */
void* BasicThreadPool::WorkerHandleThread(void *clientData)
{
    BasicWorkerThread *thread = (BasicWorkerThread*)clientData;

    signal(SIGQUIT, ThreadPool::HandleThreadQuit);

    // Set affinity mask to include CPU threadID
    int ret, cpu_idx = thread->threadID % conf::CPUCount;

    CPU_ZERO(&thread->CPUSet);
    CPU_SET(cpu_idx, &thread->CPUSet);

    if (ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &thread->CPUSet) != 0)
    {
        printf("\t\t\t[pthread_setaffinity_np() failed]\n");
        pthread_exit(NULL);
    }
    // Check the actual affinity mask assigned to the thread
    cpu_set_t cpuset;
    if (ret = pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0)
    {
        printf("\t\t\t[pthread_getaffinity_np() failed]\n");
        pthread_exit(NULL);
    }
    if (!CPU_ISSET(cpu_idx, &cpuset))
    {
        printf("\t\t\t[Cannot Set Thread to CPU Processor %d]\n", cpu_idx);
        pthread_exit(NULL);
    }
    printf("\t\t\t[Set Thread %ld to Cpu Processor %d]\n", pthread_self(), cpu_idx);

    // wait cond for processing real job.
    do {
        pthread_mutex_lock(&thread->workerThread_lock);
        thread->isWait = TPTRUE;
        thread->isBusy = TPFALSE;
        DBG_OUT("[thread: %ld, before cond_wait]\n", pthread_self());
        pthread_cond_wait(&thread->workerThread_cond, &thread->workerThread_lock);
        DBG_OUT("[thread: %ld, after cond_wait]\n", pthread_self());
        thread->isWait = TPFALSE;
        thread->isBusy = TPTRUE;
        pthread_mutex_unlock(&thread->workerThread_lock);

        if (thread->clientData != NULL)
        {
            DBG_OUT("[thread: %ld do work]\n", pthread_self());
            if (thread->socketWorkFunc != NULL && thread->socketMask != -1)
                thread->socketWorkFunc(thread->clientData, thread->socketMask);
            else if (thread->delayTaskWorkFunc != NULL && thread->socketMask == -1)
                thread->delayTaskWorkFunc(thread->clientData);
        }
        
        // thread state be set idle after work
        pthread_mutex_lock(&thread->workerThread_lock);
        thread->isBusy = TPFALSE;
        thread->socketWorkFunc = NULL;
        thread->delayTaskWorkFunc = NULL;
        thread->clientData = NULL;
        thread->socketMask = -1;
        pthread_mutex_unlock(&thread->workerThread_lock);
        
        if (thread->isExit)
            break;

        printf("\t\t\t[thread: %ld do work over, its threadID = %d]\n", pthread_self(), thread->threadID);
    } while (TPTRUE);

    DBG_OUT("[thread: %ld exits, its threadID = %d]\n", pthread_self(), thread->threadID);
    pthread_exit(NULL);
}

测试程序 test_ThreadPool.cxx:

#include "BasicThreadPool.hxx"
#include <iostream>

using namespace std;

void thread_fun(void *param)
{
    pthread_t curid = pthread_self(); // get current thread id
    int *p = (int *)param;
    for (int i = 0; i < 3; i++)
    {
        printf("i = %d, thread id = %ld, param = %d\n", i, curid, *p);
        sleep(1);
    }
}

int main(int argc, char *argv[])
{
    ThreadPool *threadpool = BasicThreadPool::createNew(4, 8);
    threadpool->initialize();

    int a[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    
    cout << "====================    Schedule one    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[0]);

    cout << "====================    Schedule two    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[1]);

    cout << "====================    Schedule three    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[2]);

    cout << "====================    Schedule four    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[3]);
    
    cout << "====================    Schedule five    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[4]);
    
    cout << "====================    Schedule six    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[5]);

    cout << "====================    Schedule seven    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[6]);
    
    cout << "====================    Schedule eight    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[7]);
    
    cout << "====================    Schedule nine    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[8]);
    
    cout << "====================    Schedule ten    ====================" << endl;
    threadpool->processWork(thread_fun, (void*)&a[9]);
    
    sleep(10);

    threadpool->close();
    cout << "====================    threadpool->close() done!    ====================" << endl;
    delete threadpool;

    return 0;
}

某次运行结果为:

test_ThreadPool_1

test_ThreadPool_2

另一次运行结果为:

test_ThreadPool_3

test_ThreadPool_4

Leave a comment

邮箱地址不会被公开。 必填项已用*标注