本文原创
实现了一个简单的 POSIX 线程池,并且将线程分别绑定到了 CPU 的各个核心上。
抽象类 ThreadPool.hxx:
#ifndef _ThreadPool_HXX
#define _ThreadPool_HXX

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>

// Boolean type usable from both C and C++ translation units.
#ifdef __cplusplus
typedef bool TPBOOL;
#define TPTRUE true
#define TPFALSE false
#else
typedef int TPBOOL;
#define TPTRUE 1
#define TPFALSE 0
#endif

// Define TPDEBUG to a non-zero value before including this header to enable DBG_OUT.
#ifndef TPDEBUG
#define TPDEBUG 0
#endif

// DBG_OUT(fmt, ...): printf-style trace prefixed with "<function:line>".
// The message is formatted with snprintf so that an over-long message is
// truncated to the 256-byte scratch buffer instead of overflowing it.
#if TPDEBUG
#define DBG_OUT(...) \
    do { \
        char b__[256]; \
        snprintf(b__, sizeof(b__), __VA_ARGS__); \
        fprintf(stdout, "<%s:%d>\t%s", __FUNCTION__, __LINE__, b__); \
    } while(0)
#else
#define DBG_OUT(...)
#endif

#define BUSY_THRESHOLD      20  // busy/total ratio (percent) at or below which the pool counts as idle/normal
#define LIMIT_THRESHOLD     80  // busy/total ratio (percent) above which the pool counts as having reached its limit
#define MANAGER_INTERVAL    3   // seconds the manager thread sleeps between status checks

// Values returned by ThreadPool::GetThreadPoolStatus().
#define ThreadPool_Status_Normal        0
#define ThreadPool_Status_Idle          1
#define ThreadPool_Status_Busy          2
#define ThreadPool_Status_ReachLimit    3

typedef void* WorkParameter;                    // opaque argument forwarded to a work function
typedef void SocketWorkFunction(void *, int);   // work function taking (clientData, mask)
typedef void DelayTaskWorkFunction(void *);     // work function taking (clientData)

///////////////////////////////////////
//////////    Thread Pool    //////////
///////////////////////////////////////

/**
 * Abstract base of the thread pool.
 *
 * It stores the min/current/max thread counts, the manager thread id and the
 * pool-level mutex, and provides the manager-thread entry point. Concrete pools
 * implement the four pure virtual hooks that add/delete workers and report load.
 * Instances are created by derived classes only (protected constructor) and are
 * non-copyable (copy operations are declared private and left undefined).
 */
class ThreadPool
{
public:
    virtual ~ThreadPool();

    /** Start the pool. Returns TPTRUE on success, TPFALSE on failure. */
    virtual TPBOOL initialize();
    /** Shut the pool down. */
    virtual TPBOOL close();

    /** Run func(clientData, mask); the base implementation calls it synchronously. */
    virtual void processWork(SocketWorkFunction* func, WorkParameter clientData, int mask);
    /** Run func(clientData); the base implementation calls it synchronously. */
    virtual void processWork(DelayTaskWorkFunction* func, WorkParameter clientData);

protected:
    /** @param min_num initial/minimum number of threads  @param max_num maximum number of threads */
    ThreadPool(int min_num, int max_num);

    /** Add one worker; isBusy is the initial busy state of the new worker. */
    virtual TPBOOL AddWorkerThread(TPBOOL isBusy) = 0;
    /** Remove one worker; returns TPFALSE when nothing was removed. */
    virtual TPBOOL DeleteWorkerThread() = 0;
    /** Map a pthread id to the worker's index. */
    virtual int GetWorkerThreadIdx(pthread_t id) = 0;
    /** Return one of the ThreadPool_Status_* values. */
    virtual int GetThreadPoolStatus() = 0;

    /** SIGQUIT handler: terminates the calling thread with pthread_exit(). */
    static void HandleThreadQuit(int signo);
    /** Entry point of the manager thread; clientData is the ThreadPool instance. */
    static void* OriginalManagerThread(void *clientData);

    int minThreadNum;                   // min thread number in the pool
    int curThreadNum;                   // current thread number in the pool
    int maxThreadNum;                   // max thread number in the pool

