IO 多路复用 epoll
10 Nov 2021- Reactor 模式
- Epoll 介绍
epoll_create
函数epoll_ctl
函数- epoll_wait 函数
- LT(条件触发)与ET(边缘触发)
- go 语言版:实现 I/O 复用的 echo 服务端
Reactor 模式
使用同步 I/O 模型(以 epoll_wait 为例)实现 Reactor 模式的工作流程是:
1) 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件
2)主线程调用 epoll_wait 等待 socket 上有数据可读
3)当 socket 有数据可读时,epoll_wait 通知主线程。主线程则将 socket 可读事件放入请求队列。
4) 睡眠在请求队列上的某个工作线程被唤醒,它从 socket 读取数据,并处理客户请求,然后往 epoll 内核事件表中注册该 socket 上的写就绪事件
5) 主线程调用 epoll_wait 等待 socket 可写
6) 当 socket 可写时,epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列
7) 睡眠在请求队列上的某个工作线程被唤醒,它往 socket 上写入服务器处理客户端请求的结果
Epoll 介绍
epoll 是 Linux 特有的 I/O 复用函数。在实现和使用上和 select、poll 有很大差异。
基于 select I/O 复用技术速度慢的原因:
- 调用 select 后需要循环检查文件描述符
- 每次调用 select 都需要向该函数重新传递监视对象信息
epoll 函数具有如下优点:
- 无需编写以监视状态变化为目的的针对所有文件描述符的循环语句
- 调用对应于 select 函数的 epoll_wait 函数时,无需每次传递监视对象信息
epoll 使用一组函数来完成任务,包括:
epoll_create
创建保存 epoll 文件描述符的空间epoll_ctl
向空间注册并注销文件描述符epoll_wait
与 select 函数类似,等待文件描述符发生变化
epoll_create
函数
调用 epoll_create
函数时创建的文件描述符保存空间称为 “epoll例程”
epoll_create
函数创建的资源与套接字相同,也由操作系统管理
#include <sys/epoll.h>
int epoll_create(int size);
// 成功时返回 epol1 文件描述符,失败时返回-1
size
epoll 实例的大小, 新版本可以忽略 size 参数,内核会根据情况调整 poll 例程的大小
epoll_ctl
函数
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd , struct epol1_event * event)j
// "成功时返回 ,失败时返回 -1 并设置 errno
epfd
用于注册监视对象的 epoll 例程的文件描述符op
用于指定监视对象的添加、删除或更改等操作fd
需要注册的监视对象文件描述符。event
监视对象的事件
op
操作类型有如下 3 种:
EPOLL_CTL_ADD
: 将文件描述符注册到 epoll 例程EPOLL_CTL_DEL
: 从 epoll 例程中删除文件描述符EPOLL_CTL_MOD
: 更改注册的文件描述符的关注事件发生情况
event
指定监视对象的事件, epoll_event
定义如下:
struct epoll_event
{
__uint32_t events; /* epoll 事件 */
epoll_data_t data; /* 用户数据 */
};
events 中可以保存的常量及所指的事件类型。
EPOLLIN
: 需要读取数据的情况EPOLLOUT
: 输出缓冲为空,可以立即发送数据的情况EPOLLPRI
: 收到 OOB 数据的情况EPOLLRDHUP
: 断开连接或半关闭的情况,这在边缘触发方式下非常有用EPOLLERR
: 发生错误的情况EPOLLET
: 以边缘触发的方式得到事件通知EPOLLONESfIOT
: 发生一次事件后,相应文件描述符不再收到事件通知。因此需要向epoll_ctl
函数的第二个参数传递EPOLL_CTL_MOD
,再次设置事件
epoll_wait 函数
epoll 函数中系统调用的主要接口是 epoll_wait
函数。它在一段超时时间内等待一组文件描述符上的事件。
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, ïnt maxevents, int timeout)
// 成功时返回发生事件的文件描述符个数,失败时返回 -1并设置 errno
epfd
表示事件发生监视范围的 epoll 例程的文件描述符events
保存发生事件的文件描述符集合的结构体地址值maxevents
第二个参数中可以保存的最大事件数timeout
1/1000 秒为单位的等待时间,传递 -1 时,一直等待直到发生事件
LT(条件触发)与ET(边缘触发)
epoll 对文件描述符的操作有两种模式:条件触发(level trigger)
和 边缘触发(edge trigger)
。LT 模式是默认模式,LT 模式与ET模式的区别如下:
-
条件触发:当 epoll_wait 检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用 epoll_wait 时,会再次响应应用程序并通知此事件。
-
边缘触发:当 epoll_wait 检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用 epoll_wait 时,不会再次响应应用程序并通知此事件。
go 语言版:实现 I/O 复用的 echo 服务端
服务端: echo_server.go
package main
import (
"fmt"
"os"
"strconv"
"syscall"
"golang.org/x/sys/unix"
)
var (
MAXMSGSIZE = 1024
LISTENBACKLOG = 15
ADDR = [4]byte{0, 0, 0, 0}
)
func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stdout, "usage: %s <port>\n", os.Args[0])
os.Exit(1)
}
serverFD, err := unix.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
checkError("Socket", err)
defer unix.Close(serverFD)
fmt.Fprintf(os.Stdout, "serverFD: %d\n", serverFD)
// 设置 SO_REUSEADDR 选项,套接字可以在 TimeWait 的状态下的端口号被新套接字使用
err = unix.SetsockoptInt(serverFD, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
checkError("SetsockoptInt", err)
port, _ := strconv.Atoi(os.Args[1])
serverAddr := &unix.SockaddrInet4{
Port: port,
Addr: ADDR,
}
// bind
err = unix.Bind(serverFD, serverAddr)
checkError("Bind", err)
fmt.Fprintf(os.Stdout, "Server: Bound to addr: %d, port: %d\n", serverAddr.Addr, serverAddr.Port)
// listen
err = unix.Listen(serverFD, LISTENBACKLOG)
checkError("Listen", err)
efd, err := unix.EpollCreate(100)
checkError("EpollCreate1", err)
defer unix.Close(efd)
ev := unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(serverFD)}
err = unix.EpollCtl(efd, unix.EPOLL_CTL_ADD, serverFD, &ev)
checkError("EpollCtl", err)
events := make([]unix.EpollEvent, 128)
fmt.Fprintf(os.Stdout, "serverFD: %d\n", serverFD)
for {
evenCnt, err := unix.EpollWait(efd, events, -1)
if evenCnt == -1 {
fmt.Fprintf(os.Stdout, "epoll_wait() %s \n", err.Error())
break
}
fmt.Fprintf(os.Stdout, "evenCnt: %d\n", evenCnt)
for i := 0; i < evenCnt; i++ {
fd := int(events[i].Fd)
fmt.Fprintf(os.Stdout, "current fd: %d\n", fd)
if fd == serverFD {
acceptedFD, _, err := unix.Accept(serverFD)
checkError("Accept", err)
epollAdd(efd, acceptedFD)
fmt.Fprintf(os.Stdout, "connected client: %d \n", acceptedFD)
} else {
buff := make([]byte, MAXMSGSIZE)
sizeMsg, clientAddr, err := unix.Recvfrom(fd, buff, 0)
if err != nil {
fmt.Println("read error:", err)
epollDel(efd, fd)
continue
}
response := buff[:sizeMsg]
if sizeMsg > 0 {
fmt.Fprintf(os.Stdout, "Read: %s\n", string(response))
err := unix.Sendmsg(fd, response, nil, clientAddr, unix.MSG_DONTWAIT) // 非阻塞 IO
checkError("Write"+strconv.Itoa(fd), err)
fmt.Fprintf(os.Stdout, "write: %s\n", response)
} else {
epollDel(efd, fd)
}
}
}
}
os.Exit(0)
}
func checkError(name string, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: Fatal error: %s", name, err.Error())
os.Exit(1)
}
}
func epollAdd(efd int, fd int) {
ev := unix.EpollEvent{Events: unix.EPOLLIN | unix.EPOLLET, Fd: int32(fd)}
err := unix.EpollCtl(efd, unix.EPOLL_CTL_ADD, fd, &ev)
if err != nil {
fmt.Fprintf(os.Stderr, "epollAdd error:%s \n", err.Error())
}
}
func epollDel(efd, fd int) {
err := unix.EpollCtl(efd, unix.EPOLL_CTL_DEL, fd, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "epollDel error:%s \n", err.Error())
}
unix.Close(fd)
fmt.Fprintf(os.Stdout, "closed client: %d \n", fd)
}
客户端: echo_client.go
package main
import (
"bufio"
"fmt"
"net"
"os"
)
const (
BUF_SIZE = 1024
)
var input = make([]byte, BUF_SIZE)
func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "usage: %s host:port\n", os.Args[0])
os.Exit(1)
}
tcpAddr, err := net.ResolveTCPAddr("tcp4", os.Args[1])
checkError("ResolveTCPAddr", err)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
checkError("DialTCP", err)
} else {
fmt.Println("Connected........... ")
}
defer conn.Close()
buf := make([]byte, BUF_SIZE)
go writeRoutine(conn, buf)
readRoutine(conn, buf)
os.Exit(0)
}
func readRoutine(conn net.Conn, buf []byte) {
for {
n, err := conn.Read(buf)
checkError("readRoutine.read", err)
if n <= 0 {
return
}
fmt.Printf("Message from server: %s\n", buf[:n])
}
}
func writeRoutine(conn net.Conn, buf []byte) {
reader := bufio.NewReader(os.Stdin)
for {
n, err := reader.Read(buf)
checkError("reader.Read", err)
if n <= 0 {
continue
}
msg := string(buf[:n])
if msg == "q\n" || msg == "Q\n" {
break
}
n1, err := conn.Write(buf)
checkError("conn.Write", err)
if n1 <= 0 {
continue
}
}
}
func checkError(name string, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: Fatal error: %s", name, err.Error())
os.Exit(1)
}
}
在非 linux 系统测试
因为 epoll 是属于 linux 内核提供的调用,所以我们不能在非 linux 系统中直接运行,需要借助 docker
Dockerfile 文件
FROM golang:1.17.5 as BUILD
# 启用go module
ENV GO111MODULE=on \
GOPROXY=https://goproxy.cn,direct
# /go/app
WORKDIR app
COPY . .
RUN go get -d -v ./...
builder docker 镜像
docker build -t mygoapp .
运行服务端
docker run --rm -it -p 8080:8080 -v $(pwd):/go/app mygoapp /bin/bash -c 'go run echo_server.go 8080'
运行客户端
go run echo_client.go localhost:8080
客户端不需要 linux,直接可以运行,可以多开几个进行测试