后端技术_手写线程池与性能分析
最近更新:2024-09-23
|
字数总计:1.7k
|
阅读估时:8分钟
|
阅读量: 次
线程池 线程池基础 线程池实现 接口设计 数据结构设计 内部实现细节 接口实现细节 线程池测试 数据结构测试 生成动态链接库 功能测试 性能测试
线程池 线程池基础
线程池:管理并维持固定数量线程的池式结构。“维持”是为了避免频繁创建和销毁线程;“固定”是为了防止线程过多而加重系统负担。
固定数量(经验公式):
CPU 密集型任务——线程数取 n(n 为 CPU 核心数量)
IO 密集型任务——线程数取 2*n
什么时候需要:某类任务特别耗时,严重影响该线程处理其他任务
特点:异步的
作用:a.复用线程资源;b.充分利用系统资源
模型:生产者、消费者模型
生产者线程 发布任务
队列 存储任务 调度线程池
消费者线程 取出任务 执行任务
线程池实现 接口设计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #ifndef _THREAD_POOL_H #define _THREAD_POOL_H typedef struct thrdpool_s thrdpool_t ;typedef void (*handler_pt) (void *) ;#ifdef __cplusplus extern "C" { #endif thrdpool_t *thrdpool_create (int thrd_count) ;void thrdpool_terminate (thrdpool_t * pool) ;int thrdpool_post (thrdpool_t *pool, handler_pt func, void *arg) ;void thrdpool_waitdone (thrdpool_t *pool) ;#ifdef __cplusplus } #endif
数据结构设计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 typedef struct spinlock spinlock_t ;typedef struct task_s { void *next; handler_pt func; void *arg; } task_t ; typedef struct task_queue_s { void *head; void **tail; int block; spinlock_t lock; pthread_mutex_t mutex; pthread_cond_t cond; } task_queue_t ; struct thrdpool_s { task_queue_t *task_queue; atomic_int quit; int thrd_count; pthread_t *threads; };
临界资源处理方式:
锁:(粒度很大)
自旋锁(适用于临界区执行很快的场景;等待锁的线程不让出 CPU,而是空转等待)
互斥锁(适用于临界区执行较慢的场景;等待锁的线程让出 CPU,休眠等待被重新调度)
原子变量(粒度非常小、单基础变量)
内存屏障 (粒度大)
内部实现细节 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 static task_queue_t *__taskqueue_create() { task_queue_t *queue = (task_queue_t *)malloc (sizeof (*queue )); if (!queue ) return NULL ; int ret; ret = pthread_mutex_init(&queue ->mutex, NULL ); if (ret == 0 ) { ret = pthread_cond_init(&queue ->cond, NULL ); if (ret == 0 ) { spinlock_init(&queue ->lock); queue ->head = NULL ; queue ->tail = &queue ->head; queue ->block = 1 ; return queue ; } pthread_cond_destroy(&queue ->cond); } pthread_mutex_destroy(&queue ->mutex); return NULL ; } static void __nonblock(task_queue_t *queue ) { pthread_mutex_lock(&queue ->mutex); queue ->block = 0 ; pthread_mutex_unlock(&queue ->mutex); pthread_cond_broadcast(&queue ->cond); } static inline void __add_task(task_queue_t *queue , void *task) { void **link = (void **)task; *link = NULL ; spinlock_lock(&queue ->lock); *queue ->tail = link; queue ->tail = link; spinlock_unlock(&queue ->lock); pthread_cond_signal(&queue ->cond); } static inline void * __pop_task(task_queue_t *queue ) { spinlock_lock(&queue ->lock); if (queue ->head == NULL ) { spinlock_unlock(&queue ->lock); return NULL ; } task_t *task; task = queue ->head; queue ->head = task->next; if (queue ->head == NULL ) { queue ->tail = &queue ->head; } spinlock_unlock(&queue ->lock); return task; } static inline void * __get_task(task_queue_t *queue ) { task_t *task; while ((task = __pop_task(queue )) == NULL ) { pthread_mutex_lock(&queue ->mutex); if (queue ->block == 0 ) { pthread_mutex_unlock(&queue 
->mutex); return NULL ; } pthread_cond_wait(&queue ->cond, &queue ->mutex); pthread_mutex_unlock(&queue ->mutex); } return task; } static void __taskqueue_destroy(task_queue_t *queue ) { task_t *task; while ((task = __pop_task(queue ))) { free (task); } spinlock_destroy(&queue ->lock); pthread_cond_destroy(&queue ->cond); pthread_mutex_destroy(&queue ->mutex); free (queue ); } static void *__thrdpool_worker(void *arg) { thrdpool_t *pool = (thrdpool_t *) arg; task_t *task; void *ctx; while (atomic_load (&pool->quit) == 0 ) { task = (task_t *)__get_task(pool->task_queue); if (!task) break ; handler_pt func = task->func; ctx = task->arg; free (task); func(ctx); } return NULL ; } static void __threads_terminate(thrdpool_t * pool) { atomic_store (&pool->quit, 1 ); __nonblock(pool->task_queue); int i; for (i=0 ; i<pool->thrd_count; i++) { pthread_join(pool->threads[i], NULL ); } } static int __threads_create(thrdpool_t *pool, size_t thrd_count) { pthread_attr_t attr; int ret; ret = pthread_attr_init(&attr); if (ret == 0 ) { pool->threads = (pthread_t *)malloc (sizeof (pthread_t ) * thrd_count); if (pool->threads) { int i = 0 ; for (; i < thrd_count; i++) { if (pthread_create(&pool->threads[i], &attr, __thrdpool_worker, pool) != 0 ) { break ; } } pool->thrd_count = i; pthread_attr_destroy(&attr); if (i == thrd_count) return 0 ; __threads_terminate(pool); free (pool->threads); } ret = -1 ; } return ret; }
接口实现细节 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 void thrdpool_terminate (thrdpool_t * pool) { atomic_store (&pool->quit, 1 ); __nonblock(pool->task_queue); } thrdpool_t *thrdpool_create (int thrd_count) { thrdpool_t *pool; pool = (thrdpool_t *) malloc (sizeof (*pool)); if (!pool) return NULL ; task_queue_t *queue = __taskqueue_create(); if (queue ) { pool->task_queue = queue ; atomic_init (&pool->quit, 0 ); if (__threads_create(pool, thrd_count) == 0 ) { return pool; } __taskqueue_destroy(pool->task_queue); } free (pool); return NULL ; } int thrdpool_post (thrdpool_t *pool, handler_pt func, void *arg) { if (atomic_load (&pool->quit) == 1 ) { return -1 ; } task_t *task = (task_t *)malloc (sizeof (task_t )); if (!task) return -1 ; task->func = func; task->arg = arg; __add_task(pool->task_queue, task); return 0 ; } void thrdpool_waitdone (thrdpool_t *pool) { int i; for (i=0 ; i<pool->thrd_count; i++) { pthread_join(pool->threads[i], NULL ); } __taskqueue_destroy(pool->task_queue); free (pool->threads); free (pool); }
线程池测试 数据结构测试
google test1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 TEST(task_queue, normal) { int i; task_t *task; task_queue_t * queue = __taskqueue_create(); for (i=0 ; i<10 ; i++) { task_t *task = (task_t *)malloc (sizeof (*task)); __add_task(queue , task); } i = 0 ; while (queue ->head) { task = __pop_task(queue ); free (task); i++; } ASSERT_TRUE(i==10 ); for (i=0 ; i<10 ; i++) { task_t *task = (task_t *)malloc (sizeof (*task)); __add_task(queue , task); } i = 0 ; while (queue ->head) { task = __pop_task(queue ); free (task); i++; } ASSERT_TRUE(i==10 ); }
生成动态链接库
/* 功能测试 — functional test */