    pthread_t threadPool_t;             // manager thread id
    pthread_mutex_t threadPool_lock;    // pool-level lock

private:
    // Non-copyable: declared but intentionally not defined.
    ThreadPool(const ThreadPool&);
    ThreadPool& operator=(const ThreadPool&);
};

#endif
ThreadPool.cxx:
#include "ThreadPool.hxx"

///////////////////////////////////////
//////////    Thread Pool    //////////
///////////////////////////////////////

/**
 * Record the thread limits. The pool starts with min_num threads, so the
 * current count is initialised to min_num as well.
 */
ThreadPool::ThreadPool(int min_num, int max_num)
    : minThreadNum(min_num), curThreadNum(min_num), maxThreadNum(max_num)
{
}

ThreadPool::~ThreadPool()
{
}

/**
 * Thread pool init function. The base class has nothing to start.
 * return:
 *      true: successful; false: failed
 */
TPBOOL ThreadPool::initialize()
{
    // A value must be returned: flowing off the end of a non-void function is undefined behaviour.
    return TPTRUE;
}

/**
 * Thread pool close function. The base class has nothing to stop.
 * return:
 *      true: successful; false: failed
 */
TPBOOL ThreadPool::close()
{
    return TPTRUE;
}

/** Base implementation: run the socket work function synchronously in the caller's thread. */
void ThreadPool::processWork(SocketWorkFunction* func, WorkParameter clientData, int mask)
{
    (*func)(clientData, mask);
}

/** Base implementation: run the delayed-task work function synchronously in the caller's thread. */
void ThreadPool::processWork(DelayTaskWorkFunction* func, WorkParameter clientData)
{
    (*func)(clientData);
}

/**
 * Signal handler installed for SIGQUIT: terminates the thread that received the signal.
 * NOTE(review): pthread_exit() is not on the POSIX async-signal-safe list, so the
 * interrupted thread may be holding a lock when it exits — confirm this is acceptable.
 */
void ThreadPool::HandleThreadQuit(int signo)
{
    DBG_OUT("[Handle sig %d, thread %ld exit]\n", signo, pthread_self());
    pthread_exit(NULL);
}

/**
 * Manager thread entry point. Every MANAGER_INTERVAL seconds it asks the pool
 * for its status and then
 *   - Idle:       tries to delete one worker thread;
 *   - ReachLimit: adds one idle worker thread if the pool is below its maximum;
 *   - otherwise:  does nothing.
 * para:
 *      clientData: the ThreadPool instance to manage
 * return:
 *      never returns normally; the thread ends through the SIGQUIT handler
 */
void* ThreadPool::OriginalManagerThread(void *clientData)
{
    ThreadPool *threadpool = static_cast<ThreadPool*>(clientData);

    signal(SIGQUIT, HandleThreadQuit);

    for (;;)
    {
        sleep(MANAGER_INTERVAL);

        switch (threadpool->GetThreadPoolStatus())
        {
        case ThreadPool_Status_Idle:
            // A refusal (TPFALSE) is not an error; the next tick simply tries again.
            threadpool->DeleteWorkerThread();
            break;

        case ThreadPool_Status_ReachLimit:
            // AddWorkerThread() is called with the pool lock held, and the
            // capacity check is done under the same lock so it cannot race
            // with another thread that is adding a worker.
            pthread_mutex_lock(&threadpool->threadPool_lock);
            if (threadpool->curThreadNum < threadpool->maxThreadNum)
                threadpool->AddWorkerThread(TPFALSE);
            pthread_mutex_unlock(&threadpool->threadPool_lock);
            break;

        case ThreadPool_Status_Busy:
        default:
            break;
        }
    }

    return NULL; // not reached
}
实现类 BasicThreadPool.hxx:
#ifndef _BasicThreadPool_HXX
#define _BasicThreadPool_HXX

#include "ThreadPool.hxx"
#include <sched.h>

///////////////////////////////////////////////
//////////    Basic Worker Thread    //////////
///////////////////////////////////////////////

