IO 多路复用 select
05 Nov 2021I/O 多路复用介绍
IO 多路复用是指,将多个 fd 加入到 IO 多路复用函数(select/poll/epol)中,等待任何一个变为可读/可写,如果有就绪的 fd 可操作,操作系统会唤醒等待该 fd 的进程,进程从阻塞状态返回,开始进行业务处理。
Select
select 函数
#include <sys/select.h>
#include <sys/time . h>
•
int select(int maxfd, fd_set * readset, fd_set * writeset, fd_set * exceptset, const struct
timeval * timeout);
// 成功时返回大于 0 的值,失败时返回 -1
maxfd
监视对象文件描述符数量。readset
将所有关注”是否存在待读取数据”的文件描述符 册到fdjet型变量,并传递其地址值writeset
将所有关注”是否可传输无阻塞数据”的文件描述符注册到 fd_set 型变量,并传递其地址值。exceptset
将所有关注”是否发生异常”的文件描述符注册到 fd_set 型变量,并传递其地址值timeout
调用select 函数后,为防止陷入无限阻塞的状态,传递超时( time-out )信息。- 返回值, 发生错误时返回 -1,超时返回时返回 0。因发生关注的事件返回时,返回大于 0 的值,该值是发生事件的文件描述符数。
设置文件描述符
select 函数可以监视多个文件描述符,按照(可读、可写、异常)三种监视项,使用 fd_set
数组执行此项操作
fd_set
数组是存有 0 和 1 的位数组,如果设置为 1 则表示该位是监视对象
在 fd_set
变量中注册或更改值的操作都由下列宏完成:
FD_ZERO(fd_set * fdset)
: 将 fd_set
变量的所有位初始
FD_SET(int fd, fd_set* fdset)
: 在参数 fdset 指向的变量中注册文件描述符 fd 的信息
FD_CLR(int fd, fd_set * fdset)
: 从参数 fdset 指向的变量中清除文件描述符 fd 信息
FD_ISSET(int fd, fd_set *fdset)
: 若参数 fdset 指向的 变量中包含文件描述符fd 信息,则返回 true
调用 select 函数后查看结果
select 函数调用完成后,向其传递的 fd_Set
变量中将发生变化。没有发生变化的文件描述符,原来为 1 的所有位均变为 0
因 select 会重置 fd_set
数组,所以每次调用 select
前,需要拷贝一份副本给 select
重新传入
select 有如下的缺点:
- 最大并发数限制, select 是不断轮询去监听的 socket,socket 个数有限制,一般为 1024 个,使用 32 个整数的 32 位,即 32*32=1024 来标识 fd
- 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
- 性能衰减严重:每次调用 select 都需要在内核遍历扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降
go 语言版:使用 select
实现 echo 服务
服务端
echo_server.go
package main
import (
"fmt"
"log"
"os"
"strconv"
"syscall"
"golang.org/x/sys/unix"
)
var (
MAXMSGSIZE = 1024
LISTENBACKLOG = 15
ADDR = [4]byte{127, 0, 0, 1}
)
func main() {
if len(os.Args) != 2 {
fmt.Printf("usage: %s <port>\n", os.Args[0])
os.Exit(1)
}
serverFD, err := unix.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
checkError("Socket", err)
fmt.Printf("serverFD: %d\n", serverFD)
// 设置 SO_REUSEADDR 选项,套接字可以在 TimeWait 的状态下的端口号被新套接字使用
err = unix.SetsockoptInt(serverFD, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
checkError("SetsockoptInt", err)
port, _ := strconv.Atoi(os.Args[1])
serverAddr := &unix.SockaddrInet4{
Port: port,
Addr: ADDR,
}
// bind
err = unix.Bind(serverFD, serverAddr)
checkError("Bind", err)
fmt.Printf("Server: Bound to addr: %d, port: %d\n", serverAddr.Addr, serverAddr.Port)
// listen
err = unix.Listen(serverFD, LISTENBACKLOG)
checkError("Listen", err)
timeout := &unix.Timeval{Sec: 5, Usec: 5000}
reads := unix.FdSet{}
var fdMax int
reads.Zero()
reads.Set(serverFD)
fdMax = serverFD
fmt.Printf("serverFD: %d\n", serverFD)
for {
// 因 select 会重置 FdSet 数组,所以每次调用 select 前,需要拷贝一份副本给 select 重新使用
copyReads := reads
fdNum, err := unix.Select(fdMax+1, ©Reads, nil, nil, timeout)
if fdNum == -1 {
fmt.Println("fdNum: -1\n")
break
} else if fdNum == 0 {
fmt.Println("fdNum: 0\n")
continue
}
if err != nil {
log.Fatal("Select: ", err)
}
for fd := 0; fd < fdMax+1; fd++ {
if copyReads.IsSet(fd) {
if fd == serverFD {
acceptedFD, _, err := unix.Accept(serverFD)
checkError("Accept", err)
fmt.Printf("acceptedFD:%d\n", acceptedFD)
reads.Set(acceptedFD)
if acceptedFD > fdMax {
fdMax = acceptedFD
}
} else {
buff := make([]byte, MAXMSGSIZE)
sizeMsg, clientAddr, err := unix.Recvfrom(fd, buff, 0)
if err != nil {
fmt.Println("read error:", err)
reads.Clear(fd)
unix.Close(fd)
continue
}
response := buff[:sizeMsg]
if sizeMsg > 0 {
fmt.Printf("Read: %s", string(response))
err := unix.Sendmsg(fd, response, nil, clientAddr, unix.MSG_DONTWAIT) // 非阻塞 IO
checkError("Write"+strconv.Itoa(fd), err)
fmt.Printf("write: %s", response)
} else {
fmt.Println(reads)
reads.Clear(fd)
unix.Close(fd)
fmt.Printf("closed client: %d \n", fd)
}
}
}
}
}
unix.Close(serverFD)
os.Exit(0)
}
func checkError(name string, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: Fatal error: %s", name, err.Error())
os.Exit(1)
}
}
客户端
echo_client.go
package main
import (
"bufio"
"fmt"
"net"
"os"
)
const (
BUF_SIZE = 1024
)
var input = make([]byte, BUF_SIZE)
func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "usage: %s host:port\n", os.Args[0])
os.Exit(1)
}
tcpAddr, err := net.ResolveTCPAddr("tcp4", os.Args[1])
checkError("ResolveTCPAddr", err)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
checkError("DialTCP", err)
} else {
fmt.Println("Connected........... ")
}
defer conn.Close()
buf := make([]byte, BUF_SIZE)
go writeRoutine(conn, buf)
readRoutine(conn, buf)
os.Exit(0)
}
func readRoutine(conn net.Conn, buf []byte) {
for {
n, err := conn.Read(buf)
checkError("readRoutine.read", err)
if n <= 0 {
return
}
fmt.Printf("Message from server: %s\n", buf[:n])
}
}
func writeRoutine(conn net.Conn, buf []byte) {
reader := bufio.NewReader(os.Stdin)
for {
n, err := reader.Read(buf)
checkError("reader.Read", err)
if n <= 0 {
continue
}
msg := string(buf[:n])
if msg == "q\n" || msg == "Q\n" {
break
}
n1, err := conn.Write(buf)
checkError("conn.Write", err)
if n1 <= 0 {
continue
}
}
}
func checkError(name string, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: Fatal error: %s", name, err.Error())
os.Exit(1)
}
}
运行服务端
$ go run echo_server.go 8080
serverFD: 3
Server: Bound to addr: [127 0 0 1], port: 8080
serverFD: 3
acceptedFD:4
Read: Hi
write: Hi
运行客户端
$ go run echo_client.go localhost:8080
Connected...........
Hi
Message from server: Hi
我们可以启动多个客户端,因为 select 支持单一线程同时监听多个文件描述符(I/O 事件)