Fantacity

Stand Alone Complex

一个基于事件驱动(event-driven)的回显服务器实现

近年来,Node.js非常活跃,为什么会有这么多人用Node.js呢,相比传统的webserver服务模式,node.js的优势在哪里呢?
Node.js是单进程单线程,基于事件驱动(event-driven)的服务器框架,它的性能非常高。本文并不打算讨论Node.js,这里是为了引出事件驱动这一概念。

事件驱动(event-driven)

什么是事件驱动?先来看一个在生活中很常见的例子,去肯德基点餐。

在传统的webserver服务模式中,我们到了柜台前点餐,你告诉服务员你需要汉堡,薯条,可乐,然后等在那里直到服务员把食物准备好给你,在这之前服务员不能接待下一个顾客。如果想要加快接待客人的速度,那么就增加更多的服务员。
当然,我们知道实际上肯德基并不是这样工作的。他们其实就是给予事件驱动方式,这样的效率更高。只要你把要点的食物告诉服务员,付完款之后,你就站在一边,而服务员已经开始接待下一个客户了。在一些餐馆中,他们甚至会给你一个号码,如果你的食物准备好了,就呼叫你的号码去前台取。这里关键的一点就是,你没有阻塞下一个客户的订餐请求。你订的食物做好的事件会导致某个人的某个动作(服务员喊你的订单号码去取食物)。

第一个方式对应到程序中就是使用线程池+阻塞请求的方式实现的,对于每一个连接请求,服务器从先线程池中取出一个线程来处理请求,这个线程在读取完客户端发送过来的数据之前只能一直阻塞着,而不能做别的事情。
而第二种方式对应到程序中就是基于事件驱动+非阻塞的方式实现的,服务器为每一个socket注册一个事件回调函数,并在一个循环内不断观察每一个socket的状态,当这个socket状态改变,就调用相应的回调函数去处理。这里关键的一点是每个socket描述符都是non-blocking的,如果描述服可读(可写)就读取(写入)数据,否则马上返回,所以服务器可以使I/0效率最大化。

另外,在linux下,可以用epoll来轻松实现事件驱动。

epoll

epoll是在linux2.6 的时候引进的一个函数,epoll是linux-only函数,在别的UNIX-like也有类似的函数kqueue。
epoll提供了和poll(2)和poll(2)类似的功能,都可以同时观察多个文件描述符,并返回其中可以执行I/O的文件描述符:

  • select (2)每次最多只能监听FD_SETSIZE数量文件描述符,这个值通常用libc的运行时决定。据我了解select底层是基于数组实现的,所以有这一限制。
  • poll(2)没有文件描述符数量的限制,据我了解其底层是用链表实现的,但是和select(2)一样,它们的时间复杂度都是O(n),这是非常慢的。

epoll没有上述限制,据我了解其底层是通过回调函数的机制实现的,所以时间复杂度是O(1)。具体关于epoll的使用方式以及它和poll 与 select区别下次在另一篇博客详细分析一下,这里不再赘述。

一个基于事件驱动的回显服务器实现

这里我简单实现了一个基于事件驱动的回显服务器,主要参考的是zaver的代码实现的。

首先我们为服务端设置SIGPIPE信号的动作:

/* Install signal handle for SIGPIPE
* when a fd is clised by remote, writing to rhis fd will cause system send
* SIGPIPE to this process, which default action is exit the program*/
struct sigaction sa;
memset((void *)&sa, 0, sizeof(sa));
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
if (sigaction(SIGPIPE, &sa, NULL) < 0)
{
log_err("install signal handler for SIGPIPE failed");
exit(EXIT_FAILURE);
}

关于SIGPIPE信号,我们可以参看UNP书第三版的5.13节:当一进程向某个已收到RST的套接字执行写操作时,内核向该进程发送一个SIGPIPE信号。该信号的默认行为是终止进程,因此进程必须捕获它以免不情愿地被终止。不论该进程是捕获了该信号并从其信号处理函数返回,还是简单地忽略该信号,写操作都将返回EPIPE错误。
我们的程序中设置SIGPIPE信号的动作为忽略该信号,而该信号的默认动作是终止进程。由于我们并不知道客户端会在什么情况下突然断开连接,如果我们向这个已经断开连接中写入数据,那么第一次会收到客户端发送的RST,但是在应用层并不知道这个RST,于是第二次写数据的时候就被莫名其妙终止进程了。
在用webbench测试web服务器性能的时候就遇到过这个问题,server总是莫名其妙挂掉,其实就是因为这个原因。