// Per-worker bookkeeping: one entry per slot of BasicThreadPool's worker array.
typedef struct
{
    int threadID;                               // slot index in the worker array; also used to choose the CPU (threadID % CPU count)
    pthread_t workerThread_t;                   // pthread id of the worker
    pthread_cond_t workerThread_cond;           // condition the worker waits on for a job or an exit request
    pthread_mutex_t workerThread_lock;          // mutex paired with workerThread_cond

    TPBOOL isBusy;                              // thread status: true-busy; false-idle
    TPBOOL isExit;                              // set when the worker is asked to leave its loop
    TPBOOL isWait;                              // true while the worker is waiting on workerThread_cond.
                                                // CAUTION: a pthread_cond_signal() sent before the worker
                                                // has called pthread_cond_wait() is lost!

    WorkParameter clientData;                   // void *: argument passed to the work function
    SocketWorkFunction* socketWorkFunc;         // void (*)(void *, int); NULL when the job is a delayed task
    DelayTaskWorkFunction* delayTaskWorkFunc;   // void (*)(void *); NULL when the job is a socket job
    int socketMask;                             // second argument of socketWorkFunc; -1 for delayed tasks and empty slots

    cpu_set_t CPUSet;                           // affinity mask applied to the worker
} BasicWorkerThread;

/////////////////////////////////////////////
//////////    Basic Thread Pool    //////////
/////////////////////////////////////////////

// Concrete thread pool: keeps an array of BasicWorkerThread slots, hands jobs to
// idle workers and binds every worker to one CPU.
class BasicThreadPool : public ThreadPool
{
public:
    // Factory; the constructor is protected. The caller owns the returned object.
    static BasicThreadPool* createNew(int min_num, int max_num);
    virtual ~BasicThreadPool();

    // Create the initial worker threads and the manager thread.
    virtual TPBOOL initialize();
    // Stop and join the worker threads and the manager thread.
    virtual TPBOOL close();

    // Hand func(clientData, mask) to a worker thread.
    virtual void processWork(SocketWorkFunction* func, WorkParameter clientData, int mask);
    // Hand func(clientData) to a worker thread.
    virtual void processWork(DelayTaskWorkFunction* func, WorkParameter clientData);

protected:
    BasicThreadPool(int min_num, int max_num);

    // Start one more worker in the next free slot; isBusy is its initial busy state.
    virtual TPBOOL AddWorkerThread(TPBOOL isBusy);
    // Remove the last worker if it is idle and the pool is above its minimum size.
    virtual TPBOOL DeleteWorkerThread();
    // Return the slot index of the worker with pthread id `id`, or -1.
    virtual int GetWorkerThreadIdx(pthread_t id);
    // Return a ThreadPool_Status_* value derived from the busy/total ratio.
    virtual int GetThreadPoolStatus();

    // Worker thread entry point; clientData points at the worker's BasicWorkerThread slot.
    static void* WorkerHandleThread(void *clientData);

private:
    BasicWorkerThread *basicWorkerThreadList;   // worker slots, allocated in the constructor
};

#endif
BasicThreadPool.cxx:
#include "BasicThreadPool.hxx"

namespace conf
{
    // Number of configured processors; workers are spread over them round-robin.
    int CPUCount = sysconf(_SC_NPROCESSORS_CONF);
}

namespace
{
    // True when a job has been posted to the slot and not yet cleared.
    // The caller holds the worker's lock.
    bool HasPendingJob(const BasicWorkerThread *worker)
    {
        return worker->socketWorkFunc != NULL || worker->delayTaskWorkFunc != NULL;
    }

    // Empty the job fields of a slot. The caller holds the worker's lock or is the only user.
    void ClearJob(BasicWorkerThread *worker)
    {
        worker->clientData = NULL;
        worker->socketWorkFunc = NULL;
        worker->delayTaskWorkFunc = NULL;
        worker->socketMask = -1;
    }

    // Publish a job in the slot, mark it busy and wake the worker.
    // The caller holds the worker's lock, so the worker sees the complete job
    // whether it is already waiting or has not reached its wait yet.
    void PostJobLocked(BasicWorkerThread *worker, SocketWorkFunction *sockFunc,
                       DelayTaskWorkFunction *delayFunc, WorkParameter data, int mask)
    {
        worker->isBusy = TPTRUE;
        worker->clientData = data;
        worker->socketWorkFunc = sockFunc;
        worker->delayTaskWorkFunc = delayFunc;
        worker->socketMask = mask;
        pthread_cond_signal(&worker->workerThread_cond);
    }

