discard:丢弃所有收到的数据,简单的长连接TCP应用层协议
void DiscardServer::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time) { string msg(buf->retrieveAllAsString()); LOG_INFO << conn->name() << " discards " << msg.size() << " bytes received at " << time.toString(); }daytime:短连接协议,在发送完当前时间后,由服务器主动断开连接
void DaytimeServer::onConnection(const TcpConnectionPtr& conn) { LOG_INFO << "DaytimeServer - " << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); if (conn->connected()) { conn->send(Timestamp::now().toFormattedString() + "\n"); conn->shutdown();//主动断开连接 } }time : 与daytime极其相似,只不过它返回的不是日期时间字符串,而是一个32bit的整数
void TimeServer::onConnection(const muduo::net::TcpConnectionPtr& conn) { LOG_INFO << "TimeServer - " << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); if (conn->connected()) { time_t now = ::time(NULL); int32_t be32 = sockets::hostToNetwork32(static_cast<int32_t>(now)); conn->send(&be32, sizeof be32); conn->shutdown(); } }time客户端:time服务端发送的是二进制数据,不易读取,因此客户端来解析
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime) { if (buf->readableBytes() >= sizeof(int32_t)) { const void* data = buf->peek(); int32_t be32 = *static_cast<const int32_t*>(data); buf->retrieve(sizeof(int32_t)); time_t time = sockets::networkToHost32(be32); Timestamp ts(time * Timestamp::kMicroSecondsPerSecond); LOG_INFO << "Server time = " << time << ", " << ts.toFormattedString(); } else { LOG_INFO << conn->name() << " no enough data " << buf->readableBytes() << " at " << receiveTime.toFormattedString(); } } };echo:前面的都是一个单向接收和发送数据,这是第一个双向发送的协议,即将服务端发送的数据原封不动的发送回去
void EchoServer::onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp time) { muduo::string msg(buf->retrieveAllAsString()); LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, " << "data received at " << time.toString(); conn->send(msg); }Chargen: 只发送数据,不接受数据,且发送数据的速度不能快过客户端接收的速度
void ChargenServer::onConnection(const TcpConnectionPtr& conn) { LOG_INFO << "ChargenServer - " << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); if (conn->connected()) { conn->setTcpNoDelay(true); conn->send(message_); } } void ChargenServer::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time) { string msg(buf->retrieveAllAsString()); LOG_INFO << conn->name() << " discards " << msg.size() << " bytes received at " << time.toString(); }五合一
前面的五个程序都用到了Evenloop,其实是一个Reactor,用于注册和分发IO事件
int main() { LOG_INFO << "pid = " << getpid(); EventLoop loop; // one loop shared by multiple servers ChargenServer chargenServer(&loop, InetAddress(2019)); chargenServer.start(); DaytimeServer daytimeServer(&loop, InetAddress(2013)); daytimeServer.start(); DiscardServer discardServer(&loop, InetAddress(2009)); discardServer.start(); EchoServer echoServer(&loop, InetAddress(2007)); echoServer.start(); TimeServer timeServer(&loop, InetAddress(2037)); timeServer.start(); loop.loop(); }这就是Reactor模式复用线程的能力,让一个单线程程序同时具备多个网络服务功能
利用 onWriteComplete() 实现分段传输,做到不必一次全部读入内存。
为什么非阻塞网络编程中应用层buffer是必须的
non-blocking IO 的核心思想是避免阻塞在 read() 或 write() 或其他 IO 系统调用上,这样可以最大限度地复用 thread-of-control ,让一个线程能够服务于多个 socket 连接。这就是需要应用层 buffer 的原因。**TcpConnection必须要有output buffer ** :比如TCP发送了100kb的数据,但是咋write()调用,操作系统只接受了80kb,因为不想原地等待(非阻塞),所以要尽快交出控制权,返回事件循环中。对于应用程序而言,它只管生成数据,它不应该关心到底数据是一次发送还是分成几次发送TCP粘包问题
网络库在处理“socket可读”事件的时候必须一次性把socket的数据一次性读完(从操作系统的buff搬运到应用层的buff上面),否则会反复触发POLLIN事件,造成busy-loop.这是因为采用的是LT模式
Buffer的设计
对外表现是一块连续的内存,以方便客户代码的编写其size()可以自动增长,适应不同的消息内部以vector来保存数据Buffer的数据结构就是三个指针,一个数组,具体看书
Buffer其他设计方案
自己管理内存不用vector
zerocopy,注意不是严格意义上的0拷贝,数据从内核到用户空间有一次拷贝,如libevent2.0.x设计方案
内存不是连续的,是分快的
此部分具体看书
这里的并发连接数是指同时支持的客户端的连接数
不希望程序超载因为fd是稀缺资源 此后的内容具体看书