本节书摘来自华章出版社《Ceph源码分析》一书中的第3章,第3.1节Ceph网络通信框架,作者常涛,更多章节内容可以访问云栖社区“华章计算机”公众号查看
第3章Ceph网络通信本章介绍Ceph网络通信模块,这是客户端和服务器通信的底层模块,用来在客户端和服务器之间接收和发送请求。其实现功能比较清晰,是一个相对较独立的模块,理解起来比较容易,所以首先介绍它。
3.1 Ceph网络通信框架一个分布式存储系统需要一个稳定的底层网络通信模块,用于各节点之间的互联互通。对于一个网络通信系统,要求如下:高性能。性能评价的两个指标:带宽和延迟。稳定可靠。数据不丢包,在网络中断时,实现重连等异常处理。网络通信模块的实现在源代码src/msg的目录下,其首先定义了一个网络通信的框架,三个子目录里分别对应:Simple、Async、 XIO 三种不同的实现方式。Simple是比较简单,目前比较稳定的实现,系统默认的用于生产环境的方式。 它最大的特点是:每一个网络链接,都会创建两个的线程,一个专门用于接收,一个专门用于发送。这种模式实现比较简单,但是对于大规模的集群部署,大量的链接会产生大量的线程,会消耗CPU资源,影响性能。Async模式使用了基于事件的I/O多路复用模式。这是目前网络通信中广泛采用的方式,但是在Ceph中,官方宣称这种方式还处于试验阶段,不够稳定,还不能用于生产环境。XIO方式使用了开源的网络通信库accelio来实现。这种方式需要依赖第三方的库accelio稳定性,需要对accelio的使用方式以及代码实现都比较熟悉。目前也处于试验阶段。特别注意的是,前两种方式只支持TCP/IP协议,XIO可以支持Infiniband网络。在msg目录下定义了网络通信的抽象框架,它完成了通信接口和具体实现的分离。在其下分别有msg/simple子目录、msg/Async子目录、msg/xio子目录,分别对应三种不同的实现。
3.1.1 Message类Message 是所有消息的基类,任何要发送的消息,都要继承该类,格式如图3-1所示。`
` 图3-1 消息发送格式 消息的结构如下:header是消息头,类似一个消息的信封(envelope),user_data是用于要发送的实际数据,footer是一个消息的结束标记,如下所示: class Message : public RefCountedObject { ceph_msg_header header; // 消息头 ceph_msg_footer footer; // 消息尾 //用户数据 bufferlist payload; // "front" unaligned blob bufferlist middle; // "middle" unaligned blob bufferlist data; // data payload (page-alignment will be preserved where possible) //消息相关的时间戳 utime_t recv_stamp; //开始接收数据的时间戳 utime_t dispatch_stamp; // dispatch的时间戳 utime_t throttle_stamp; //获取throttle的slot的时间戳 utime_t recv_complete_stamp; //接收完成的时间戳 ConnectionRef connection; //网络连接类 uint32_t magic; //消息的魔术字 bi::list_member_hook<> dispatch_q; //boost::intrusive需要的字段 }下面分别介绍其中的重要参数。ceph_msg_header为消息头,它定义了消息传输相关的元数据:struct ceph_msg_header { __le64 seq; //当前session内消息的唯一序号 __le64 tid; //消息的全局唯一的id __le16 type; //消息类型 __le16 priority; //优先级 __le16 version; //消息编码的版本 __le32 front_len; // payload的长度 __le32 middle_len; // middle的长度 __le32 data_len; // data的长度
__le16 data_off; //对象的数据偏移量
struct ceph_entity_name src;//消息源
//一些旧的代码,用于兼容,如果为零就忽略 __le16 compat_version; __le16 reserved; __le32 crc; //消息头的crc32c校验信息} attribute ((packed));ceph_msg_footer为消息的尾部,附加了一些crc校验数据和消息结束标志:struct ceph_msg_footer { __le32 front_crc, middle_crc, data_crc;
//分别对应crc效验码__le64 sig; //消息的64位signature __u8 flags; //结束标志} attribute ((packed));消息带的数据分别保存在payload、middle、data这三个bufferlist中。payload一般保存Ceph操作相关的元数据,middle目前没有使用到,data一般为读写的数据。在源代码src/messages下定义了系统需要的相关消息,其都是Message类的子类。
3.1.2 Connection类Connection对应端(port)对端的socket链接的封装。其最重要的接口是可以发送消息:
struct Connection : public RefCountedObject { mutable Mutex lock; //锁保护Connection的所有字段 Messenger *msgr; RefCountedObject *priv; //链接的私有数据 int peer_type; //链接的peer类型 entity_addr_t peer_addr; //peer的地址 utime_t last_keepalive, last_keepalive_ack; //最后一次发送keeplive的时间和最后一次接收keepalive的ACK的时间 private: uint64_t features; //一些feature的标志位 public: bool failed; //当值为true时,该链接为lossy链接已经失效了 int rx_buffers_version; //接收缓存区的版本 map<ceph_tid_t,pair<bufferlist,int> > rx_buffers; //接收缓冲区 消息的标识ceph_tid --> (buffer, rx_buffers_version)的映射 } 其最重要的功能就是发送消息的接口: virtual int send_message(Message *m) = 0;3.1.3 Dispatcher 类Dispatcher是消息分发的接口,其分发消息的接口为:`virtual bool ms_dispatch(Message *m) = 0;virtual void ms_fast_dispatch(Message *m);`Server端注册该Dispatcher类用于把接收到的Message请求分发给具体处理的应用层。Client端需要实现一个Dispatcher函数,用于处理收到的ACK应对消息。
3.1.4 MessengerMessenger是整个网络抽象模块,定义了网络模块的基本API接口。网络模块对外提供的基本功能,就是能在节点之间发送和接受消息。向一个节点发送消息的命令如下:virtual int send_message(Message *m, const entity_inst_t& dest) = 0;注册一个Dispatcher用来分发消息的命令如下:void add_dispatcher_head(Dispatcher *d)
3.1.5 网络连接的策略Policy定义了Messenger处理Connection的一些策略:
struct Policy { bool lossy; //如果为true,该当该连接出现错误时就删除 bool server; //如果为true,为服务端,都是被动连接 bool standby; //如果为true,该连接处于等待状态 bool resetcheck; //如果为true,该连接出错后重连 //该connection相关的流控操作 Throttle *throttler_bytes; Throttle *throttler_messages; //本地端的一些feature标志 uint64_t features_supported; //远程端需要的一些feature标志 uint64_t features_required; }3.1.6 网络模块的使用通过下面最基本的服务器和客户端的实例程序,了解如何调用网络通信模块提供的接口来完成收发请求消息的功能。
Server程序分析Server程序源代码在test/simple_server.cc里,这里只展示有关网络部分的核心流程。1)调用Messenger的函数create 创建一个Messenger的实例,配置选项g_conf->ms_type为配置的实现类型,目前有三种方式:
simple、async、xio: messenger = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::MON(-1), "simple_server", 0 /* nonce */); 2)设置Messenger的属性: messenger->set_magic(MSG_MAGIC_TRACE_CTR); messenger->set_default_policy( Messenger::Policy::stateless_server(CEPH_FEATURES_ALL, 0)); 3)对于Server,需要bind服务端地址: r = messenger->bind(bind_addr); if (r < 0) goto out; common_init_finish(g_ceph_context); 4)创建一个Dispatcher,并添加到Messenger: dispatcher = new SimpleDispatcher(messenger); messenger->add_dispatcher_head(dispatcher); 5)启动Messenger: messenger->start(); messenger->wait(); //本函数必须等start完成才能调用 SimpleDispatcher函数里实现了ms_dispatch,用于把接收到的各种请求消息分发给相关的处理函数。 Client程序分析源代码在test/simple_client.cc里,这里只展示有关网络部分的核心流程。1)调用Messenger的函数create创建一个Messenger的实例:
messenger = Messenger::create(g_ceph_context, g_conf->ms_type, entity_name_t::MON(-1), "client", getpid());2)设置相关的策略:`messenger->set_magic(MSG_MAGIC_TRACE_CTR);messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0));`3)创建Dispatcher类并添加,用于接收消息:`dispatcher = new SimpleDispatcher(messenger);messenger->add_dispatcher_head(dispatcher);dispatcher->set_active(); `4)启动消息:`r = messenger->start();if (r < 0)
goto out;`5)下面开始发送请求,先获取目标Server的链接:conn = messenger->get_connection(dest_server);6)通过Connection来发送请求消息。这里的消息发送方式都是异步发送,接收到请求消息的ACK应答消息后,将在Dispatcher的 ms_dispatch或者ms_fast_dispatch处理函数里做相关的处理:`Message *m;for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) { //如果需要,这里要添加实际的数据 if (! n_dsize) {
m = new MPing();} else {
m = new_simple_ping_with_data("simple_client", n_dsize);} conn->send_message(m);}`综上所述,通过Ceph的网络框架发送消息比较简单。在Server端,只需要创建一个Messenger实例,设置相应的策略并绑定服务端口,然后就设置一个Dispatcher来处理接收到的请求。在Client端,只需要创建一个Messenger实例,设置相关的策略和Dispatcher用于处理返回的应答消息。通过获取对应Server的connection来发送消息即可。
相关资源:敏捷开发V1.0.pptx