    // Give the job to the first idle worker among workers[0..count).
    // Returns true when a worker took it.
    bool DispatchToIdleWorker(BasicWorkerThread *workers, int count, SocketWorkFunction *sockFunc,
                              DelayTaskWorkFunction *delayFunc, WorkParameter data, int mask)
    {
        for (int i = 0; i < count; ++i)
        {
            BasicWorkerThread *worker = &workers[i];

            pthread_mutex_lock(&worker->workerThread_lock);
            const bool idle = !worker->isBusy;
            if (idle)
            {
                DBG_OUT("processWork: thread %d idle, thread id %ld\n", i, worker->workerThread_t);
                PostJobLocked(worker, sockFunc, delayFunc, data, mask);
            }
            pthread_mutex_unlock(&worker->workerThread_lock);

            if (idle)
                return true;
        }
        return false;
    }
}

//////////////////////////////////////////
//////////  Basic Thread Pool   //////////
//////////////////////////////////////////

/** Factory: the constructor is protected, so pools are created through this call. */
BasicThreadPool* BasicThreadPool::createNew(int min_num, int max_num)
{
    return new BasicThreadPool(min_num, max_num);
}

/**
 * Allocate the worker slots and put every slot into a defined "idle, no job" state.
 * initialize() starts the first minThreadNum workers without touching these
 * fields, so they must not be left indeterminate here.
 */
BasicThreadPool::BasicThreadPool(int min_num, int max_num)
    : ThreadPool(min_num, max_num), basicWorkerThreadList(NULL)
{
    // initialize() indexes up to minThreadNum, so make the array large enough
    // even when the limits were passed in the wrong order.
    int capacity = (maxThreadNum > minThreadNum) ? maxThreadNum : minThreadNum;
    if (capacity < 0)
        capacity = 0;

    basicWorkerThreadList = new BasicWorkerThread[capacity];
    for (int i = 0; i < capacity; ++i)
    {
        BasicWorkerThread *slot = &basicWorkerThreadList[i];
        slot->threadID = i;
        slot->isBusy = TPFALSE;
        slot->isExit = TPFALSE;
        slot->isWait = TPFALSE;
        ClearJob(slot);
    }
}

BasicThreadPool::~BasicThreadPool()
{
    delete[] basicWorkerThreadList;
    basicWorkerThreadList = NULL;
}

/**
 * Thread pool init function: creates minThreadNum worker threads and the manager thread.
 * return:
 *      true: successful; false: failed (curThreadNum then holds the number of
 *      workers that were actually started)
 */
TPBOOL BasicThreadPool::initialize()
{
    printf("[Handle Message] : [Thread Pool Initialize]\n");

    // The pool lock is needed by processWork()/close(), so set it up before anything can fail.
    pthread_mutex_init(&threadPool_lock, NULL);

    // create work thread and init work thread info
    for (int i = 0; i < minThreadNum; ++i)
    {
        BasicWorkerThread *worker = &basicWorkerThreadList[i];

        pthread_cond_init(&worker->workerThread_cond, NULL);
        pthread_mutex_init(&worker->workerThread_lock, NULL);

        if (pthread_create(&worker->workerThread_t, NULL, WorkerHandleThread, worker) != 0)
        {
            printf("\t\t\t[initialize: create worker thread failed]\n");
            pthread_mutex_destroy(&worker->workerThread_lock);
            pthread_cond_destroy(&worker->workerThread_cond);
            curThreadNum = i;   // only slots [0, i) hold live threads
            return TPFALSE;
        }
        printf("\t\t\t[initialize: create worker thread %ld]\n", worker->workerThread_t);
    }

    // create manage thread
    if (pthread_create(&threadPool_t, NULL, OriginalManagerThread, this) != 0)
    {
        printf("\t\t\t[initialize: create manager thread failed]\n");
        return TPFALSE;
    }
    printf("\t\t\t[initialize: create manager thread %ld]\n", threadPool_t);

    return TPTRUE;
}