int done = 0;          /* number of tasks executed so far, guarded by `lock` */
pthread_mutex_t lock;  /* protects `done` and serialises the printf output */

/**
 * Task body: count and report one execution, and request pool shutdown
 * once at least 1000 tasks have run.
 *
 * @param arg the thrdpool_t the task was posted to
 */
void do_task(void *arg) {
    thrdpool_t *pool = (thrdpool_t *)arg;

    pthread_mutex_lock(&lock);
    /* Snapshot under the lock; reading `done` after unlocking would race. */
    int finished = ++done;
    printf("doing %d task\n", finished);
    pthread_mutex_unlock(&lock);

    if (finished >= 1000) {
        thrdpool_terminate(pool);
    }
}

/**
 * Create an 8-thread pool and keep posting do_task until the pool refuses
 * new work, then wait for the workers to finish.
 */
void test_thrdpool_basic() {
    int threads = 8;
    pthread_mutex_init(&lock, NULL);

    thrdpool_t *pool = thrdpool_create(threads);
    if (pool == NULL) {
        /* fprintf rather than perror: errno is not meaningful here. */
        fprintf(stderr, "thread pool create error!\n");
        exit(-1);
    }

    /* thrdpool_post starts returning -1 once a task has called terminate. */
    while (thrdpool_post(pool, &do_task, pool) == 0) {
    }

    thrdpool_waitdone(pool);
    pthread_mutex_destroy(&lock);
}

int main(int argc, char **argv) {
    (void)argc;
    (void)argv;
    test_thrdpool_basic();
    return 0;
}
性能测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 s time_t GetTick () { return std::chrono::duration_cast <std::chrono::milliseconds>( std::chrono::steady_clock::now ().time_since_epoch () ).count (); } std::atomic<int64_t > g_count{0 }; void JustTask (void *ctx) { ++g_count; } constexpr int64_t n = 1000000 ;void producer (thrdpool_t *pool) { for (int64_t i=0 ; i < n; ++i) { thrdpool_post (pool, JustTask, NULL ); } } void test_thrdpool (int nproducer, int nconsumer) { auto pool = thrdpool_create (nconsumer); for (int i=0 ; i<nproducer; ++i) { std::thread (&producer, pool).detach (); } time_t t1 = GetTick (); while (g_count.load () != n*nproducer) { usleep (100000 ); } time_t t2 = GetTick (); std::cout << t2 << " " << t1 << " " << "used:" << t2-t1 << " exec per sec:" << (double )g_count.load ()*1000 / (t2-t1) << std::endl; thrdpool_terminate (pool); thrdpool_waitdone (pool); } int main () { test_thrdpool (4 , 4 ); return 0 ; }
2024-02-28
该篇文章被 Cleofwine
归为分类:
服务端