博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
nginx源码分析——线程池
阅读量:6190 次
发布时间:2019-06-21

本文共 8843 字,大约阅读时间需要 29 分钟。

源码: nginx 1.13.0-release
 
一、前言
     nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响。但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA)。其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的。下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误、不足还请大家指出,谢谢)
 
二、thread_pool线程池模块介绍
     nginx的主要功能都是由一个个模块构成的,thread_pool也不例外。线程池主要用于读取、发送文件等IO操作,避免慢速IO影响worker的正常运行。先引用一段官方的配置示例
Syntax: thread_pool name threads=number [max_queue=number];Default: thread_pool default threads=32 max_queue=65536;Context: main
     根据上述的配置说明,thread_pool是有名字的,上面的线程数目以及队列大小都是指每个worker进程中的线程,而不是所有worker中线程的总数。一个线程池中所有的线程共享一个队列,队列中的最大人数数量为上面定义的max_queue,如果队列满了的话,再往队列中添加任务就会报错。
 
     根据之前讲到过的模块初始化流程(
在master启动worker之前) create_conf--> command_set函数-->init_conf,下面就按照这个流程看看thread_pool模块的初始化
/*******************  nginx/src/core/ngx_thread_pool.c  ************************///创建线程池所需的基础结构static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle){    ngx_thread_pool_conf_t  *tcf;     //从cycle->pool指向的内存池中申请一块内存    tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));    if (tcf == NULL) {        return NULL;    }          //先申请包含4个ngx_thread_pool_t指针类型元素的数组     //ngx_thread_pool_t结构体中保存了一个线程池相关的信息    if (ngx_array_init(&tcf->pools, cycle->pool, 4,                       sizeof(ngx_thread_pool_t *))        != NGX_OK)    {        return NULL;    }     return tcf;} //解析处理配置文件中thread_pool的配置,并将相关信息保存的ngx_thread_pool_t中static char *  ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf){    ngx_str_t          *value;    ngx_uint_t          i;    ngx_thread_pool_t  *tp;     value = cf->args->elts;     //根据thread_pool配置中的name作为线程池的唯一标识(如果重名,只有第一个有效)    //申请ngx_thread_pool_t结构保存线程池的相关信息    //由此可见,nginx支持配置多个name不同的线程池    tp = ngx_thread_pool_add(cf, &value[1]);    .......    //处理thread_pool配置行的所有元素    for (i = 2; i < cf->args->nelts; i++) {        //检查配置的线程数        if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {         .......        }                //检查配置的最大队列长度        if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {         .......        }    }    ......} //判断包含多个线程池的数组中的各个线程池的配置是否正确static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf){    ....    ngx_thread_pool_t  **tpp;     tpp = tcf->pools.elts;    //遍历数组中所有的线程池配置,并检查其正确性    for (i = 0; i < tcf->pools.nelts; i++) {        .....    }     return NGX_CONF_OK;}
 
     在上述的流程走完之后,nginx的master就保存了一份所有线程池的配置(tcf->pools),这份配置在创建worker时也会被继承。然后每个worker中都调用各个核心模块的init_process函数(如果有的话)。
/*******************  nginx/src/core/ngx_thread_pool.c  ************************///创建线程池所需的基础结构static ngx_int_tngx_thread_pool_init_worker(ngx_cycle_t *cycle){    ngx_uint_t                i;    ngx_thread_pool_t       **tpp;    ngx_thread_pool_conf_t   *tcf;    //如果不是worker或者只有一个worker就不起用线程池    if (ngx_process != NGX_PROCESS_WORKER        && ngx_process != NGX_PROCESS_SINGLE)    {        return NGX_OK;    }         //初始化任务队列    ngx_thread_pool_queue_init(&ngx_thread_pool_done);     tpp = tcf->pools.elts;    for (i = 0; i < tcf->pools.nelts; i++) {        //初始化各个线程池        if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {            return NGX_ERROR;        }    }     return NGX_OK;} //线程池初始化static ngx_int_t  ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool){    .....    //初始化任务队列    ngx_thread_pool_queue_init(&tp->queue);     //创建线程锁    if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {        return NGX_ERROR;    }     //创建线程条件变量    if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {        (void) ngx_thread_mutex_destroy(&tp->mtx, log);        return NGX_ERROR;    }    ......    for (n = 0; n < tp->threads; n++) {        //创建线程池中的每个线程        err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);        if (err) {            ngx_log_error(NGX_LOG_ALERT, log, err,                          "pthread_create() failed");            return NGX_ERROR;        }    }    ......} //线程池中线程处理主函数static void *ngx_thread_pool_cycle(void *data){     ......     for ( ;; ) {        //阻塞的方式获取线程锁        if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {            return NULL;        }         /* the number may become negative */        tp->waiting--;         //如果任务队列为空,就cond_wait阻塞等待有新任务时调用cond_signal/broadcast触发        while (tp->queue.first == NULL) {            if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)                != NGX_OK)            {                (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);                return NULL;            }        }        //从任务队列中获取task,并将其从队列中移除        task = tp->queue.first;        tp->queue.first = task->next;         if (tp->queue.first == NULL) {            tp->queue.last = &tp->queue.first;        }         if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {            return NULL;        }        ......        //task的处理函数        task->handler(task->ctx, tp->log);        .....         ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);         //将经过预处理的任务添加到done队列中等待调用event的回调函数继续处理        *ngx_thread_pool_done.last = task;        ngx_thread_pool_done.last = &task->next;                //防止编译器优化,保证解锁操作是在上述语句执行完毕后再去执行的        ngx_memory_barrier();         ngx_unlock(&ngx_thread_pool_done_lock);                (void) ngx_notify(ngx_thread_pool_handler);    }} //处理pool_done队列上task中包含的每个event事件static void  ngx_thread_pool_handler(ngx_event_t *ev){    .....    ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);      //获取任务链表的头部    task = ngx_thread_pool_done.first;    ngx_thread_pool_done.first = NULL;    ngx_thread_pool_done.last = &ngx_thread_pool_done.first;     ngx_memory_barrier();     ngx_unlock(&ngx_thread_pool_done_lock);     while (task) {        ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,                       "run completion handler for task #%ui", task->id);        //遍历队列中的所有任务事件        event = &task->event;        task = task->next;         event->complete = 1;        event->active = 0;         //调用event对应的处理函数有针对性的进行处理        event->handler(event);    }}
 