/**
 * Thread pool close function: stops and joins the manager thread and every worker.
 * Must be called after a successful initialize().
 *
 * The manager is stopped first so that it cannot add or delete workers while
 * the worker slots are being torn down. Idle workers are asked to leave their
 * loop; workers that are still running a job are interrupted with SIGQUIT.
 * NOTE(review): a thread terminated through the SIGQUIT handler may exit while
 * holding a lock (pthread_exit() is not async-signal-safe) — confirm acceptable.
 * return:
 *      true
 */
TPBOOL BasicThreadPool::close()
{
    void *status;

    printf("[Handle Message] : [Thread Pool Close]\n");

    // Holding the pool lock keeps the manager out of its add/delete sections while it is stopped.
    pthread_mutex_lock(&threadPool_lock);

    // close manage thread
    if (pthread_kill(threadPool_t, 0) != ESRCH)     // ESRCH: thread isn't exists
    {
        pthread_kill(threadPool_t, SIGQUIT);
        pthread_join(threadPool_t, &status);
        printf("\t\t\t[close: join manager thread %ld]\n", threadPool_t);
    }

    // close work thread
    for (int i = 0; i < curThreadNum; ++i)
    {
        BasicWorkerThread *worker = &basicWorkerThreadList[i];

        if (pthread_kill(worker->workerThread_t, 0) == ESRCH)
            continue;

        // Ask the worker to leave its loop; an idle worker wakes up and exits by itself.
        pthread_mutex_lock(&worker->workerThread_lock);
        const bool runningJob = worker->isBusy;
        worker->isBusy = TPTRUE;    // keep dispatchers away from this slot
        worker->isExit = TPTRUE;
        pthread_cond_signal(&worker->workerThread_cond);
        pthread_mutex_unlock(&worker->workerThread_lock);

        // A worker that is inside a job is interrupted instead of waited for.
        if (runningJob)
            pthread_kill(worker->workerThread_t, SIGQUIT);

        pthread_join(worker->workerThread_t, &status);
        pthread_mutex_destroy(&worker->workerThread_lock);
        pthread_cond_destroy(&worker->workerThread_cond);
        printf("\t\t\t[close: join worker thread %ld]\n", worker->workerThread_t);
    }

    pthread_mutex_unlock(&threadPool_lock);
    pthread_mutex_destroy(&threadPool_lock);

    return TPTRUE;
}

/**
 * Main interface: hand a socket job to a worker thread.
 * The first idle worker gets the job; if every worker is busy a new worker is
 * created for it. When the pool is already at its maximum the job is dropped
 * (AddWorkerThread() prints the "pool full" message).
 * The worker only runs the job when clientData is not NULL and mask is not -1.
 * para:
 *      func:       work function, called as func(clientData, mask); NULL is ignored
 *      clientData: argument of the work function
 *      mask:       second argument of the work function
 */
void BasicThreadPool::processWork(SocketWorkFunction* func, WorkParameter clientData, int mask)
{
    if (func == NULL)
        return;     // nothing to run; a slot must not be marked busy for it

    // The pool lock keeps curThreadNum and the slot array stable during the scan.
    pthread_mutex_lock(&threadPool_lock);

    if (!DispatchToIdleWorker(basicWorkerThreadList, curThreadNum, func, NULL, clientData, mask))
    {
        // if all current thread are busy, new thread is created here
        if (AddWorkerThread(TPTRUE))
        {
            BasicWorkerThread *worker = &basicWorkerThreadList[curThreadNum - 1];
            pthread_mutex_lock(&worker->workerThread_lock);
            PostJobLocked(worker, func, NULL, clientData, mask);
            pthread_mutex_unlock(&worker->workerThread_lock);
        }
    }

    pthread_mutex_unlock(&threadPool_lock);
}

/**
 * Main interface: hand a delayed-task job to a worker thread.
 * Same dispatch rules as the socket overload; the slot's socketMask is set to -1.
 * The worker only runs the job when clientData is not NULL.
 * para:
 *      func:       work function, called as func(clientData); NULL is ignored
 *      clientData: argument of the work function
 */
