想了个不是问题的问题

    xiaoxiao2024-03-10  130

    昨天被一个问题困扰到,最后发现这个问题就不是个问题。

    怎么说呢?就是写服务器和客户端程序,两端想互相发送文件,对于小文件来说的话,是没啥,感觉不到发送过程的缓慢,但是换成一个几百兆的文件,就有点累了,得等上个几分钟才能发完,我就想为什么不能实现秒传。所以就在网上学习了多线程发送技术。实现了近百兆文件秒发的目标。具体功能就是服务器传文件到客户端客户端再反传给服务器,运行截图如下: 客户端接收到的文件:

    服务器端发送的文件(framework.7z)和接收到客户端反传回的文件(_ramwork.7z): 200MB的文件不到两秒传完了。结果显示内容也没啥差别。

    下面是基本思想,其实在上一篇文章都说了在这里用流程图来表示吧! 这方法快是快,但致命的缺点就是不能用到IO多路复用的服务器中。只是用一个客户端和一个服务器之间的情况。要是多个客户端的话,服务器就懵了,不知道那个连接发来的内容该写到同一文件了,另外发一个文件创建太多连接对性能影响也比较大,所以这种传文件的方法用在服务器和客户端之间发大文件不现实!

    不过话说回来,现在我们在网上下载个大文件的话也得等上一段时间才行。限制就在于一个TCP连接数据缓冲区本来就有限制,最多也就一次发65535bytes的长度。但跨越这个限制建立多连接用起来却不灵活。所以还是老老实实用单连接发吧,服务器那边也容易处理。 下面是我的代码实现:

    writen和readn #pragma once #include<unistd.h> #include<stdlib.h> #include<fcntl.h> #include<stdio.h> #include<errno.h> int readn(int fd, void *buf, int n) ; //写数据 int writen(int fd, void *buf, int n) ; int readn(int fd, void *buf, int n) { int nleft = n; //还需要读取的字节数 char *bufptr =(char*)buf; //指向read函数当前存放数据的位置 int nread = 0 ; while(nleft > 0){ if((nread = read(fd, bufptr, nleft)) < 0){ if(errno == EINTR){ //遇到中断 continue; } else // 其他错误 return -1; } else if(nread == 0){ // 遇到EOF break; } nleft -= nread; bufptr += nread; } return (n - nleft); } int writen(int fd, void *buf, int n){ int nleft = n; char *bufptr = (char*)buf; int nwrite; while(nleft > 0){ if((nwrite = write(fd, bufptr, nleft)) < 0){ if(errno == EINTR) continue; else return -1; } if(nwrite == 0) continue; nleft -= nwrite; bufptr += nwrite; } return (n-nleft); // 注意这里必须是 n 因为这里保证了 n 字节都被写入 } 两端协议 struct proctol{ int flag = 12; int fd ; int sockfd ; int threadNum ; long cur ; long moved ; //文件长度 long size ; //缓冲区大小 char fileName[1024] ; char buf[4096] ; } ; 服务器 #include"testServer.h" #include"ReadWrite.h" int main(int argc, char** argv) { if(argc< 0){ cout<< "use:./a.out filename" <<endl ; exit(1); } int fd = socket(AF_INET, SOCK_STREAM, 0) ; if(fd < 0) { return 0 ; } struct sockaddr_in sock_addr ; sock_addr.sin_family = AF_INET ; sock_addr.sin_port = htons(PORT) ; sock_addr.sin_addr.s_addr = INADDR_ANY ; int use = 1 ; if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &use, sizeof(use))< 0){ cout << "socketoopt" <<endl ; return 1 ; } if(bind(fd, (struct sockaddr*)&sock_addr, sizeof(sock_addr))< 0) { cout << "bind" <<endl ; return 1 ; } if(listen(fd, BACKLOG) < 0){ cout<< "listen" <<endl ; return 1 ; } SendFile(argv[1], fd) ; RecvData(fd) ; return 0; } void RecvData(int sockfd) { int connfd ; //接受连接 if((connfd = accept(sockfd, NULL, NULL)) < 0) { return ; } proctol data ; bzero(&data, sizeof(data)) ; //获取客户端向服务发送的第一个包 int ret = readn(connfd, &data, sizeof(data)) ; if(ret < 0) { printf("readn失败!\n") ; exit(1) ; } int threadNum = data.threadNum ; //第一个包里面含有文件名称 close(connfd) ; int fd = open(data.fileName, O_CREAT|O_WRONLY, 0666) ; printf("线程数量%d\n",threadNum) ; for(int i = 0 ; i< threadNum; i++) { proctol data ; bzero(&data, sizeof(data)) ; printf("第%d个连接\n", i) ; if((connfd = accept(sockfd, NULL, NULL)) < 0) { printf("接收文件accept出错!\n") ; return ; } data.fd = fd ; data.sockfd = connfd ; std::thread t(threadRecv, std::ref(data)) ; t.join() ; } close(fd) ; } //接收每个客户端线程发送来的数据 void threadRecv(proctol& data) { int connfd = data.sockfd ; int fileFd = data.fd ; proctol tmp ; bzero(&tmp, sizeof(tmp)) ; int ret = readn(connfd, &tmp, sizeof(tmp)) ; if(ret < 0) { printf("接收功能readn读取数据错误!\n") ; return ; } long size = tmp.size ; long cur = tmp.cur ; ret = pwrite(fileFd, tmp.buf, tmp.moved, cur) ; if(ret < 0) { printf("pwrite写文件失败!\n") ; return ; } size -= tmp.moved ; cur += tmp.moved ; while(size) { bzero(&tmp, sizeof(tmp)) ; ret = readn(connfd, &tmp, sizeof(tmp)) ; if(ret < 0&& errno ==EINTR) { printf("读取文件失败!\n"); continue ; } else if(ret == 0) { break ; } if(ret < 0) { printf("readn接收文件失败!\n") ; return ; } ret = pwrite(fileFd, tmp.buf, tmp.moved, cur) ; if(ret < 0) { printf("pwrite写文件失败!\n") ; return ; } size -= tmp.moved ; cur += tmp.moved ; } close(connfd) ; } int SendFile(const char*fileName, int sockFd) { struct stat st_file ; if(lstat(fileName, &st_file)< 0){ cout << "发送的文件不存在!"<<endl ; return 1; } //为线程分配任务 long file_block ; proctol pp ; pp.size = st_file.st_size ; //获取文件的大小,20mb以上为大文件 if(pp.size>=1024*1024*50) { file_block = 1024*1024*10 ; } else { file_block = 1024*1024*3 ; } printf("每个线程分的块大小:%ld\n", file_block) ; int threadNum = pp.size/file_block ; //获取最后一个包的长度 int lastpacksize = (int)pp.size-threadNum*file_block ; //如果最后一个包的长度不问0的话,则多创建一个线程 if(lastpacksize) { threadNum++ ; } if(threadNum > 100) { printf("线程数量太多!\n") ; return 0; } int connfd = accept(sockFd, NULL, NULL) ; if(connfd < 0) { printf("接收连接accept出错!") ; exit(1) ; } proctol data ; data.size = file_block ; //第一个包是文件名称和线程的数量 strcpy(data.fileName, fileName) ; data.sockfd = sockFd ; data.threadNum = threadNum ; int ret = writen(connfd, &data, sizeof(data)) ; if(ret < 0) { printf("发送消息失败!") ; exit(1) ; } close(connfd) ; //打开文件 printf("文件名:%s\n", fileName) ; int fd = open(fileName, O_RDONLY) ; if(fd < 0) { cout << "open error" <<endl ; exit(1) ; } for(int i= 0; i< threadNum; i++) { connfd = accept(sockFd, NULL, NULL) ; if(connfd < 0) { exit(1) ; } //发送文件 proctol data ; memset(data.buf, 0, sizeof(data.buf)) ; strcpy(data.fileName, fileName) ; //当线程数量不为0且最后一个包的长度不为0 if(i == threadNum-1&&lastpacksize) { data.cur = i*file_block ; data.size = lastpacksize ; } else { //计算偏移 data.cur = i*file_block ; data.size = file_block ; } data.fd = fd ; data.sockfd = connfd ; thread t(sender, std::ref(data)) ; t.join() ; } close(fd) ; return 1 ; } //发送文 void sender(proctol& data) { long cur = data.cur ; int connfd = data.sockfd ; int read_count ; long size = data.size ; long read_size = 0 ; int filefd = data.fd ; printf("传送文件的长度%ld,名称:%s\n", data.size, data.fileName) ; while(size){ //原子操作 data.moved = sizeof(data.buf) ; if(sizeof(data.buf) > (size_t)size) { data.moved = size ; } read_count = pread(filefd, data.buf, data.moved, cur+read_size) ; if(read_count < 0&& errno == EINTR) { printf("被信号打断") ; continue ; } else if(read_count == 0) { break ; } else if(read_count < 0) { printf("pread 错误!") ; exit(1) ; } writen(connfd, &data, sizeof(data)) ; size-=read_count ; read_size += read_count ; } close(connfd) ; } 客户端 #include"testClient.h" #include"ReadWrite.h" struct sockaddr_in addr ; int main(int argc, char** argv) { if(argc != 3) { printf("usage:./a.out ip port\n") ; exit(1) ; } int sockfd = socket(AF_INET, SOCK_STREAM, 0) ; if(sockfd < 0) { printf("创建套接字失败!"); exit(1) ; } const char* ip = argv[1] ; int port = atoi(argv[2]) ; addr.sin_family = AF_INET ; addr.sin_port = htons(port) ; inet_pton(AF_INET, ip, &addr.sin_addr) ; if(connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { printf("connect连接失败!") ; exit(1) ; } int threadNum = 0 ; proctol data ; int ret = readn(sockfd, &data, sizeof(data)) ; if(ret < 0) { printf("recv 接收消息失败!") ; exit(1) ; } threadNum = data.threadNum ; if(threadNum > 100) { printf("线程数量太多!\n") ; exit(1) ; } int fd = open(data.fileName, O_CREAT|O_WRONLY, 0777) ; if(fd < 0) { printf("open创建文件出错!") ; exit(1) ; } //创建线程 for(int i = 0; i< threadNum; i++) { proctol data ; memset(&data, 0, sizeof(data)) ; data.fd = fd ; std::thread t(receive ,std::ref(data)) ; t.join() ; } close(fd) ; printf("回车开始服务器回传数据....\n"); getchar() ; //客户端给服务器发送大文件 //发给服务器后进行标识为客户端回射的 const char* filename= data.fileName ; sendToServer(filename) ; close(sockfd) ; printf("接收完成!") ; return 0; } //发送文件给服务器 void sendToServer(const char*fileName) { //创建套接字 int sockfd = socket(AF_INET, SOCK_STREAM, 0) ; if(sockfd < 0) { printf("创建套接字失败!") ; exit(1) ; } if(connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { printf("连接服务器失败!") ; exit(1) ; } struct stat st ; lstat(fileName, &st) ; int threadNum = 0 ; long block_size = 0 ; int lastpack = 0 ; long size = st.st_size ; //最大文件为20M if(size > 1024*1024*50) { block_size = 1024*1024*10 ; } //不超过20M的每个线程传送2M else { block_size = 1024*1024*3 ; } threadNum = size/block_size ; lastpack = size -threadNum*block_size ; //最后一个包长度不为0,多分一个线程处理 if(lastpack) { threadNum++ ; } if(threadNum > 100) { printf("不适合发所指定的文件!文件过大!\n"); exit(1) ; } proctol data ; data.threadNum = threadNum ; strcpy(data.fileName, fileName) ; data.fileName[0] = '_' ; int ret = writen(sockfd, &data, sizeof(data)) ; if(ret < 0) { printf("文件信息发送失败!\n") ; return ; } close(sockfd) ; int fd = open(fileName, O_RDONLY) ; for(int i= 0; i< threadNum; i++) { sockfd = socket(AF_INET, SOCK_STREAM, 0) ; //和服务器建立连接 if(connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { printf("线程创建连接失败!\n") ; return ; } if(i == threadNum-1&&lastpack){ data.cur = i*block_size ; data.size = lastpack ; } //判断是否到达文件长度 else { data.cur = i*block_size ; data.size = block_size ; } data.fd = fd ; data.sockfd = sockfd ; std::thread t1(threadSend, std::ref(data)) ; t1.join() ; } printf("发送完成,size=%ld", size) ; } //开线程向服务器端发送数据 void threadSend(proctol& data) { //线程负责的范围和负责的长度 long cur = data.cur ; long size = data.size ; int sockfd = data.sockfd ; int filefd = data.fd ; proctol tmp ; bzero(&tmp, sizeof(tmp)) ; tmp.size = size ; tmp.cur = cur ; int ret = 0; int read_count = 0 ; while(size) { int len = sizeof(data.buf) ; if(size<len) { len = size ; } ///通知对方这次在接受到数据的基础上, //在文件位置为cur的基础上移动moved长度 tmp.moved = len ; ret = pread(filefd, tmp.buf, len, cur+read_count) ; if(ret < 0) { printf("发送数据出错!") ; return ; } writen(sockfd, &tmp, sizeof(tmp)) ; size -= ret ; read_count += ret ; } close(sockfd) ; } void receive(proctol& data) { int sockfd = socket(AF_INET, SOCK_STREAM, 0) ; if(sockfd < 0) { printf("创建套接字失败!") ; exit(1) ; } if(connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { printf("连接服务器失败!") ; exit(1) ; } //接收到数据 proctol datas ; int ret ; if((ret = readn(sockfd, &datas, sizeof(datas))) < 0) { printf("接收数据错误!"); exit(1) ; } long size = datas.size ; long cur = datas.cur ; if(pwrite(data.fd, datas.buf, datas.moved, cur)<0) { printf("写文件出错!") ; exit(1) ; } size -= datas.moved; cur += datas.moved ; //接收消息 while(size) { ret = readn(sockfd, &datas, sizeof(datas)) ; if(ret < 0 && errno == EINTR) { printf("读取文件失败!\n") ; continue ; } else if (ret == 0) { break ; } else if(ret < 0) { printf("读取文件出错!") ; exit(1) ; } //将数据写入到文件中 if(pwrite(data.fd, datas.buf, datas.moved, cur) < 0) { printf("写文件出错!") ; exit(1) ; } cur += datas.moved ; size -= datas.moved ; } }
    最新回复(0)