3. libuv 线程池
参见libuv/src/threadpool.c文件
/*
 * One unit of thread-pool work (a "task").
 * A task is linked into the pool's queues through its `wq` node and is later
 * moved to its loop's completion queue (loop->wq) when it finishes or is
 * cancelled.
 */
struct uv__work {
/* Task body; worker() calls it on a pool thread with the pool mutex released. */
void (*work)(struct uv__work *w);
/* Completion callback; `status` reports the outcome.
 * NOTE(review): the call site is not shown in this excerpt. */
void (*done)(struct uv__work *w, int status);
/* Loop that submitted the task; its wq/wq_mutex/wq_async receive the result. */
struct uv_loop_s* loop;
/* Intrusive QUEUE node (prev/next) linking this task into a queue. */
void* wq[2];
};
uv__work 代表一个任务(task)。它包含两个函数指针:work 是任务的实际操作,由线程池中的线程执行;done 是任务结束(执行完成或被取消)后的完成回调,其 status 参数表示任务的结果。loop 指向提交该任务的事件循环。wq 成员是一个 QUEUE 节点,uv__work 通过 wq 与其他 uv__work 连接成一个队列。
下面来看一下threadpool的初始化,代码如下:
#include "uv-common.h"
#include <stdlib.h>
/* Upper bound on the pool size; init_threads() clamps nthreads to it. */
#define MAX_THREADPOOL_SIZE 1024
/* Makes lazy pool initialization run exactly once (uv__work_submit uses it). */
static uv_once_t once = UV_ONCE_INIT;
/* Idle workers wait on this; post() signals it when work is queued. */
static uv_cond_t cond;
/* Protects the pool queues and the counters below. */
static uv_mutex_t mutex;
/* Number of workers currently blocked in uv_cond_wait(). */
static unsigned int idle_threads;
/* Number of slow I/O tasks currently executing on pool threads. */
static unsigned int slow_io_work_running;
/* Number of worker threads started by init_threads(). */
static unsigned int nthreads;
/* Thread handles: default_threads, or a heap array for larger pools. */
static uv_thread_t* threads;
/* Static storage for the default pool size (4 threads). */
static uv_thread_t default_threads[4];
/* Sentinel node: a worker that finds it at the head of wq re-signals cond and
 * exits. NOTE(review): the code that enqueues it is not part of this excerpt. */
static QUEUE exit_message;
/* Main work queue consumed by the workers. */
static QUEUE wq;
/* Placeholder node that stands in wq for all pending slow I/O work. */
static QUEUE run_slow_work_message;
/* Slow I/O tasks waiting to run; drained one at a time via the placeholder. */
static QUEUE slow_io_pending_wq;
/*
 * One-time thread pool setup.
 * NOTE(review): presumably reached through init_once, which uv__work_submit
 * runs via uv_once().
 *
 * Sizes the pool (default 4, overridable with UV_THREADPOOL_SIZE, clamped to
 * [1, MAX_THREADPOOL_SIZE]). Initializes the shared mutex, condition variable
 * and queues, starts nthreads workers, and waits until every one of them is
 * running. Aborts the process if a synchronization primitive or a thread
 * cannot be created.
 */
static void init_threads(void) {
  unsigned int i;
  const char* val;
  long requested;
  uv_sem_t sem;

  /* 1. Default pool size: the capacity of the static default_threads array. */
  nthreads = ARRAY_SIZE(default_threads);

  /* 2. UV_THREADPOOL_SIZE overrides the default, clamped to
   *    [1, MAX_THREADPOOL_SIZE]. strtol() replaces atoi() for two reasons:
   *    - atoi() has undefined behavior on out-of-range input;
   *    - a negative result such as "-1" assigned to the unsigned nthreads
   *      wrapped to a huge value, which was then clamped to the maximum.
   *    Non-numeric input still parses as 0 and yields a single thread. */
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL) {
    requested = strtol(val, NULL, 10);
    if (requested < 1)
      requested = 1;
    else if (requested > MAX_THREADPOOL_SIZE)
      requested = MAX_THREADPOOL_SIZE;
    nthreads = (unsigned int) requested;
  }

  /* 3. Pools larger than default_threads need a heap array. If that
   *    allocation fails, fall back to the default size instead of aborting. */
  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  /* 4. Synchronization primitives shared by post() and worker(). */
  if (uv_cond_init(&cond))
    abort();
  if (uv_mutex_init(&mutex))
    abort();

  /* 5. Empty the global work queue and the pending slow-I/O queue, and reset
   *    the slow-I/O placeholder node (an empty placeholder means "not queued"). */
  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

  /* 6. Start the workers, all running worker(). Each one posts `sem` as its
   *    first action. */
  if (uv_sem_init(&sem, 0))
    abort();
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  /* 7. Wait for one post per worker, so every thread is running before this
   *    function returns. `sem` is a local, so it must not be destroyed before
   *    all workers have used it. */
  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);
  uv_sem_destroy(&sem);
}
上面的代码中,一共创建了nthreads个线程,那么每个线程的执行代码是什么呢?由线程创建代码:uv_thread_create(threads + i, worker, &sem),可以看到,每一个线程都是执行worker函数,下面看看worker函数都在做什么:
/*
 * Body of every pool thread.
 * `arg` is the uv_sem_t that init_threads() waits on; it is posted once to
 * report that this thread has started. The thread then loops forever:
 * - wait for runnable work;
 * - dequeue the head of wq (possibly redirected to slow_io_pending_wq);
 * - run it without holding `mutex`;
 * - hand the finished task to its loop's completion queue.
 * The thread exits when it finds exit_message at the head of wq.
 */
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
int is_slow_work;
uv_sem_post((uv_sem_t*) arg);
arg = NULL;
/*
 * The queues and counters are shared by all workers and by post(), so take
 * the pool mutex before touching them.
 */
uv_mutex_lock(&mutex);
for (;;) {
/* `mutex` should always be locked at this point. */
/* Keep waiting while either no work is present or only slow I/O
* and we're at the threshold for that.
* i.e. wait while wq is empty, or the only entry is the slow-I/O placeholder
* and slow_io_work_running has already reached slow_work_thread_threshold().
*/
while (QUEUE_EMPTY(&wq) ||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
QUEUE_NEXT(&run_slow_work_message) == &wq &&
slow_io_work_running >= slow_work_thread_threshold())) {
idle_threads += 1; // this thread is now idle
uv_cond_wait(&cond, &mutex); // releases mutex while waiting, re-acquires it on wakeup
idle_threads -= 1; // no longer idle; the while condition is re-checked, so a spurious or lost-race wakeup just waits again
}
/*
 * Peek at the first queued node.
 */
q = QUEUE_HEAD(&wq);
if (q == &exit_message) {
/* Leave exit_message queued and wake the next waiter so it exits too. */
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
break;
}
// Unlink the node from wq.
QUEUE_REMOVE(q);
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */
is_slow_work = 0;
if (q == &run_slow_work_message) {
/* If we're at the slow I/O threshold, re-schedule until after all
other work in the queue is done. */
if (slow_io_work_running >= slow_work_thread_threshold()) {
QUEUE_INSERT_TAIL(&wq, q);
continue;
}
/* If we encountered a request to run slow I/O work but there is none
to run, that means it's cancelled => Start over. */
if (QUEUE_EMPTY(&slow_io_pending_wq))
continue;
is_slow_work = 1;
slow_io_work_running++;
q = QUEUE_HEAD(&slow_io_pending_wq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
/* If there is more slow I/O work, schedule it to be run as well. */
if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
if (idle_threads > 0)
uv_cond_signal(&cond);
}
}
/*
 * Release the pool mutex so other workers can dequeue while this task runs.
 */
uv_mutex_unlock(&mutex);
/*
 * Recover the enclosing uv__work from its queue node and run the task.
 */
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
/* Hand the finished task to its loop's completion queue and wake the loop. */
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
/* Lock `mutex` since that is expected at the start of the next
* iteration. */
uv_mutex_lock(&mutex);
if (is_slow_work) {
/* `slow_io_work_running` is protected by `mutex`. */
slow_io_work_running--;
}
}
}
可以看到,多个线程都会在 worker 函数中阻塞等待条件变量 cond。一旦有任务加入队列,post 会通过 uv_cond_signal 唤醒一个空闲线程。被唤醒的线程重新检查队列,取出队首任务后释放互斥锁并执行任务;其他线程则继续等待。
那么如何向队列提交一个task呢?看以下代码:
1 void uv__work_submit(uv_loop_t* loop,
2 struct uv__work* w,
3 void (*work)(struct uv__work* w),
4 void (*done)(struct uv__work* w, int status)) {
5 uv_once(&once, init_once);
6 // 构造一个task
7 w->loop = loop;
8 w->work = work;
9 w->done = done;
10 // 将其插入任务队列
11 post(&w->wq);
12 }
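接着看 post 做了什么。post 接收任务节点 q 和任务类型 kind。普通任务直接插入全局队列 wq。慢 I/O 任务(UV__WORK_SLOW_IO)先插入 slow_io_pending_wq,再由占位节点 run_slow_work_message 代表它们在 wq 中排队;若该占位节点已在 wq 中,则直接返回: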
/*
 * Enqueue a work node for the pool and wake an idle worker if there is one.
 *
 * Any kind other than UV__WORK_SLOW_IO goes straight onto wq.
 *
 * Slow I/O work is parked on slow_io_pending_wq and represented in wq by the
 * single run_slow_work_message placeholder. If that placeholder is already
 * queued, the worker that dequeues it also schedules the remaining pending
 * items, so nothing more is enqueued here and no worker is woken.
 */
static void post(QUEUE* q, enum uv__work_kind kind) {
  QUEUE* node_for_wq = q;
  int placeholder_queued = 0;

  uv_mutex_lock(&mutex);

  if (kind == UV__WORK_SLOW_IO) {
    /* Park the task; the placeholder represents it in the main queue. */
    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
    placeholder_queued = !QUEUE_EMPTY(&run_slow_work_message);
    node_for_wq = &run_slow_work_message;
  }

  if (!placeholder_queued) {
    QUEUE_INSERT_TAIL(&wq, node_for_wq);
    /* Only signal when someone is actually waiting on cond. */
    if (idle_threads > 0)
      uv_cond_signal(&cond);
  }

  uv_mutex_unlock(&mutex);
}
有提交任务,自然也有取消任务的操作,这就是 uv__work_cancel。注意:只有尚未开始执行的任务才能被取消;正在执行或已执行完的任务会返回 UV_EBUSY。代码如下:
/*
 * Try to cancel a submitted task that has not started running yet.
 *
 * Returns 0 when the task was cancelled. In that case:
 * - it is unlinked from the pool queue that holds it;
 * - its work callback is replaced by uv__cancelled;
 * - it is appended to the loop's completion queue (loop->wq) and wq_async is
 *   sent, the same hand-off a finished task gets in worker().
 *
 * Returns UV_EBUSY when the task is already executing or has finished.
 */
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  /* Hold both locks while deciding:
   * - the pool mutex guards worker()'s dequeue;
   * - the loop's wq_mutex guards worker()'s "done" mark (work = NULL). */
  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  /* The task is cancellable only if both hold:
   * - its own node is still linked into a pool queue (worker() re-initializes
   *   it with QUEUE_INIT when it starts the task);
   * - it has not finished (worker() sets work to NULL afterwards). */
  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
  if (cancelled)
    QUEUE_REMOVE(&w->wq); /* unlink from wq or slow_io_pending_wq */

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;

  /* Mark the task as cancelled and complete it through the loop's queue. */
  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}
至此,线程池的组成和实现原理就介绍完了。可以看到,libuv 用相当精简的代码实现了一个高效的线程池。