void BasicThreadPool::processWork(DelayTaskWorkFunction* func, WorkParameter clientData)
{
    if (func == NULL)
        return;

    pthread_mutex_lock(&threadPool_lock);

    if (!DispatchToIdleWorker(basicWorkerThreadList, curThreadNum, NULL, func, clientData, -1))
    {
        // if all current thread are busy, new thread is created here
        if (AddWorkerThread(TPTRUE))
        {
            BasicWorkerThread *worker = &basicWorkerThreadList[curThreadNum - 1];
            pthread_mutex_lock(&worker->workerThread_lock);
            PostJobLocked(worker, NULL, func, clientData, -1);
            pthread_mutex_unlock(&worker->workerThread_lock);
        }
    }

    pthread_mutex_unlock(&threadPool_lock);
}

/**
 * Add a new thread into the pool, using slot curThreadNum.
 * The caller must hold threadPool_lock.
 * para:
 *      isBusy: initial busy state; pass TPTRUE to reserve the worker for a job
 *              that the caller posts right afterwards
 * return:
 *      true: successful; false: failed (pool full or pthread_create() error)
 */
TPBOOL BasicThreadPool::AddWorkerThread(TPBOOL isBusy)
{
    if (curThreadNum >= maxThreadNum)
    {
        printf("!!!!!!!!!! Thread pool full !!!!!!!!!!\n");
        return TPFALSE;
    }

    BasicWorkerThread *new_thread = &basicWorkerThreadList[curThreadNum];

    // init new thread's cond & mutex
    pthread_cond_init(&new_thread->workerThread_cond, NULL);
    pthread_mutex_init(&new_thread->workerThread_lock, NULL);

    new_thread->isBusy = isBusy;
    new_thread->isExit = TPFALSE;
    new_thread->isWait = TPFALSE;
    ClearJob(new_thread);

    if (pthread_create(&new_thread->workerThread_t, NULL, WorkerHandleThread, new_thread) != 0)
    {
        pthread_mutex_destroy(&new_thread->workerThread_lock);
        pthread_cond_destroy(&new_thread->workerThread_cond);
        new_thread->isBusy = TPFALSE;
        DBG_OUT("ERROR: pthread_create() failed.\n");
        return TPFALSE;
    }

    // add current thread number in the pool.
    ++curThreadNum;
    printf("\t\t\t[Add worker thread %ld, its threadID = %d]\n", new_thread->workerThread_t, curThreadNum - 1);

    return TPTRUE;
}

/**
 * Delete the last thread in the pool if it is idle.
 * Takes threadPool_lock itself, so the caller must not hold it.
 * return:
 *      true: the last worker was stopped and joined;
 *      false: the pool is at its minimum size or the last worker is busy
 */
TPBOOL BasicThreadPool::DeleteWorkerThread()
{
    void* status;

    // The pool lock keeps dispatchers away from the slot that is about to be destroyed.
    pthread_mutex_lock(&threadPool_lock);

    // current thread num can't < min thread num
    if (curThreadNum <= minThreadNum)
    {
        DBG_OUT("current thread num can't < min thread num\n");
        pthread_mutex_unlock(&threadPool_lock);
        return TPFALSE;
    }

    const int idx = curThreadNum - 1;
    BasicWorkerThread *last_thread = &basicWorkerThreadList[idx];

    // check thread status
    pthread_mutex_lock(&last_thread->workerThread_lock);
    if (last_thread->isBusy)    // do nothing
    {
        if (last_thread->socketWorkFunc != NULL)
        {
            DBG_OUT("last thread is busy, do nothing. workFunc = %p, clientData = %p, socketMask = %d\n",
                    last_thread->socketWorkFunc, last_thread->clientData, last_thread->socketMask);
        }
        else if (last_thread->delayTaskWorkFunc != NULL)
        {
            DBG_OUT("last thread is busy, do nothing. workFunc = %p, clientData = %p\n",
                    last_thread->delayTaskWorkFunc, last_thread->clientData);
        }
        pthread_mutex_unlock(&last_thread->workerThread_lock);
        pthread_mutex_unlock(&threadPool_lock);
        return TPFALSE;
    }

    // Request the exit under the worker's lock: the worker re-checks isExit
    // before every wait, so the request cannot be missed.
    last_thread->isBusy = TPTRUE;
    last_thread->isExit = TPTRUE;
    printf("\t\t\t[Delete worker thread %ld, its threadID = %d]\n", last_thread->workerThread_t, idx);
    pthread_cond_signal(&last_thread->workerThread_cond);
    pthread_mutex_unlock(&last_thread->workerThread_lock);

    pthread_join(last_thread->workerThread_t, &status);
    pthread_mutex_destroy(&last_thread->workerThread_lock);
    pthread_cond_destroy(&last_thread->workerThread_cond);

    // after deleting idle thread, current thread num minus 1
    --curThreadNum;
    pthread_mutex_unlock(&threadPool_lock);

    return TPTRUE;
}

