事件模块主要包含以下文件: ngx_event.c/h 事件核心模块,以及定义所有事件模块的统一接口 ngx_event_accept.c 事件连接处理 ngx_event_posted.c/h 队列事件相关,主要队列事件的添加,删除,处理 ngx_event_timer.c/h 定时器事件相关,定时器事件相关的执行,添加,删除 其他就是不同系统对应模块的事件epoll,poll,select,iocp,kqueue等
nginx事件模块支持了多种不同操作系统的事件机制,例如:devpoll, epoll, iocp, kqueue, poll, select, win32_select等。其中ngx_event_actions_t 结构体,该成员结构实现了不同事件驱动模块的具体方法。封装了IO多路复用模型的统一接口 该结构体定义在文件src/event/ngx_event.h 中:
typedef struct { ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 删除事件,将某个描述符的某个事件从事件驱动机制监控描述符集中删除 */ ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 启动对某个指定事件的监控 */ ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 禁用对某个指定事件的监控 */ ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 将指定连接所关联的描述符添加到事件驱动机制监控中 */ ngx_int_t (*add_conn)(ngx_connection_t *c); /* 将指定连接所关联的描述符从事件驱动机制监控中删除 */ ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags); /* 监控事件是否发生变化,仅用在多线程环境中 */ ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait); /* 等待事件的发生,并对事件进行处理 */ ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); /* 初始化事件驱动模块 */ ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); /* 在退出事件驱动模块前调用该函数回收资源 */ void (*done)(ngx_cycle_t *cycle); } ngx_event_actions_t;事件启动初始化主要在ngx_event.c文件中,在ngx_event_core_module模块定义中,实现了两种方法分别为 ngx_event_module_init 和ngx_event_process_init 方法。
ngx_event_module_init是在ngx_init_cycle初始化各种参数的时候调用的,在Nginx 启动过程中没有使用 fork 出 worker 子进程之前。主要是做一些事件模块的初始化,共享内存创建和初始化,全局变量的初始化。以上这些初始化全局变量和共享内存都是多worker进程共用的
ngx_event_process_init是在fork出worker进程之后调用的,每个worker进程都会调用,函数主要是初始化当前进程用的post事件队列,负载均衡锁(ngx_use_accept_mutex),定时器事件初始化,网络事件在各个事件模块进行的初始化,连接池和连接池对应事件的初始化。
ngx_process_events_and_timers是nginx所有事件执行的主体函数,整体逻辑主要是调用三种事件的处理函数以及多进程惊群现象的处理。 ngx_process_events_and_timers主要在以下四个地方调用: 分别是在ngx_single_process_cycle,ngx_worker_process_cycle,ngx_cache_manager_process_handler,ngx_privileged_agent_process_cycle 单进程、worker进程、cache进程、agent进程,这些进程其实就是在一直循环执行当前这个事件处理函数。
不同的事件对应不同的事件处理函数以下是三类事件的处理函数:
事件类型事件执行主体函数定时器事件ngx_event_expire_timers网络 事件ngx_process_eventspost事件ngx_event_process_posted其中ngx_process_events对应各自事件模块的执行函数,ngx_select_process_events, ngx_epoll_process_events等
void ngx_process_events_and_timers(ngx_cycle_t *cycle) { ngx_uint_t flags; ngx_msec_t timer, delta; /* ngx_timer_resolution 主要用来减少事件触发的频率 */ if (ngx_timer_resolution) { timer = NGX_TIMER_INFINITE; flags = 0; } else { timer = ngx_event_find_timer(); flags = NGX_UPDATE_TIME; #if (NGX_WIN32) /* handle signals from master in case of network inactivity */ if (timer == NGX_TIMER_INFINITE || timer > 500) { timer = 500; } #endif } /* * ngx_use_accept_mutex表示是否需要通过对accept加锁来解决惊群问题。 * 当nginx worker进程数>1时且配置文件中打开accept_mutex时,这个标志置为1 */ if (ngx_use_accept_mutex) { /* * ngx_accept_disabled 变量是负载均衡阈值,表示进程是否超载; * 设置负载均衡阈值为每个进程最大连接数的八分之一减去空闲连接数; * 即当每个进程accept到的活动连接数超过最大连接数的7/8时, * ngx_accept_disabled 大于0,表示该进程处于负载过重; * ngx_accept_disabled = ngx_cycle->connection_n / 8 * - ngx_cycle->free_connection_n; * free_connection_n 初始值和connection_n相等,每次新来一个请求free的会减少,断开释放会++。 * 等于0说明当前进程还剩下八分之一的空闲连接,大于0说明已经小于八分之一再大就要超负载了 */ if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { /* 获取accept锁,多个worker只有一个能获取到,非阻塞的过程 ,拿到了就继续处理当前连接,没拿到就不处理当前连接 * 获取成功的话ngx_accept_mutex_held被置为1。拿到锁,意味着监听句柄被放到本进程的epoll中了,如果没有拿到锁,则监听句柄会被从epoll中取出。 * / if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } /* 拿到锁的话,置flag为NGX_POST_EVENTS,这意味着ngx_process_events函数中,任何事件都将延后处理, * 会把accept事件都放到ngx_posted_accept_events链表中,epollin|epollout事件都放到ngx_posted_events链表中 */ if (ngx_accept_mutex_held) { flags |= NGX_POST_EVENTS; } else { /* timer 主要是epoll_wait 的等超时时间 */ if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { /* accept_mutex_delay配置可以配,在ngx_event_process_init会初始化 */ timer = ngx_accept_mutex_delay; } } } } delta = ngx_current_msec; /* 处理网络事件 对应各个事件模块的事件处理函数 */ (void) ngx_process_events(cycle, timer, flags); delta = ngx_current_msec - delta; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta); /* 处理 accept延迟事件 */ ngx_event_process_posted(cycle, &ngx_posted_accept_events); if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } /* 处理定时器事件,循环遍历找红黑树最左边的节点时间和当前相比判断,大于0说明已经超时 */ if (delta) { ngx_event_expire_timers(); } /* 处理普通的事件 */ ngx_event_process_posted(cycle, &ngx_posted_events); }Level Triggered (LT) 水平触发 socket接收缓冲区不为空 有数据可读 读事件一直触发 socket发送缓冲区不满 可以继续写入数据 写事件一直触发 符合思维习惯,epoll_wait返回的事件就是socket的状态
Edge Triggered (ET) 边沿触发 socket的接收缓冲区状态变化时触发读事件,即空的接收缓冲区刚接收到数据时触发读事件 (读事件只有开始来的时候,会触发一次,所以一次需要) socket的发送缓冲区状态变化时触发写事件,即满的缓冲区刚空出空间时触发读事件
ET的读事件是 “读缓冲区从空到非空” 状态改变的时候 触发,提示你有数据可以读 写事件是 "写缓冲区从非空到空"状态改变的时候触发, 提示你 缓冲区可以写数据了
LT的读事件是 “只要读缓冲区有数据, 就一直触发读事件" 写事件是 ”只要写缓冲区没有满, 就一直触发写事件
所以一般采用epoll ET模式情况下,读事件的时候要一次性读完缓冲区里面的数据。 nginx监听采用的是LT模式,其他采用的是ET模式。水平模式相比边缘模式来说,频繁调用触发会产生更多的系统调用,开销相比自然就高了
/* 处理已准备就绪的事件 */ static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev, **queue; ngx_connection_t *c; /* NGX_TIMER_INFINITE == INFTIM */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); /* 调用epoll_wait在规定的timer时间内等待监控的事件准备就绪 */ events = epoll_wait(ep, event_list, (int) nevents, timer); /* 若出错,设置错误编码 */ err = (events == -1) ? ngx_errno : 0; /* * 若没有设置timer_resolution配置项时, * NGX_UPDATE_TIME 标志表示每次调用epoll_wait函数返回后需要更新时间; * 若设置timer_resolution配置项, * 则每隔timer_resolution配置项参数会设置ngx_event_timer_alarm为1,表示需要更新时间; */ if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { /* 更新时间,将时间缓存到一组全局变量中,方便程序高效获取事件 */ ngx_time_update(); } /* 处理epoll_wait的错误 */ if (err) { if (err == NGX_EINTR) { if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } /* * 若epoll_wait返回的事件数events为0,则有两种可能: * 1、超时返回,即时间超过timer; * 2、在限定的timer时间内返回,此时表示出错error返回; */ if (events == 0) { if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } /* 仅在多线程环境下有效 */ ngx_mutex_lock(ngx_posted_events_mutex); /* 遍历由epoll_wait返回的所有已准备就绪的事件,并处理这些事件 */ for (i = 0; i < events; i++) { /* * 获取与事件关联的连接对象; * 连接对象地址的最低位保存的是添加事件时设置的事件过期标志位; */ c = event_list[i].data.ptr; /* 获取事件过期标志位,即连接对象地址的最低位 */ instance = (uintptr_t) c & 1; /* 屏蔽连接对象的最低位,即获取连接对象的真正地址 */ c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); /* 获取读事件 */ rev = c->read; /* * 同一连接的读写事件的instance标志位是相同的; * 若fd描述符为-1,或连接对象读事件的instance标志位不相同,则判为过期事件; */ if (c->fd == -1 || rev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } /* 获取连接对象中已准备就绪的事件类型 */ revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:XD d:%p", c->fd, revents, event_list[i].data.ptr); /* 记录epoll_wait的错误返回状态 */ /* * EPOLLERR表示连接出错;EPOLLHUP表示收到RST报文; * 检测到上面这两种错误时,TCP连接中可能存在未读取的数据; */ if (revents & (EPOLLERR|EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:XD", c->fd, revents); } #if 0 if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "strange epoll_wait() events fd:%d ev:XD", c->fd, revents); } #endif /* * 若连接发生错误且未设置EPOLLIN、EPOLLOUT, * 则将EPOLLIN、EPOLLOUT添加到revents中; * 即在调用读写事件时能够处理连接的错误; */ if ((revents & (EPOLLERR|EPOLLHUP)) && (revents & (EPOLLIN|EPOLLOUT)) == 0) { /* * if the error events were returned without EPOLLIN or EPOLLOUT, * then add these flags to handle the events at least in one * active handler */ revents |= EPOLLIN|EPOLLOUT; } /* 连接有可读事件,且该读事件是active活跃的 */ if ((revents & EPOLLIN) && rev->active) { #if (NGX_HAVE_EPOLLRDHUP) /* EPOLLRDHUP表示连接对端关闭了读取端 */ if (revents & EPOLLRDHUP) { rev->pending_eof = 1; } #endif if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) { rev->posted_ready = 1; } else { /* 读事件已准备就绪 */ /* * 这里要区分active与ready: * active是指事件被添加到epoll对象的监控中, * 而ready表示被监控的事件已经准备就绪,即可以对其进程IO处理; */ rev->ready = 1; } /* * NGX_POST_EVENTS表示已准备就绪的事件需要延迟处理, * 根据accept标志位将事件加入到相应的队列中; */ if (flags & NGX_POST_EVENTS) { queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events); ngx_locked_post_event(rev, queue); } else { /* 若不延迟处理,则直接调用事件的处理函数 */ rev->handler(rev); } } /* 获取连接的写事件,写事件的处理逻辑过程与读事件类似 */ wev = c->write; /* 连接有可写事件,且该写事件是active活跃的 */ if ((revents & EPOLLOUT) && wev->active) { /* 检查写事件是否过期 */ if (c->fd == -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } if (flags & NGX_POST_THREAD_EVENTS) { wev->posted_ready = 1; } else { /* 写事件已准备就绪 */ wev->ready = 1; } /* * NGX_POST_EVENTS表示已准备就绪的事件需要延迟处理, * 根据accept标志位将事件加入到相应的队列中; */ if (flags & NGX_POST_EVENTS) { ngx_locked_post_event(wev, &ngx_posted_events); } else { /* 若不延迟处理,则直接调用事件的处理函数 */ wev->handler(wev); } } } ngx_mutex_unlock(ngx_posted_events_mutex); return NGX_OK; }函数整体逻辑主要是从红黑树中找出超时的事件进行处理,循环从树中找出最左边的节点和当前时间的差,比当前小说明已经超时开始处理
void ngx_event_expire_timers(void) { ngx_event_t *ev; ngx_rbtree_node_t *node, *root, *sentinel; sentinel = ngx_event_timer_rbtree.sentinel; /* 循环检查 */ for ( ;; ) { ngx_mutex_lock(ngx_event_timer_mutex); root = ngx_event_timer_rbtree.root; /* 若定时器红黑树为空,则直接返回,不做任何处理 */ if (root == sentinel) { return; } /* 找出定时器红黑树最左边的节点,即最小的节点,同时也是最有可能超时的事件对象 */ node = ngx_rbtree_min(root, sentinel); /* node->key <= ngx_current_time */ /* 若检查到的当前事件已超时 */ if ((ngx_msec_int_t) (node->key - ngx_current_msec) <= 0) { /* 获取超时的具体事件 */ ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer)); /* 下面是针对多线程 */ #if (NGX_THREADS) if (ngx_threaded && ngx_trylock(ev->lock) == 0) { ...... break; } #endif /* 将已超时事件对象从现有定时器红黑树中移除 */ ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer); ngx_mutex_unlock(ngx_event_timer_mutex); /* 设置事件的在定时器红黑树中的监控标志位 */ ev->timer_set = 0;/* 0表示不受监控 */ /* 多线程环境 */ #if (NGX_THREADS) if (ngx_threaded) { ev->posted_timedout = 1; ngx_post_event(ev, &ngx_posted_events); ngx_unlock(ev->lock); continue; } #endif /* 设置事件的超时标志位 */ ev->timedout = 1;/* 1表示已经超时 */ /* 调用已超时事件的处理函数对该事件进行处理 */ ev->handler(ev); continue; } break; } ngx_mutex_unlock(ngx_event_timer_mutex); }队列事件主要是accept事件队列ngx_posted_accept_events 和 普通事件队列ngx_posted_events
/* 函数主要逻辑是从队列中*/ void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted) { ngx_queue_t *q; ngx_event_t *ev; /* 判断队列是否为null */ while (!ngx_queue_empty(posted)) { q = ngx_queue_head(posted); ev = ngx_queue_data(q, ngx_event_t, queue); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", ev); /* 将对当前事件从队列中删除 */ ngx_delete_posted_event(ev); /* 处理当前事件的handler逻辑 */ ev->handler(ev); } }事件连接处理模块可以参考着篇文章:https://www.kancloud.cn/digest/understandingnginx/202603
参考资料: https://blog.csdn.net/russell_tao/article/details/7204260 http://yikun.github.io/2014/03/26/nginxevent/ https://www.kancloud.cn/digest/understandingnginx/202602