三、thread_pool线程池使用示例
     根据之前所讲到的,nginx中的线程池主要是用于操作文件的IO操作。所以,在nginx中自带的模块ngx_http_file_cache.c文件中看到了线程池的使用。
/*********************** nginx/src/os/unix/ngx_files.c  **********************///file_cache模块的处理函数(涉及到了线程池)static ssize_t  ngx_http_file_cache_aio_read(ngx_http_request_t *r, ngx_http_cache_t *c){    .......#if (NGX_THREADS)     if (clcf->aio == NGX_HTTP_AIO_THREADS) {        c->file.thread_task = c->thread_task;        //这里注册的函数在下面语句中的ngx_thread_read函数中被调用        c->file.thread_handler = ngx_http_cache_thread_handler;        c->file.thread_ctx = r;        //根据任务的属性,选择正确的线程池,并初始化task结构体中的各个成员                n = ngx_thread_read(&c->file, c->buf->pos, c->body_start, 0, r->pool);         c->thread_task = c->file.thread_task;        c->reading = (n == NGX_AGAIN);         return n;    }#endif     return ngx_read_file(&c->file, c->buf->pos, c->body_start, 0);}  //task任务的处理函数static ngx_int_t  ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file){    .......    tp = clcf->thread_pool;    .......        task->event.data = r;    //注册thread_event_handler函数,该函数在处理pool_done队列中event事件时被调用    task->event.handler = ngx_http_cache_thread_event_handler;     //将任务放到线程池的任务队列中    if (ngx_thread_task_post(tp, task) != NGX_OK) {        return NGX_ERROR;    }    ......} /*********************** nginx/src/core/ngx_thread_pool.c  **********************///添加任务到队列中ngx_int_t  ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task){    //如果当前的任务正在处理就退出    if (task->event.active) {        ngx_log_error(NGX_LOG_ALERT, tp->log, 0,                      "task #%ui already active", task->id);        return NGX_ERROR;    }     if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {        return NGX_ERROR;    }        //判断当前线程池等待的任务数量与最大队列长度的关系    if (tp->waiting >= tp->max_queue) {        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);         ngx_log_error(NGX_LOG_ERR, tp->log, 0,                      "thread pool \"%V\" queue overflow: %i tasks waiting",                      &tp->name, tp->waiting);        return NGX_ERROR;    }    //激活任务    task->event.active = 1;     task->id = ngx_thread_pool_task_id++;    task->next = NULL;         //通知阻塞的线程有新事件加入,可以解除阻塞    if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);        return NGX_ERROR;    }     *tp->queue.last = task;    tp->queue.last = &task->next;     tp->waiting++;     (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);     ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,                   "task #%ui added to thread pool \"%V\"",                   task->id, &tp->name);     return NGX_OK;}
 
    上面示例基本展示了nginx目前对线程池的使用方法,采用线程池来处理IO这类慢速操作可以提升worker的主线程的执行效率。当然,用户自己在开发模块时,也可以参照file_cache模块中使用线程池的方法来调用多线程提升程序性能。( 欢迎大家多多批评指正)
 

转载于:https://www.cnblogs.com/sxhlinux/p/6906490.html

你可能感兴趣的文章
数字核心 驱动转型:SAP S/4HANA 数字化转型论坛 - 杭州站 即刻报名
查看>>
关于未来交通,这些大咖在未来论坛上的讨论火花四溅
查看>>
农行总行携手趣链科技上线区块链涉农电商融资产品
查看>>
如何使用高大上的方法调参数
查看>>
手把手教你由TensorFlow上手PyTorch(附代码)
查看>>
PHP设计模式——建造者模式
查看>>
【&amp;#9733;】SPF(Dijkstra)算法完美教程
查看>>
大龄程序员怎样渡过中年危机?很多思考,挺有意思的。
查看>>
C++辨析系列谈 [作者: 郑力群]
查看>>
浅析数据一致性
查看>>
大象的崛起!Hadoop七年发展风雨录
查看>>
libjingle源码解析(3)-【PseudoTcp】建立UDP之上的TCP(1):连接和关闭
查看>>
麦肯锡:全球调研14个行业、160个案例、3000名高管,AI应用到哪一步了?
查看>>
项目经理的10条规则
查看>>
企业安全三步走 惠普重新思考安全战略
查看>>
Servlet常用操作(基础)
查看>>
10个要点为Joomla网站创建完善的SEO优化内容
查看>>
"动静"结合 APT防护需建立整体应对体系
查看>>
云安全到来 手动更新病毒码将成为历史
查看>>
众信金融获“2015最佳绿色贡献奖”
查看>>