/**
 * Get the slot index of a worker by its pthread id.
 * para:
 *      id: thread id num
 * return:
 *      seq num in thread info struct array, or -1 when no worker has this id
 */
int BasicThreadPool::GetWorkerThreadIdx(pthread_t id)
{
    for (int i = 0; i < curThreadNum; i++)
        if (pthread_equal(id, basicWorkerThreadList[i].workerThread_t))
            return i;

    return -1;
}

/**
 * Get current thread pool status from the percentage of busy workers.
 * The counts are read without locking, so the result is an approximate snapshot.
 * return:
 *      ThreadPool_Status_Normal:     ratio <= BUSY_THRESHOLD and the pool is at its minimum size
 *      ThreadPool_Status_Idle:       ratio <= BUSY_THRESHOLD and the pool is not at its minimum size
 *      ThreadPool_Status_Busy:       BUSY_THRESHOLD < ratio <= LIMIT_THRESHOLD
 *      ThreadPool_Status_ReachLimit: ratio > LIMIT_THRESHOLD
 */
int BasicThreadPool::GetThreadPoolStatus()
{
    const int total_num = curThreadNum;
    int busy_num = 0;

    // get busy thread number
    for (int i = 0; i < total_num; i++)
        if (basicWorkerThreadList[i].isBusy)
            ++busy_num;

    // An empty pool has no load; this also avoids dividing by zero.
    const int busy_ratio = (total_num > 0) ? busy_num * 100 / total_num : 0;
    printf("\t\t\tThread pool load ratio = %d%%, busy num = %d, total num = %d\n", busy_ratio, busy_num, total_num);

    if (busy_ratio <= BUSY_THRESHOLD)   // idle status or normal status
        return (total_num == minThreadNum) ? ThreadPool_Status_Normal : ThreadPool_Status_Idle;

    if (busy_ratio <= LIMIT_THRESHOLD)  // busy status
        return ThreadPool_Status_Busy;

    return ThreadPool_Status_ReachLimit;    // overload status
}

/**
 * Worker thread entry point.
 * Binds the thread to CPU (threadID % CPU count), then repeatedly waits for a
 * job in its slot, runs it and marks the slot idle again, until isExit is set.
 * A failure to bind is reported and the worker keeps running unpinned, so the
 * slot never refers to a thread that has silently gone away.
 * para:
 *      clientData: the worker's BasicWorkerThread slot
 * return:
 *      NULL
 */
