Nginx 惊群效应的处理和事件调度循环

前言

工作的过程中常常听到惊群效应这一高端的名词,故怀着好奇心去源码的世界一探究竟,探究问题的所在和 nginx 的解决方案。如果下面的分析有错,希望大佬们多多指出。

Nginx进程模型

Nginx 默认采用多进程工作方式,Nginx启动后,会运行一个master进程和多个worker进程。其中master充当整个进程组与用户的交互接口,同时对进程进行监护,管理worker进程来实现重启服务、平滑升级、更换日志文件、配置文件实时生效等功能。worker用来处理基本的网络事件,worker之间是平等的,他们共同竞争来处理来自客户端的请求。

解决惊群问题

epoll 惊群

epoll 有两种工作方式:LT(水平触发) 和 ET(边缘触发)。LT 即只要有事件就通知,而 ET 则只有状态变化时才会通知。 LT 状态下,只要有通知,所有监听这个 socket 的线程都会被唤醒。 ET 状态下,内核只会通知一次(一个线程),因此无论是 accept()read() 还是 write() 都要循环操作到底层返回 EAGAIN 为止。 但是 ET 也会有竞争问题:线程A的 epoll_wait() 返回后,线程 A 不断的调用 accept() 处理连接请求,当内核的 accept queue 队列中的请求恰好处理完时候,内核会重新将该 socket 置为不可读状态,以便可以重新被触发;此时如果新来了一个连接,那么另外一个线程 B 可能被唤醒,然后执行accept() 操作,不过此时之前的线程 A 还需要重新再执行一次 accept() 以确认 accept queue 已经被处理完了,此时如果线程A成功 accept 的话,线程 B 就被惊醒了(线程 B 没有 accept成功)。 历史上还存在过 accept 惊群,但现在的内核已经解决了这个问题,内核只会唤醒一个进程。

Ngnix 的解决方法

Ngnix 目前有几种方法解决惊群问题。

accept_mutex 锁

如果开启了accept_mutex锁,每个 worker 都会先去抢自旋锁,只有抢占成功了,才把 socket 加入到 epoll 中,accept 请求,然后释放锁。accept_mutex 锁也有负载均衡的作用,接受太多请求的 worker 会根据 ngx_accept_disabled 自动放弃锁争抢的机会。

accept_mutex 效率较低,在锁释放之前都无法给到别的进行进行请求的处理,其他进程只能等待 epoll_wait() 超时唤醒。

SO_REUSEPORT 选项

SO_REUSEPORT 在 Linux Kernel 3.9+ 加入支持,Ngnix 在 1.9.1 中加入了这个选项,每个 worker 都有自己的 socket,这些 socket 都 bind 同一个端口。当新请求到来时,内核根据四元组信息进行负载均衡,相对更加高效。

EPOLLEXCLUSIVE 标识

EPOLLEXCLUSIVE 是 Linux Kernel 4.5+ 新添加的一个 epoll 的标识,Ngnix 在 1.11.3 之后添加了 NGX_EXCLUSIVE_EVENT

EPOLLEXCLUSIVE 标识会保证一个事件发生时候只有一个线程会被唤醒,以避免多侦听下的“惊群”问题。不过任一时候只能有一个工作线程调用 accept,在负载较低的时候可能出现某些进程比较繁忙的情况。

针对避免惊群现象的总结:Nginx 在初始化过程中会调用 ngx_event_module_init, ngx_event_process_init 对 IO 事件处理模块和 worker 进程的事件处理循环进行初始化,以常用的 Epoll 为例,如果使用的是 ReusePort 或者 EpollExclusive 来避免惊群现象,则事先就将对事件的监听注册到系统中。使用 accept_mutex,则在后续的事件循环中通过锁的争抢,仅注册成功获得锁的 worker 进行事件处理来避免惊群现象。上面所述的三种方法都有有优势的方面,大家可以根据实际情况进行选择。

调度原理

// src/core/nginx.c

