select多路复用
使用select 实现并做简单的封装。虽然使用多线程和多进程都可以实现对多个客户端监听每一个进程/线程都会在send/recv处阻塞等待发送/接收select、poll、epoll可以对文件描述符进行监控当有事件触发时才会去调用。select实现I/O多路复用就是对fd_set位图的操作如果要监控可读操作就声明一个read的位图把已经连接到服务器的clientfd记录下来不如clientfd10就是把位图中第十个值置为1通过select对位图进行监控当有满足read的文件描述符时值不变不满足的清空所以要把连接的客户端存起来避免丢失。所以返回的fd_set就是满足可读的已连接客户端对这些客户端进行操作就行了。这里面存在的问题就是系统默认fd_set大小为1024所以最多连接1020个客户端0-标准输入流、1-标准输出流、2-标准错误流、3-服务端文件描述符当客户端连接过多时会丢失或者出现错误。还有就是fd_set会从用户态拷贝到内核再拷贝出来性能上不是很好。select是一种 I/O 多路复用技术允许程序同时监视多个文件描述符file descriptors等待一个或多个描述符变为就绪状态可读、可写或发生异常I/O多路复用就是使用一个线程管理多个io是否就绪。先了解一下几个概念fd_set是什么fd_set是一个存放文件描述符的数组大小为1024位 结构体实际上就是定义一个 长整型的数组 long int fds_bits[FD_SETSIZE / NFDBITS];1024 int * 数组个数 * 8 位能够储存1024个文件描述符也就是实际能够储存1024个socket的文件描述符。每个进程默认打开3个文件描述符0-标准输入流、1-标准输出流、2-标准错误流/* The fd_set member is required to be an array of longs. */ typedef long int __fd_mask; /* Some versions of linux/posix_types.h define this macros. */ #undef __NFDBITS /* Its easier to assume 8-bit bytes than to get CHAR_BIT. */ #define __NFDBITS (8 * (int) sizeof (__fd_mask)) #define __FD_ELT(d) ((d) / __NFDBITS) #define __FD_MASK(d) ((__fd_mask) (1UL ((d) % __NFDBITS))) /* fd_set for select and pselect. */ typedef struct { /* XPG4.2 requires this member name. Otherwise avoid the name from the global namespace. */ #ifdef __USE_XOPEN __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)-fds_bits) #else __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)-__fds_bits) #endif } fd_set;文件描述符的分配规则寻找最小的未使用的文件描述符所以连接数较少时可以使用select进行实现。文件描述符如何映射到socket好像是进程控制块-进程描述表-文件描述符表-文件对象具体怎么映射的没了解int select(int maxfdp, //最大文件描述符值加1 fd_set *readfds, //指向可读文件描述符集合的指针 可读时接收 fd_set *writefds, //指向可写文件描述符集合的指针 可写时发送 fd_set *exceptfds, //指向异常文件描述符集合的指针 常见带外数据到达TCP紧急数据 struct timeval *timeout); //超时时间结构体指针实现#include iostream #include sys/select.h #include sys/socket.h #include netinet/in.h #include arpa/inet.h #include unistd.h #include cstring #include vector #include algorithm #define PORT 8080 #define BUFFER_SIZE 1024 #define MAX_CLIENTS 10 class TCPServer { private: int server_socket; fd_set readfds; int client_socket[MAX_CLIENTS]; int max_sd; public: TCPServer() { server_socket -1; // 初始化客户端socket数组 for (int i 0; i MAX_CLIENTS; i) { client_socket[i] 0; } } void init() { int opt 1; struct sockaddr_in address; // 创建主socket if ((server_socket socket(AF_INET, SOCK_STREAM, 0)) 0) { perror(socket failed); exit(EXIT_FAILURE); } // 设置socket选项允许地址重用 if (setsockopt(server_socket, SOL_ perror(setsockopt); exit(EXIT_FAILURE); } // 配置地址 address.sin_family AF_INET; address.sin_addr.s_addr INADDR_ANY; address.sin_port htons(PORT); // 绑定socket if (bind(server_socket, (struct sockaddr *)address, sizeof(address)) 0) { perror(bind failed); exit(EXIT_FAILURE); } std::cout Listener on port PORT std::endl; // 开始监听 if (listen(server_socket, MAX_CLIENTS) 0) { perror(listen); exit(EXIT_FAILURE); } } void run() { int activity, new_socket, sd; int max_sd; struct sockaddr_in address; socklen_t addrlen sizeof(address); char buffer[BUFFER_SIZE]; std::cout Waiting for connections... std::endl; while (true) { // 清空文件描述符集合 FD_ZERO(readfds); // 添加主socket到集合 FD_SET(server_socket, readfds); max_sd server_socket; // 添加客户端socket到集合 for (int i 0; i MAX_CLIENTS; i) { sd client_socket[i]; if (sd 0) { FD_SET(sd, readfds); } max_sd max_sd sd ? max_sd : sd; } // 等待活动 struct timeval timeout; timeout.tv_sec 0; // 秒 timeout.tv_usec 500; activity select(max_sd 1, readfds, NULL, NULL, timeout); if (activity 0 errno ! EINTR) { std::cerr select error std::endl; continue; }else if (activity 0) { // 超时处理 continue; } // 如果有新连接 if (FD_ISSET(server_socket, readfds)) { if ((new_socket accept(server_socket, (struct sockaddr *)address, addrlen)) 0) { perror(accept); exit(EXIT_FAILURE); } std::cout New connection, socket fd: new_socket , IP: inet_ntoa(address.sin_addr) , Port: ntohs(address.sin_port) std::endl; // 添加到客户端数组 for (int i 0; i MAX_CLIENTS; i) { if (client_socket[i] 0) { client_socket[i] new_socket; std::cout Adding to list of sockets as i client id: new_sock et std::endl; break; } } } // 检查客户端socket的IO操作 for (int i 0; i MAX_CLIENTS; i) { sd client_socket[i]; memset(buffer,0,BUFFER_SIZE); if (FD_ISSET(sd, readfds)) { // 检查是否断开连接 int valread read(sd, buffer, BUFFER_SIZE); if (valread 0) { // 客户端断开连接 getpeername(sd, (struct sockaddr*)address, addrlen); std::cout Host disconnected, IP: inet_ntoa(address.sin_addr) , Port: ntohs(address.sin_port) std::endl; close(sd); client_socket[i] 0; } else { // 处理接收到的数据 buffer[valread] \0; std::cout Received: buffer std::endl; // 回声给客户端 send(sd, buffer, strlen(buffer), 0); } } } } } ~TCPServer() { close(server_socket); for (int i 0; i MAX_CLIENTS; i) { if (client_socket[i] 0) { std::cout close client : client_socket[i] std::endl; close(client_socket[i]); } } } }; int main() { TCPServer server; server.init(); server.run(); return 0; }poll多路复用poll不再使用位图对文件描述符存储而是通过pollfd进行控制可以声明pollfd的数组所以就没有了1024的限制。poll改进没有最大文件描述符限制poll使用数组可以处理任意数量的文件描述符更高效不需要每次调用都重新设置文件描述符集合更清晰的事件分离每个文件描述符都有独立的事件输入和输出字段poll事件常量事件常量值通常说明对象POLLIN0x001数据可读普通数据events/reventsPOLLPRI0x002紧急数据可读带外数据events/reventsPOLLOUT0x004数据可写不阻塞events/reventsPOLLRDNORM0x040普通数据可读events/reventsPOLLRDBAND0x080优先级带数据可读events/reventsPOLLWRNORM0x100普通数据可写events/reventsPOLLWRBAND0x200优先级带数据可写events/reventsPOLLERR0x008错误情况reventsPOLLHUP0x010已挂起reventsPOLLNVAL0x020无效轮询请求revents#include stdio.h #include stdlib.h #include string.h #include unistd.h #include iostream #include errno.h #include sys/socket.h #include netinet/in.h #include arpa/inet.h #include poll.h #define PORT 8080 #define MAX_CLIENTS 1000 #define BUFFER_SIZE 1024 #define TIMEOUT -1 // 无限等待 class TCPServer{ private: int server_fd; char buffer[BUFFER_SIZE]; struct pollfd fds[MAX_CLIENTS 1]; // 1 给服务器套接字 int nfds; public: TCPServer(){ server_fd -1; nfds 0; // 初始化所有 pollfd 结构 for (int i 0; i MAX_CLIENTS 1; i) { fds[i].fd -1; // 表示未使用 fds[i].events 0; fds[i].revents 0; } }; void init(){ // 创建监听套接字 if ((server_fd socket(AF_INET, SOCK_STREAM, 0)) 0) { perror(socket failed); exit(EXIT_FAILURE); } // 设置 SO_REUSEADDR 选项 int opt 1; if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, opt, sizeof(opt))) { perror(setsockopt failed); exit(EXIT_FAILURE); } struct sockaddr_in address; address.sin_family AF_INET; address.sin_addr.s_addr INADDR_ANY; address.sin_port htons(PORT); // 绑定地址 if (bind(server_fd, (struct sockaddr *)address, sizeof(address)) 0) { perror(bind failed); exit(EXIT_FAILURE); } // 开始监听 listen(server_fd, 10) 等待连接的最大个数 if (listen(server_fd, 10) 0) { perror(listen failed); exit(EXIT_FAILURE); } printf(Server listening on port %d\n, PORT); // 添加服务器套接字到 pollfd 数组 fds[0].fd server_fd; fds[0].events POLLIN; // 监视可读事件新连接 nfds 1; // 当前监视的文件描述符数量 }; void run(){ int current_size 0; while (1) { // 调用 poll等待事件发生 int ret poll(fds, nfds, TIMEOUT); if (ret 0) { perror(poll failed); break; } else if (ret 0) { // 超时本例中不会发生因为TIMEOUT-1 continue; } current_size nfds; // 保存当前大小因为 nfds 可能在循环中改变 // 检查所有文件描述符 for (int i 0; i current_size; i) { if (fds[i].fd 0) continue; // 跳过未使用的文件描述符 // 检查是否有事件发生 if (fds[i].revents 0) continue; // 检查是否是错误事件 if (fds[i].revents (POLLERR | POLLHUP | POLLNVAL)) { printf(Error on fd %d, closing connection\n, fds[i].fd); close(fds[i].fd); fds[i].fd -1; fds[i].revents 0; continue; } // 如果是服务器套接字处理新连接 if (fds[i].fd server_fd) { if (fds[i].revents POLLIN) handle_client_connect(); } // 处理客户端套接字 else { handle_client_event(i); } } } }; void handle_client_connect(){ // 接受新连接 struct sockaddr_in address; socklen_t addrlen sizeof(address); int new_socket accept(server_fd, (struct sockaddr *)address, addrlen); if (new_socket 0) { perror(accept failed); return ; } printf(New connection, socket fd is %d, IP: %s, port: %d\n,new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port)); // 查找空位添加新客户端 int added 0 ; for (int j 1; j MAX_CLIENTS 1; j) { if (fds[j].fd -1) { fds[j].fd new_socket; fds[j].events POLLIN | POLLRDHUP; // 监视可读和连接关闭 fds[j].revents 0; added 1; nfds j nfds ? j1 : nfds; break; } } if (!added) { printf(Too many clients, rejecting connection\n); close(new_socket); } }; void handle_client_event(int fd_index){ // 检查连接是否关闭 if (fds[fd_index].revents POLLRDHUP) { printf(Client %d disconnected\n, fds[fd_index].fd); close(fds[fd_index].fd); fds[fd_index].fd -1; fds[fd_index].revents 0; return; } // 检查是否有数据可读 if (fds[fd_index].revents POLLIN) { memset(buffer,0,BUFFER_SIZE); int valread read(fds[fd_index].fd, buffer, BUFFER_SIZE); if (valread 0) { // 客户端正常关闭连接 printf(Client %d closed connection\n, fds[fd_index].fd); close(fds[fd_index].fd); fds[fd_index].fd -1; } else if (valread 0) { // 读取错误 perror(read failed); close(fds[fd_index].fd); fds[fd_index].fd -1; } else { // 回显数据 buffer[valread] \0; std::cout Received from client : fds[fd_index].fd buffer std: :endl; send(fds[fd_index].fd,buffer,strlen(buffer),0);

相关新闻