void* BasicThreadPool::WorkerHandleThread(void *clientData)
{
    BasicWorkerThread *thread = static_cast<BasicWorkerThread*>(clientData);

    signal(SIGQUIT, ThreadPool::HandleThreadQuit);

    // Set affinity mask to include CPU threadID (sysconf() may report an error as -1).
    const int cpu_count = (conf::CPUCount > 0) ? conf::CPUCount : 1;
    const int cpu_idx = thread->threadID % cpu_count;
    CPU_ZERO(&thread->CPUSet);
    CPU_SET(cpu_idx, &thread->CPUSet);

    cpu_set_t cpuset;
    if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &thread->CPUSet) != 0)
        printf("\t\t\t[pthread_setaffinity_np() failed]\n");
    // Check the actual affinity mask assigned to the thread
    else if (pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0)
        printf("\t\t\t[pthread_getaffinity_np() failed]\n");
    else if (!CPU_ISSET(cpu_idx, &cpuset))
        printf("\t\t\t[Cannot Set Thread to CPU Processor %d]\n", cpu_idx);
    else
        printf("\t\t\t[Set Thread %ld to Cpu Processor %d]\n", pthread_self(), cpu_idx);

    for (;;)
    {
        // wait cond for processing real job.
        pthread_mutex_lock(&thread->workerThread_lock);
        thread->isWait = TPTRUE;
        DBG_OUT("[thread: %ld, before cond_wait]\n", pthread_self());
        // Waiting on a predicate makes the hand-over immune to spurious wakeups
        // and to a signal that was sent before this thread started waiting.
        while (!thread->isExit && !HasPendingJob(thread))
            pthread_cond_wait(&thread->workerThread_cond, &thread->workerThread_lock);
        DBG_OUT("[thread: %ld, after cond_wait]\n", pthread_self());
        thread->isWait = TPFALSE;

        if (thread->isExit)
        {
            ClearJob(thread);
            thread->isBusy = TPFALSE;
            pthread_mutex_unlock(&thread->workerThread_lock);
            break;
        }

        // Take a private copy of the job so the user function runs without the lock.
        thread->isBusy = TPTRUE;
        WorkParameter jobData = thread->clientData;
        SocketWorkFunction *sockFunc = thread->socketWorkFunc;
        DelayTaskWorkFunction *delayFunc = thread->delayTaskWorkFunc;
        const int mask = thread->socketMask;
        pthread_mutex_unlock(&thread->workerThread_lock);

        if (jobData != NULL)
        {
            DBG_OUT("[thread: %ld do work]\n", pthread_self());
            if (sockFunc != NULL && mask != -1)
                sockFunc(jobData, mask);
            else if (delayFunc != NULL && mask == -1)
                delayFunc(jobData);
        }

        // thread state be set idle after work
        pthread_mutex_lock(&thread->workerThread_lock);
        ClearJob(thread);
        thread->isBusy = TPFALSE;
        const bool exitRequested = thread->isExit;
        pthread_mutex_unlock(&thread->workerThread_lock);

        if (exitRequested)
            break;

        printf("\t\t\t[thread: %ld do work over, its threadID = %d]\n", pthread_self(), thread->threadID);
    }

    DBG_OUT("[thread: %ld exits, its threadID = %d]\n", pthread_self(), thread->threadID);
    pthread_exit(NULL);
    return NULL; // not reached
}
测试程序 test_ThreadPool.cxx:
#include "BasicThreadPool.hxx"
#include <iostream>

/**
 * Demo job: prints its loop counter, the id of the thread running it and the
 * integer that param points at, three times, one second apart.
 */
void thread_fun(void *param)
{
    const pthread_t self = pthread_self();
    const int *value = static_cast<const int*>(param);

    int round = 0;
    while (round < 3)
    {
        printf("i = %d, thread id = %ld, param = %d\n", round, self, *value);
        sleep(1);
        ++round;
    }
}

/**
 * Demo driver: builds a pool of 4..8 threads, schedules ten jobs back to back,
 * waits ten seconds and then closes and destroys the pool.
 */
int main(int argc, char *argv[])
{
    // One banner per scheduled job, printed right before the job is handed over.
    static const char *const banners[] = {
        "==================== Schedule one ====================",
        "==================== Schedule two ====================",
        "==================== Schedule three ====================",
        "==================== Schedule four ====================",
        "==================== Schedule five ====================",
        "==================== Schedule six ====================",
        "==================== Schedule seven ====================",
        "==================== Schedule eight ====================",
        "==================== Schedule nine ====================",
        "==================== Schedule ten ===================="
    };
    const int jobCount = sizeof(banners) / sizeof(banners[0]);

    ThreadPool *threadpool = BasicThreadPool::createNew(4, 8);
    threadpool->initialize();

    // Job arguments; they stay alive until after the pool has been closed.
    int a[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

    for (int job = 0; job < jobCount; ++job)
    {
        std::cout << banners[job] << std::endl;
        threadpool->processWork(thread_fun, (void*)&a[job]);
    }

    sleep(10);
    threadpool->close();

    std::cout << "==================== threadpool->close() done! ====================" << std::endl;

    delete threadpool;
    return 0;
}
某次运行结果为:
另一次运行结果为: