IO 多路复用 select

I/O 多路复用介绍

IO 多路复用是指,将多个 fd 加入到 IO 多路复用函数(select/poll/epol)中,等待任何一个变为可读/可写,如果有就绪的 fd 可操作,操作系统会唤醒等待该 fd 的进程,进程从阻塞状态返回,开始进行业务处理。

Select

select 函数

#include <sys/select.h> 
#include <sys/time . h> 
 
int select(int maxfd, fd_set * readset, fd_set * writeset, fd_set * exceptset, const struct 
timeval * timeout); 
// 成功时返回大于 0 的值,失败时返回 -1

设置文件描述符

select 函数可以监视多个文件描述符,按照(可读、可写、异常)三种监视项,使用 fd_set数组执行此项操作

fd_set数组是存有 0 和 1 的位数组,如果设置为 1 则表示该位是监视对象

fd_set 变量中注册或更改值的操作都由下列宏完成:

FD_ZERO(fd_set * fdset) : 将 fd_set 变量的所有位初始 FD_SET(int fd, fd_set* fdset): 在参数 fdset 指向的变量中注册文件描述符 fd 的信息 FD_CLR(int fd, fd_set * fdset): 从参数 fdset 指向的变量中清除文件描述符 fd 信息 FD_ISSET(int fd, fd_set *fdset): 若参数 fdset 指向的 变量中包含文件描述符fd 信息,则返回 true

调用 select 函数后查看结果

select 函数调用完成后,向其传递的 fd_Set 变量中将发生变化。没有发生变化的文件描述符,原来为 1 的所有位均变为 0 因 select 会重置 fd_set 数组,所以每次调用 select 前,需要拷贝一份副本给 select 重新传入

select 有如下的缺点:

go 语言版:使用 select 实现 echo 服务

服务端

echo_server.go

package main

import (
	"fmt"
	"log"
	"os"
	"strconv"
	"syscall"

	"golang.org/x/sys/unix"
)

var (
	MAXMSGSIZE    = 1024
	LISTENBACKLOG = 15
	ADDR          = [4]byte{127, 0, 0, 1}
)

func main() {
	if len(os.Args) != 2 {
		fmt.Printf("usage: %s <port>\n", os.Args[0])
		os.Exit(1)
	}

	serverFD, err := unix.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
	checkError("Socket", err)
	fmt.Printf("serverFD: %d\n", serverFD)

	// 设置 SO_REUSEADDR 选项,套接字可以在 TimeWait 的状态下的端口号被新套接字使用
	err = unix.SetsockoptInt(serverFD, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
	checkError("SetsockoptInt", err)

	port, _ := strconv.Atoi(os.Args[1])
	serverAddr := &unix.SockaddrInet4{
		Port: port,
		Addr: ADDR,
	}

	// bind
	err = unix.Bind(serverFD, serverAddr)
	checkError("Bind", err)

	fmt.Printf("Server: Bound to addr: %d, port: %d\n", serverAddr.Addr, serverAddr.Port)

	// listen
	err = unix.Listen(serverFD, LISTENBACKLOG)
	checkError("Listen", err)

	timeout := &unix.Timeval{Sec: 5, Usec: 5000}
	reads := unix.FdSet{}
	var fdMax int
	reads.Zero()
	reads.Set(serverFD)
	fdMax = serverFD
	fmt.Printf("serverFD: %d\n", serverFD)
	for {
		// 因 select 会重置 FdSet 数组,所以每次调用 select 前,需要拷贝一份副本给 select 重新使用
		copyReads := reads
		fdNum, err := unix.Select(fdMax+1, &copyReads, nil, nil, timeout)
		if fdNum == -1 {
			fmt.Println("fdNum: -1\n")
			break
		} else if fdNum == 0 {
			fmt.Println("fdNum: 0\n")
			continue
		}
		if err != nil {
			log.Fatal("Select: ", err)
		}

		for fd := 0; fd < fdMax+1; fd++ {
			if copyReads.IsSet(fd) {
				if fd == serverFD {
					acceptedFD, _, err := unix.Accept(serverFD)
					checkError("Accept", err)
					fmt.Printf("acceptedFD:%d\n", acceptedFD)
					reads.Set(acceptedFD)
					if acceptedFD > fdMax {
						fdMax = acceptedFD
					}
				} else {
					buff := make([]byte, MAXMSGSIZE)
					sizeMsg, clientAddr, err := unix.Recvfrom(fd, buff, 0)
					if err != nil {
						fmt.Println("read error:", err)
						reads.Clear(fd)
						unix.Close(fd)
						continue
					}
					response := buff[:sizeMsg]
					if sizeMsg > 0 {
						fmt.Printf("Read: %s", string(response))
						err := unix.Sendmsg(fd, response, nil, clientAddr, unix.MSG_DONTWAIT) // 非阻塞 IO
						checkError("Write"+strconv.Itoa(fd), err)
						fmt.Printf("write: %s", response)
					} else {
						fmt.Println(reads)
						reads.Clear(fd)
						unix.Close(fd)
						fmt.Printf("closed client: %d \n", fd)
					}
				}
			}
		}
	}
	unix.Close(serverFD)
	os.Exit(0)
}

func checkError(name string, err error) {
	if err != nil {
		fmt.Fprintf(os.Stderr, "%s: Fatal error: %s", name, err.Error())
		os.Exit(1)
	}
}

客户端

echo_client.go

package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
)

const (
	BUF_SIZE = 1024
)

var input = make([]byte, BUF_SIZE)

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "usage: %s host:port\n", os.Args[0])
		os.Exit(1)
	}

	tcpAddr, err := net.ResolveTCPAddr("tcp4", os.Args[1])
	checkError("ResolveTCPAddr", err)

	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		checkError("DialTCP", err)
	} else {
		fmt.Println("Connected........... ")
	}
	defer conn.Close()
	buf := make([]byte, BUF_SIZE)
	go writeRoutine(conn, buf)
	readRoutine(conn, buf)
	os.Exit(0)
}

func readRoutine(conn net.Conn, buf []byte) {
	for {
		n, err := conn.Read(buf)
		checkError("readRoutine.read", err)
		if n <= 0 {
			return
		}
		fmt.Printf("Message from server: %s\n", buf[:n])
	}
}

func writeRoutine(conn net.Conn, buf []byte) {
	reader := bufio.NewReader(os.Stdin)
	for {
		n, err := reader.Read(buf)
		checkError("reader.Read", err)
		if n <= 0 {
			continue
		}
		msg := string(buf[:n])
		if msg == "q\n" || msg == "Q\n" {
			break
		}
		n1, err := conn.Write(buf)
		checkError("conn.Write", err)
		if n1 <= 0 {
			continue
		}

	}
}
func checkError(name string, err error) {
	if err != nil {
		fmt.Fprintf(os.Stderr, "%s: Fatal error: %s", name, err.Error())
		os.Exit(1)
	}
}

运行服务端

$ go run echo_server.go 8080
serverFD: 3
Server: Bound to addr: [127 0 0 1], port: 8080
serverFD: 3
acceptedFD:4
Read: Hi
write: Hi

运行客户端

$ go run echo_client.go localhost:8080
Connected...........
Hi
Message from server: Hi

我们可以启动多个客户端,因为 select 支持单一线程同时监听多个文件描述符(I/O 事件)