int ngx_cdecl
main(int argc, char *const *argv)
{
    (...) // 进行所有初始化步骤,包括读取配置文件,初始化共享数据结构
    
    if (ngx_process == NGX_PROCESS_SINGLE) {
        ngx_single_process_cycle(cycle);

    } else {
        ngx_master_process_cycle(cycle); // 启动 Master,进入 Master Cycle
    }

    return 0;
}
// src/os/unix/ngx_process_cycle.c
void
ngx_master_process_cycle(ngx_cycle_t *cycle)
{
    char              *title;
    u_char            *p;
    size_t             size;
    ngx_int_t          i;
    ngx_uint_t         sigio;
    sigset_t           set;
    struct itimerval   itv;
    ngx_uint_t         live;
    ngx_msec_t         delay;
    ngx_core_conf_t   *ccf;

    sigemptyset(&set);
    sigaddset(&set, SIGCHLD);
    sigaddset(&set, SIGALRM);
    sigaddset(&set, SIGIO);
    sigaddset(&set, SIGINT);
    sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));

    if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) { // 将信号集合 set 加入到进程的信号阻塞集合之中去,屏蔽信号
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "sigprocmask() failed");
    }

    sigemptyset(&set); // 清空信号集
    
    (...)
    
    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);  // 获取配置

    ngx_start_worker_processes(cycle, ccf->worker_processes,
                               NGX_PROCESS_RESPAWN); // 根据配置中的 worker process 数量启动 worker
    
    (...)
    
    for ( ;; ) {
        (...)

        sigsuspend(&set); // 挂起 Master

        ngx_time_update(); // 唤醒后更新时间
        
        // 下面是根据信号处理器设置的全局变量进行对应操作
        // 包括但不限于:
        // 关闭worker
        // 重载配置或者更新可执行 bin
        // Terminate, Quit, Reopen, Restart 事件
        
        (...)
    }
}

static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
    ngx_int_t  i;

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");

    for (i = 0; i < n; i++) {
        // 根据配置的 worker 数量启动
        // 启动后 worker 进入 ngx_worker_process_cycle
        ngx_spawn_process(cycle, ngx_worker_process_cycle,
                          (void *) (intptr_t) i, "worker process", type);
        
        ngx_pass_open_channel(cycle);
    }
}

static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
    
    ngx_int_t worker = (intptr_t) data;

    ngx_process = NGX_PROCESS_WORKER; // 设置进程类型标识
    ngx_worker = worker;

    ngx_worker_process_init(cycle, worker); // worker 进程初始化

    ngx_setproctitle("worker process");

    for ( ;; ) {
        
        (...) // 根据信号处理器处理 Exit 事件
        
        ngx_process_events_and_timers(cycle); // 进入处理请求
        
        (...) // 根据信号处理器处理 Terminate, Quit, Reopen 事件
    }

首先分析使用 accept_mutex 所需的先决步骤,需要进行锁的获取,获取成功的 worker 才会被注册到 IO 事件系统中

// src/event/ngx_event.h
#define ngx_process_events   ngx_event_actions.process_events

// src/event/ngx_event.c

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    // 获取定时器
    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;

#if (NGX_WIN32)

        /* handle signals from master in case of network inactivity */

        if (timer == NGX_TIMER_INFINITE || timer > 500) {
            timer = 500;
        }

#endif
    }

    if (ngx_use_accept_mutex) {  // 使用 accept_mutex 的情况
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            /* 
                尝试获取 accept mutex,只有成功获取锁的进程,才会将 listen 套接字注册到 IO 事件系统中。 
                因此,这就保证了只有一个进程拥有监听套接口,故所有进程阻塞在 epoll_wait 时,才不会惊群现象。 
            */  
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            /* 
                如果进程获得了锁,将添加一个 NGX_POST_EVENTS 标志。 
                这个标志的作用是将所有产生的事件放入一个队列中,等释放锁后,在慢慢来处理事件。 
                因为,处理请求可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁, 
                导致其他进程无法获取锁,这样 accept 的效率就低了。 
            */  
            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                // 没有获得锁得进程,设置 IO 等待超时时间,再去争抢锁。  
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    (...)
}
// src/event/ngx_event_accept.c

ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }

        // 注册 accept 事件
        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    // 没有获取到锁,删除先前被注册的事件
    if (ngx_accept_mutex_held) {
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
}


ngx_int_t
ngx_enable_accept_events(ngx_cycle_t *cycle)
{
    ngx_uint_t         i;
    ngx_listening_t   *ls;
    ngx_connection_t  *c;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ls[i].connection;

        if (c == NULL || c->read->active) {
            continue;
        }

        // 注册事件
        if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}

static ngx_int_t
ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all)
{
    ngx_uint_t         i;
    ngx_listening_t   *ls;
    ngx_connection_t  *c;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ls[i].connection;

        if (c == NULL || !c->read->active) {
            continue;
        }

#if (NGX_HAVE_REUSEPORT)

        /*
         * do not disable accept on worker's own sockets
         * when disabling accept events due to accept mutex
         */

        if (ls[i].reuseport && !all) {
            continue;
        }

#endif
        // 删除已注册事件
        if (ngx_del_event(c->read, NGX_READ_EVENT, NGX_DISABLE_EVENT)
            == NGX_ERROR)
        {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}
// src/core/ngx_shmtx.c

ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    // 通过 CAS 获取锁
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}

接下来就是进行事件的等待和处理

// src/event/ngx_event.h
#define ngx_process_events   ngx_event_actions.process_events

// src/event/ngx_event.c

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    // 获取定时器
    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;

#if (NGX_WIN32)

        /* handle signals from master in case of network inactivity */

        if (timer == NGX_TIMER_INFINITE || timer > 500) {
            timer = 500;
        }

#endif
    }

    (...)
    
    // 每次 eventloop 都会清空所有的事件队列
    // ngx_posted_next_events, ngx_posted_accept_events, ngx_posted_events

    // 如果 ngx_posted_next_events 队列中仍有未处理的事件,先行处理
    if (!ngx_queue_empty(&ngx_posted_next_events)) {
        ngx_event_move_posted_next(cycle);
        timer = 0;
    }

    delta = ngx_current_msec;

    (void) ngx_process_events(cycle, timer, flags); // 执行对应 IO 时间模型 process_events 方法
    
    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    /*
        ngx_posted_accept_events是一个事件队列,暂存 epoll 从监听套接口 wait 到的 accept 事件。
        前文提到的 NGX_POST_EVENTS 标志被使用后,会将所有的 accept 事件暂存到这个队列
    */
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    // 处理完事件后,释放锁
    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    // 处理超时的任务
    ngx_event_expire_timers();

    // 读写事件将会被添加到 ngx_posted_events 队列中
    ngx_event_process_posted(cycle, &ngx_posted_events);
    
}
// src/event/ngx_event_posted.c
// 进行事件获取并调用事件对应 handle
void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
    ngx_queue_t  *q;
    ngx_event_t  *ev;

    while (!ngx_queue_empty(posted)) {

        q = ngx_queue_head(posted);
        ev = ngx_queue_data(q, ngx_event_t, queue);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                      "posted event %p", ev);

        ngx_delete_posted_event(ev);

        ev->handler(ev);
    }
}
// src/event/ngx_event_timer.c
// 处理超时任务
void
ngx_event_expire_timers(void)
{
    sentinel = ngx_event_timer_rbtree.sentinel;
    
    for ( ;; ) {
    
        //1) 获取最近将要过期的红黑树节点
        node = ngx_rbtree_min(root, sentinel);

        if ((ngx_msec_int_t) (node->key - ngx_current_msec) > 0){
            //2) 未过期,直接return返回
        }

        //3) 获得过期节点的ngx_event_t结构
        ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));

        //4) 从红黑树中移除该过期事件
        ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);

        //5) 调用定时器所绑定的handler回调函数
        ev->timer_set = 0;          //timer_set标志为置为0
            
        ev->timedout = 1;           // timeout标志为置为1,表示定时器已经超时
        
        ev->handler(ev);

    }
}

小结

Nginx 的高效是许多人有亲身体验的,不难看出,Nginx 内部为了更高效地处理请求,用到了很多复杂的数据结构和架构设计,也仰仗于 Linux 内核的不断发展,也使得 Nginx 效率不断提高。最后想吐槽一下,大佬们真的不爱写注释,作为小透明的我们表示很难受。

参考