IO 多路复用 epoll

Reactor 模式

使用同步 I/O 模型(以 epoll_wait 为例)实现 Reactor 模式的工作流程是:

1) 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件

2)主线程调用 epoll_wait 等待 socket 上有数据可读

3)当 socket 有数据可读时,epoll_wait 通知主线程。主线程则将 socket 可读事件放入请求队列。

4) 睡眠在请求队列上的某个工作线程被唤醒,它从 socket 读取数据,并处理客户请求,然后往 epoll 内核事件表中注册该 socket 上的写就绪事件

5) 主线程调用 epoll_wait 等待 socket 可写

6) 当 socket 可写时,epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列

7) 睡眠在请求队列上的某个工作线程被唤醒,它往 socket 上写入服务器处理客户端请求的结果

Epoll 介绍

epoll 是 Linux 特有的 I/O 复用函数。在实现和使用上和 select、poll 有很大差异。

基于 select I/O 复用技术速度慢的原因:

epoll 函数具有如下优点:

epoll 使用一组函数来完成任务,包括:

epoll_create 函数

调用 epoll_create 函数时创建的文件描述符保存空间称为 “epoll例程” epoll_create 函数创建的资源与套接字相同,也由操作系统管理

#include <sys/epoll.h> 
int epoll_create(int size); 
// 成功时返回 epol1 文件描述符,失败时返回-1

epoll_ctl 函数

#include <sys/epoll.h> 
int epoll_ctl(int epfd, int op, int fd , struct epol1_event * event)j 
// "成功时返回 ,失败时返回 -1 并设置 errno

op 操作类型有如下 3 种:

event 指定监视对象的事件, epoll_event 定义如下:

struct epoll_event 
{ 
	__uint32_t events; /* epoll 事件 */
	epoll_data_t data; /* 用户数据 */
};

events 中可以保存的常量及所指的事件类型。

epoll_wait 函数

epoll 函数中系统调用的主要接口是 epoll_wait 函数。它在一段超时时间内等待一组文件描述符上的事件。

#include <sys/epoll.h> 
int epoll_wait(int epfd, struct epoll_event * events, ïnt maxevents, int timeout)
// 成功时返回发生事件的文件描述符个数,失败时返回  -1并设置 errno

LT(条件触发)与ET(边缘触发)

epoll 对文件描述符的操作有两种模式:条件触发(level trigger)边缘触发(edge trigger)。LT 模式是默认模式,LT 模式与ET模式的区别如下:

go 语言版:实现 I/O 复用的 echo 服务端

服务端: echo_server.go

package main

import (
	"fmt"
	"os"
	"strconv"
	"syscall"

	"golang.org/x/sys/unix"
)

var (
	MAXMSGSIZE    = 1024
	LISTENBACKLOG = 15
	ADDR          = [4]byte{0, 0, 0, 0}
)

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stdout, "usage: %s <port>\n", os.Args[0])
		os.Exit(1)
	}

	serverFD, err := unix.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
	checkError("Socket", err)
	defer unix.Close(serverFD)
	fmt.Fprintf(os.Stdout, "serverFD: %d\n", serverFD)

	// 设置 SO_REUSEADDR 选项,套接字可以在 TimeWait 的状态下的端口号被新套接字使用
	err = unix.SetsockoptInt(serverFD, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
	checkError("SetsockoptInt", err)

	port, _ := strconv.Atoi(os.Args[1])
	serverAddr := &unix.SockaddrInet4{
		Port: port,
		Addr: ADDR,
	}

	// bind
	err = unix.Bind(serverFD, serverAddr)
	checkError("Bind", err)

	fmt.Fprintf(os.Stdout, "Server: Bound to addr: %d, port: %d\n", serverAddr.Addr, serverAddr.Port)

	// listen
	err = unix.Listen(serverFD, LISTENBACKLOG)
	checkError("Listen", err)

	efd, err := unix.EpollCreate(100)
	checkError("EpollCreate1", err)
	defer unix.Close(efd)

	ev := unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(serverFD)}
	err = unix.EpollCtl(efd, unix.EPOLL_CTL_ADD, serverFD, &ev)
	checkError("EpollCtl", err)

	events := make([]unix.EpollEvent, 128)
	fmt.Fprintf(os.Stdout, "serverFD: %d\n", serverFD)
	for {
		evenCnt, err := unix.EpollWait(efd, events, -1)
		if evenCnt == -1 {
			fmt.Fprintf(os.Stdout, "epoll_wait() %s \n", err.Error())
			break
		}
		fmt.Fprintf(os.Stdout, "evenCnt: %d\n", evenCnt)
		for i := 0; i < evenCnt; i++ {
			fd := int(events[i].Fd)
			fmt.Fprintf(os.Stdout, "current fd: %d\n", fd)
			if fd == serverFD {
				acceptedFD, _, err := unix.Accept(serverFD)
				checkError("Accept", err)
				epollAdd(efd, acceptedFD)
				fmt.Fprintf(os.Stdout, "connected client: %d \n", acceptedFD)
			} else {
				buff := make([]byte, MAXMSGSIZE)
				sizeMsg, clientAddr, err := unix.Recvfrom(fd, buff, 0)
				if err != nil {
					fmt.Println("read error:", err)
					epollDel(efd, fd)
					continue
				}
				response := buff[:sizeMsg]
				if sizeMsg > 0 {
					fmt.Fprintf(os.Stdout, "Read: %s\n", string(response))
					err := unix.Sendmsg(fd, response, nil, clientAddr, unix.MSG_DONTWAIT) // 非阻塞 IO
					checkError("Write"+strconv.Itoa(fd), err)
					fmt.Fprintf(os.Stdout, "write: %s\n", response)
				} else {
					epollDel(efd, fd)
				}
			}
		}
	}
	os.Exit(0)
}

func checkError(name string, err error) {
	if err != nil {
		fmt.Fprintf(os.Stderr, "%s: Fatal error: %s", name, err.Error())
		os.Exit(1)
	}
}

func epollAdd(efd int, fd int) {
	ev := unix.EpollEvent{Events: unix.EPOLLIN | unix.EPOLLET, Fd: int32(fd)}
	err := unix.EpollCtl(efd, unix.EPOLL_CTL_ADD, fd, &ev)
	if err != nil {
		fmt.Fprintf(os.Stderr, "epollAdd error:%s \n", err.Error())
	}

}

func epollDel(efd, fd int) {
	err := unix.EpollCtl(efd, unix.EPOLL_CTL_DEL, fd, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "epollDel error:%s \n", err.Error())
	}
	unix.Close(fd)
	fmt.Fprintf(os.Stdout, "closed client: %d \n", fd)
}

客户端: echo_client.go

package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
)

const (
	BUF_SIZE = 1024
)

var input = make([]byte, BUF_SIZE)

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "usage: %s host:port\n", os.Args[0])
		os.Exit(1)
	}

	tcpAddr, err := net.ResolveTCPAddr("tcp4", os.Args[1])
	checkError("ResolveTCPAddr", err)

	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		checkError("DialTCP", err)
	} else {
		fmt.Println("Connected........... ")
	}
	defer conn.Close()
	buf := make([]byte, BUF_SIZE)
	go writeRoutine(conn, buf)
	readRoutine(conn, buf)
	os.Exit(0)
}

func readRoutine(conn net.Conn, buf []byte) {
	for {
		n, err := conn.Read(buf)
		checkError("readRoutine.read", err)
		if n <= 0 {
			return
		}
		fmt.Printf("Message from server: %s\n", buf[:n])
	}
}

func writeRoutine(conn net.Conn, buf []byte) {
	reader := bufio.NewReader(os.Stdin)
	for {
		n, err := reader.Read(buf)
		checkError("reader.Read", err)
		if n <= 0 {
			continue
		}
		msg := string(buf[:n])
		if msg == "q\n" || msg == "Q\n" {
			break
		}
		n1, err := conn.Write(buf)
		checkError("conn.Write", err)
		if n1 <= 0 {
			continue
		}

	}
}
func checkError(name string, err error) {
	if err != nil {
		fmt.Fprintf(os.Stderr, "%s: Fatal error: %s", name, err.Error())
		os.Exit(1)
	}
}

在非 linux 系统测试

因为 epoll 是属于 linux 内核提供的调用,所以我们不能在非 linux 系统中直接运行,需要借助 docker

Dockerfile 文件

FROM golang:1.17.5 as BUILD

# 启用go module
ENV GO111MODULE=on \
    GOPROXY=https://goproxy.cn,direct

# /go/app
WORKDIR app 
COPY . .
RUN go get -d -v ./...

builder docker 镜像

docker build -t mygoapp .

运行服务端

 docker run --rm -it -p 8080:8080  -v $(pwd):/go/app mygoapp /bin/bash -c 'go run echo_server.go 8080'

运行客户端

go run echo_client.go localhost:8080

客户端不需要 linux,直接可以运行,可以多开几个进行测试