需求:
使用io复用(select) 实现一个服务器与客户端, 用于模拟用户登录,登出,以及广播消息;
下面代码并没有解决粘包烧包 和解包的问题 , 具体解决方案: tcp 粘包 解包 少包 两种解决方式
以下为win32平台, unix平台稍作修改即可运行;
原本想用iocp或者eventselect 实现更为简单,但人说非得要用select,好把.
下面代码的发送消息和接受消息都使用了结构, 分2个部分一个消息头一个消息体
用于登录登出,用户加入的头文件,定义消息结构的:
trans.h
enum CMD{ CMD_LOGIN, //登录 由客户端发送给服务端 CMD_LOGOUT, //登出 CMD_LOGIN_RESULT,//登录结果,由服务端发送给客户端 CMD_LOGOUT_RESULT,//登出结果 CMD_USER_JOIN, //由服务端群发给客户端 CMD_ERROR }; //消息头 typedef struct _DataHeader{ short dataLen; //用于定义消息体长度 short cmd; //对应上面的命令 } DataHeader, *LPDataHeader; //登录 typedef struct _Login { DataHeader header; //消息头 char uname[32]; //用户名 char passwd[32]; //密码 } Login, *LPLogin; //登录结果 typedef struct _LoginResult{ DataHeader header; short result; //一个示意 }LoginResult, *LPLoginResult; //登出 typedef struct _Logout{ DataHeader header; char uname[32]; //一个示意 }Logout, *LPLogout; //登出结果 typedef struct _LogoutResult{ DataHeader header; short result; //示意 }LogoutResult, *LPLogoutResult; //群发命令 typedef struct _UserJoin{ DataHeader header; int sock; //占位符,没鸟用 }UserJoin, *LPUserJoin;
用于打印错误消息的, 好像没用到,忘了
utils.h
#include <stdlib.h> #include <TCHAR.h> #include <stdio.h> #include <winsock2.h> #include <Windows.h> #include <Ws2ipdef.h> #include <locale.h> #pragma comment(lib, "ws2_32.lib") const SIZE_T ERR_MSG_SIZE = 1 << 13; const unsigned short PORT = 9988; const int BACKLOG = 20; void print_error(DWORD err){ //使用当前平台的字符集 _tsetlocale(LC_ALL, L""); //创建一块内存一直存放错误信息 static HANDLE g_heap = HeapCreate(0, ERR_MSG_SIZE, 0); static TCHAR *buf = (TCHAR*)HeapAlloc(g_heap, 0, ERR_MSG_SIZE); //使用当前平台的语言 DWORD syslocale = MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT); DWORD ret = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, err, syslocale, buf, ERR_MSG_SIZE, NULL); if (!ret){ //如果上面没找到错误,去网络错误中查找 static HMODULE hDll = LoadLibraryEx(TEXT("netmsg.dll"), NULL, DONT_RESOLVE_DLL_REFERENCES); if (hDll){ //如果在dll中查找,FORMAT_MESSAGE_FROM_HMODULE 添加上去, 第2个参数填写句柄 ret = FormatMessage(FORMAT_MESSAGE_FROM_HMODULE | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, hDll, err, syslocale, buf, ERR_MSG_SIZE, NULL); } } if (ret && buf){ buf[ret] = 0; _tprintf(TEXT("buf:%s\n"), buf); } else{ _tprintf(TEXT("unknow error : %ld\n"), err); } }
正式代码:
serv.cpp
#include "stdafx.h" #include "../trans.h" #define _UNICODE #define UNICODE #include <WinSock2.h> #include <Windows.h> #pragma comment(lib, "ws2_32.lib") #define BUFSIZE 1024 #define PORT 9988 #define BACKLOG 10 //没用到 int setNonBlockMode(SOCKET sock, u_long bEnable) { return ioctlsocket(sock, FIONBIO, &bEnable); } //处理每个客户端传送的数据,maxi 最大索引,cliens 存放socket, readset 有响应的socket void process_clients(int maxi, SOCKET * clients, FD_SET *readset, FD_SET *allreadset, int *nready); //广播消息 , sock_arr_index 不想给这个socket发送数据的索引 void boardcast_msg(int cmd, int * sock_arr_index, int maxi = -1); //存放socket SOCKET clients[FD_SETSIZE] = {}; //当退出时, 关闭所有客户端 void when_exit(){ for (int i = 0; i < FD_SETSIZE; ++i){ if (clients[i] != INVALID_SOCKET) closesocket(clients[i]); } MessageBox(NULL, TEXT("退出"), TEXT("退出了呀"), MB_OK); } int _tmain(int argc, _TCHAR* argv[]) { //注册函数 atexit(when_exit); WSADATA wsadata; //初始化 if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0){ printf("wsastartup failed\n"); return 0; } //下面的代码省略了各种判断, 自行加入即可 SOCKET hListenSock = socket(AF_INET, SOCK_STREAM, 0); SOCKADDR_IN serv_addr = {}; SOCKADDR_IN client_addr = {}; serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(PORT); int serv_sock_len = sizeof(serv_addr) , client_sock_len = sizeof(client_addr); bind(hListenSock, (SOCKADDR*)&serv_addr, serv_sock_len); listen(hListenSock, BACKLOG); //就用了一个 allreadset ,可把其他的 set 删除 FD_SET readset , allreadset ,writeset,allwriteset,expset,allexpset; //清空操作 FD_ZERO(&allreadset); FD_ZERO(&allwriteset); FD_ZERO(&allexpset); //把监听sock加入 FD_SET(hListenSock, &allreadset); //这2行没用 FD_SET(hListenSock, &allwriteset); FD_SET(hListenSock, &allexpset); // nready 是select返回值, maxfd 只为unix做兼容, maxi 是当前clients的最大索引 int nready = 0 ,maxfd = (int)hListenSock, maxi = 0; //一开始全部初始化 -1 ,既无效socket; for (int i = 0; i < FD_SETSIZE; ++i) clients[i] = -1; //超时时间, 仅仅用于查看输出情况, 可删除 struct timeval tval = { 3, 0 }; while (1){ //重置 readset , readset 是值-结果,既每次select都会进行修改,所以要重置 readset = allreadset; puts("before select"); //第一个参数在win32下没用,兼容unix, 最后一个参数可加可不加 nready = select(maxfd + 1, &readset, NULL, NULL, &tval); printf("return from select , nready : %d\n", nready); //如果出错 if (nready == SOCKET_ERROR){ printf("select error :%ld\n", WSAGetLastError()); continue; } // 如果超时 else if (nready == 0){ puts("Timeout , u can do something else!!!"); continue; } //新连接,客户端进来了一个 if (FD_ISSET(hListenSock, &readset)) { SOCKET hClientSock = accept(hListenSock, (SOCKADDR*)&client_addr, &client_sock_len); //这里应该要判断sock是否合法,偷懒只是打印一下 printf("after accept:%d\n", hClientSock); int i = 0; //从clients中找一个位置 放socket for (i = 0; i < FD_SETSIZE && -1!=clients[i]; ++i); //如果clients满了 if (FD_SETSIZE == i) { puts("full clients"); closesocket(hClientSock); if (--nready == 0) continue; } else { int clientSockNo = (int)hClientSock;//这步在win32下没用,win32中无视maxfd if (clientSockNo > maxfd) //这步在win32下没用,win32中无视maxfd maxfd = clientSockNo;//这步在win32下没用,win32中无视maxfd //maxi 仅仅减少clients的循环次数 if (i > maxi) maxi = i; //把socket 加入set中 FD_SET(hClientSock, &allreadset); //放入数组 clients[i] = hClientSock; printf("a client comming! client sock no :%d , index:%d\n", clientSockNo, i); printf("allreadset count:%d\n", allreadset.fd_count); //仅仅减少无效的调用 --nready; //把socket所在的索引放入新的数组中, 来广播消息,除了此sock,其他sock都将收到新用 //户进入的消息 int sock_arr[] = {i}; //广播消息 boardcast_msg(CMD_USER_JOIN,sock_arr,maxi); if (nready == 0) continue; } } //处理其他的客户端 process_clients(maxi, clients, &readset, &allreadset, &nready); } //以下永远不会执行 closesocket(hListenSock); WSACleanup(); return 0; } /* sock_arr_index 是不给这些索引对应的socket发送消息 */ void boardcast_msg(int cmd, int * sock_arr_index,int maxi) { puts(" **** boardcast msg **** "); unsigned int num = (maxi == -1) ? FD_SETSIZE : maxi; int arrlen = sizeof(sock_arr_index) / sizeof(int*); SOCKET tmp_sock[FD_SETSIZE] = {}; switch (cmd){ //目前定义的就这一个命令 case CMD_USER_JOIN: { UserJoin userjoin; userjoin.header.cmd = CMD_USER_JOIN; userjoin.header.dataLen = sizeof(UserJoin)-sizeof(DataHeader); int sockindex = 0; if (sock_arr_index) { //先把不发送消息的socket从clients中取出来 for (int i = 0; i < arrlen; ++i) { sockindex = sock_arr_index[i]; tmp_sock[i] = clients[sockindex]; clients[sockindex] = -1; } //此时clients中所有其他的socket都应该发送消息 for (int i = 0; i <=num; ++i) { if (clients[i] != -1){ printf("send:%d\n", clients[i]); send(clients[i], (const char*)&userjoin, sizeof(userjoin), 0); } } //最后还原 for (int i = 0; i < arrlen; ++i) { clients[sock_arr_index[i]] = tmp_sock[i]; } } else { //如果没有不想发送的客户, 则都发送 for (int i = 0; i < num;++i) if (clients[i] != -1) send(clients[i], (const char*)&userjoin, sizeof(userjoin), 0); } } } } //处理每个客户端 /* 需要注意的, 下面的代码中,所有的recv分为2个部分. 1. 每个预定义的结构,都有一个DataHeader 作为消息头. 2. 剩余的部分作为消息体, DataHeader中的 cmd 对应CMD枚举,datalen对应 消息体的长度; */ void process_clients(int maxi, SOCKET * clients,FD_SET *readset,FD_SET *allreadset,int *nready ) { puts("\tprocess_clients!"); int len = 0, goon = 1; //这里用一个缓冲区来接受 static char buffer[BUFSIZE] = {}; int sockno = 0; for (int i = 0; i <= maxi; ++i) { //非法socket或不存在的就跳过 if (clients[i] < 0) continue; if (FD_ISSET(clients[i], readset)) { puts("\tprepare recv from client!"); len = 0; //一般用TCP写的,都需要用while来接受数据,毕竟是流,你不知道数据到底接受了多少 //但下面其他的代码全部省略了,就留了这一个作为示例 while (1){ len += recv(clients[i], buffer + len, sizeof(DataHeader), 0); //客户端断开 if (len <= 0){ goon = 0; sockno = (int)clients[i]; closesocket(clients[i]); FD_CLR(clients[i], allreadset); clients[i] = -1; printf("\tsocket:%d peer close !\n", sockno); break; } //直到接受满才停止 if (len >= sizeof(DataHeader)) break; } if (!goon){ puts("\tbyebyte"); break; } //先把消息头获取,从消息头中能获取客户端发送的命令以及消息体长度 DataHeader * header = (DataHeader *)buffer; printf("\tserv cmd:%d , len:%d\n", header->cmd, header->dataLen); switch (header->cmd) { //如果是登录 case CMD_LOGIN: { //这里应该也用while来接受,但我偷懒了 len = recv(clients[i], buffer + sizeof(DataHeader), header->dataLen, 0); //接受完后打印一下 Login * pLogin = (Login*)buffer; printf("\tLogin : %s, %s , datalen:%d, cmd:%d\n", pLogin->uname, pLogin->passwd, pLogin->header.dataLen, pLogin->header.cmd); //然后发送给客户端一个结果 LoginResult lr = {}; lr.result = 1; //命令 lr.header.cmd = CMD_LOGIN_RESULT; //消息体长度 lr.header.dataLen = sizeof(LoginResult)-sizeof(DataHeader); send(clients[i], (char*)&lr, sizeof(lr), 0); break; } case CMD_LOGOUT: //登出 与 登录类似 { len = recv(clients[i], buffer + sizeof(DataHeader), header->dataLen, 0); Logout *pLogout = (Logout*)buffer; printf("\tLogout : %s , cmd:%d, datalen:%d\n", pLogout->uname, pLogout->header.cmd, pLogout->header.dataLen); LogoutResult lr = {}; lr.result = 1; lr.header.cmd = CMD_LOGOUT_RESULT; lr.header.dataLen = sizeof(lr)-sizeof(DataHeader); send(clients[i], (char*)&lr, sizeof(lr), 0); break; } default: { //不会执行到这里, 可以无视了 header->cmd = CMD_ERROR; header->dataLen = 0; send(clients[i], (char*)&header, sizeof(header), 0); } } //看看有没有必要继续, nready 是 select返回值 if (--(*nready) == 0) break; } } }
客户端:
// client.cpp : Defines the entry point for the console application. // #include "stdafx.h" #include "../trans.h" #include "../utils.h" #include <process.h> #define BUFFSIZE 8192 BOOL bContinue = TRUE; //起一个线程,用于接受用户输入 unsigned int __stdcall input_thread(void *); int _tmain(int argc, _TCHAR* argv[]) { WSADATA wsadata; //基本套路就不说了 WSAStartup(MAKEWORD(2, 2), &wsadata); SOCKET hSocket = socket(AF_INET, SOCK_STREAM, 0); SOCKADDR_IN peeraddr; memset(&peeraddr, 0, sizeof(peeraddr)); peeraddr.sin_addr.s_addr = inet_addr("127.0.0.1"); peeraddr.sin_family = AF_INET; peeraddr.sin_port = htons(PORT); if (connect(hSocket, (SOCKADDR*)&peeraddr, sizeof(peeraddr)) == SOCKET_ERROR){ printf("connect error:%ld\n", WSAGetLastError()); return 0; } char msg[BUFFSIZE]; //这些与服务端类似, 具体可看服务端说明 FD_SET allreadset, readset; FD_ZERO(&allreadset); FD_SET(hSocket, &allreadset); int len = 0; int nready = 0; static char buff[BUFFSIZE] = {}; struct timeval tval = { 3, 0 }; //起线程,接受用户输入. unix下可 FD_SET(0, &allreadset), win32下不行 HANDLE hThread = (HANDLE)_beginthreadex(NULL, 0, input_thread,(void*)&hSocket, 0, 0); DWORD ret = 0; //下面用select 来接受服务端传送的消息,其实没必要 while (bContinue){ //用于检测输入线程是否终止, 输入"q" 就退出了 ret = WaitForSingleObject(hThread, 0); if (ret == WAIT_OBJECT_0){ puts("byebye"); break; } readset = allreadset; puts("before select"); //与服务端一样 nready = select(0, &readset, 0, 0, &tval); printf("after select , nready:%d\n", nready); //出错 if (nready == SOCKET_ERROR){ printf("error:%ld\n", WSAGetLastError()); continue; } //超时 else if (nready == 0){ printf("u can process something else\n"); continue; } // 下面代码与服务端接受消息的代码类似 if (FD_ISSET(hSocket, &readset)){ //这里不用while(1)了,偷懒,需要的自己去改下,serv.cpp中有 len = recv(hSocket, buff, sizeof(DataHeader), 0); if (len <= 0){ puts("closed by serv!"); break; } DataHeader * pHeader = (DataHeader*)buff; printf("\t from serv cmd : %d, datalen:%d\n", pHeader->cmd, pHeader->dataLen); switch (pHeader->cmd) { case CMD_LOGIN_RESULT: //接受服务端登录的结果,具体服务端有说明 { len = recv(hSocket, buff + sizeof(DataHeader), pHeader->dataLen, 0); if (len <= 0){ puts("closed by serv"); bContinue = FALSE; break; } LoginResult * pResult = (LoginResult *)buff; printf("\t from serv Login , datalen:%d ,result:%d \n ", pResult->header.dataLen, pResult->result); break; } case CMD_LOGOUT_RESULT: //接受服务端登出的结果 { len = recv(hSocket, buff + sizeof(DataHeader), pHeader->dataLen, 0); if (len <= 0){ puts("closed by serv"); bContinue = FALSE; break; } LogoutResult * pResult = (LogoutResult *)buff; printf("\t from serv Logout , datalen:%d ,result:%d \n ", pResult->header.dataLen, pResult->result); break; } case CMD_USER_JOIN: //这个命令是服务端群发的,用于显示新用户登录 { len = recv(hSocket, buff + sizeof(DataHeader), pHeader->dataLen, 0); if (len <= 0){ puts("closed by serv"); bContinue = FALSE; break; } UserJoin * pJoin = (UserJoin*)buff; printf("\t from serv UserJoin , datalen:%d ,result:%d \n ", pJoin->header.dataLen, pJoin->sock); break; } default: { puts("->>>> unknow msg"); } } } else{ puts(" wrong!!"); continue; } } closesocket(hSocket); WSACleanup(); return 0; } //接受用户输入 unsigned int __stdcall input_thread(void * pArgs) { puts("thread begin"); SOCKET hSocket = *(SOCKET*)pArgs; char buf[100] = {}; while (1){ scanf(" %s", buf); //一旦按了q , 客户端就退出了 if (!strcmp(buf, "q")){ return 0; } //输入login , 客户端自动发送一个 Login结构到服务端 else if (!strcmp(buf, "login")){ Login lg = {}; strcpy(lg.uname, "fuc"); strcpy(lg.passwd, "123"); lg.header.cmd = CMD_LOGIN; //命令 lg.header.dataLen = sizeof(Login)-sizeof(DataHeader); //消息体长度 send(hSocket, (const char*)&lg, sizeof(Login), 0); } else if (!strcmp(buf, "logout")){ Logout lg = {}; lg.header.cmd = CMD_LOGOUT; lg.header.dataLen = sizeof(Logout)-sizeof(DataHeader); strcpy(lg.uname, "fffff"); send(hSocket, (const char*)&lg, sizeof(Logout), 0); } else{ puts("unknow cmd"); } } }