接着创建了一个服务器监听套接字,并把它设为非阻塞模式:

int listen_fd;
struct sockaddr_in clientaddr;
socklen_t clientlen = 1;
memset((void *)&clientaddr, 0, sizeof(clientaddr));
listen_fd = open_listenfd(8080); /* listen on port 8080 */
rc = make_socket_non_blocking(listen_fd); /* set file descriptor non-blocking */

创建epoll文件描述符,并监听服务器套接字listen_fd:

/* Ceate epoll and listen to epoll*/
int epoll_fd = ts_epoll_create(0);
struct epoll_event event;
ts_message_t *msg = (ts_message_t *)malloc(sizeof(ts_message_t));
ts_init_message_t(msg, listen_fd, epoll_fd);
event.data.ptr = (void *)msg;
event.events = EPOLLIN | EPOLLET;
ts_epoll_add(epoll_fd, listen_fd, &event);

接下来就是关键事件循环,也就是epoll wait loop:

/* epoll_wait loop*/
int i, n, fd;
while (1)
{
n = ts_epoll_wait(epoll_fd, events, MAXEVENTS, 0);
for (i = 0; i < n; ++i)
{
ts_message_t *msg = (ts_message_t *)events[i].data.ptr;
fd = msg->fd;
if (listen_fd == fd)
{
/* We have one or more incoming connections */
int infd;
while (1)
{
infd = accept(listen_fd, (struct sockaddr *)&clientaddr, &clientlen);
if (infd < 0) /* error */
{
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
/* We have processed all incoming connections
* more details: https://banu.com/blog/2/how-to-use-epoll-a-complete-example-in-c */
break;
}
else
{
log_err("accpet");
break;
}
}
rc = make_socket_non_blocking(infd);
check(rc == 0, "make_socket_non_blocking");
log_info("new connection fd %d", infd);
ts_message_t *msg = (ts_message_t *)malloc(sizeof(ts_message_t));
if (msg == NULL)
{
log_err("malloc ts_message_t");
break;
}
ts_init_message_t(msg, infd, epoll_fd);
event.data.ptr = (void *)msg;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
ts_epoll_add(epoll_fd, infd, &event);
} // end of while accpet
}
else
{
/* We have one or more clients' messages to read */
/*
* EPOLLHUP:
* Stream socket peer closed connection, or shut down writing half
* of connection. (This flag is especially useful for writing simple
* code to detect peer shutdown when using Edge Triggered monitoring
* */
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
log_err("epoll error fd: %d", fd);
close(fd);
continue;
}
//log_info("new data from fd %d:", fd);
ts_handle_message(msg);
}
} // end of for
} // end of while (1)

这里要注意的是在处理listen_fd文件描述符的时候,需要多次accept直到到errno == EAGAIN或者errno == EWOULDBLOCK,这样才能把当前所有的connect请求都accept。

epoll的事件提醒有有两种模式,一种叫level triggered(水平触发),另一种叫edge triggerd(边缘触发)。

  • LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理(完)该事件。下次调用epoll_wait时,会再次响应应用程序通知此事件。
  • EG模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该时间。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

举个具体的例子:如果fd上有2kb的数据,应用程序只读取了1kb,ET就不会在下一次epoll_wait的时候返回,读完以后又有心数据才返回。而ET只要有数据没有读完就会一直返回这个fd。所以在这里我们使用ET模式,循环read知道EAGAIN。

在客户的文件描述符可读的时候,我们调用ts_handle_message()来处理数据,该函数的主要作用就是循环读入客户的数据到缓冲区,直到errno == EAGAIN。有一种情况是可能一次不能把客户的所有数据都读完(在这里客户每次发送一行数据,但是服务器并不一定能一次完整接收到一行数据),这种时候就需要缓冲区把数据先暂时存储起来,直到读取到一个\r\n之后,再回显在标准输出上,并写回给客户文件描述符。

一个完整的实现请看这里;