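Ceph is constantly evolving software: features get added and old ones get removed. Keeping different versions of the ceph monitor compatible with each other, or at least stopping incompatible versions from causing errors, therefore needs careful thought. When reading the code we tend to skip the compatibility-management code and focus on the business logic, so this article is devoted to how compatibility is managed.
Compatibility in the ceph monitor covers two aspects: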
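- checking the peer's compatibility when communicating
- compatibility of the code with the data stored on the local machine

Ceph communicates through its native Messenger. Before any communication starts, a Messenger has to be created. For example, the ceph monitor creates the messenger used between monitors like this: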
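
    Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
                                        entity_name_t::MON(rank), "mon", 0);

When establishing a connection, ceph tells the peer what type of node it is. The main types are monitor, osd, mds and client, and the code above declares the node to be a monitor. Every Messenger sets a Policy for each type of peer node. The compatibility-related part of a Policy is: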

    /// Specify features supported locally by the endpoint.
    uint64_t features_supported;
    /// Specify features any remotes must have to talk to this endpoint.
    uint64_t features_required;

Here features_supported is the set of features this node supports and features_required is the set of features the peer must have, with one bit per feature. The default Policy sets features_supported to everything the current code supports, that is, CEPH_FEATURES_ALL.
The Policy that ceph sets for connections between monitors is:

    msgr->set_policy(entity_name_t::TYPE_MON,
                     Messenger::Policy::lossless_peer_reuse(
                       supported,
                       CEPH_FEATURE_UID |
                       CEPH_FEATURE_MON_SINGLE_PAXOS));

The initial features_required contains only CEPH_FEATURE_UID and CEPH_FEATURE_MON_SINGLE_PAXOS, while features_supported is the full feature set. Compatibility between monitors is then checked step by step during the communication that follows.
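The peer's feature bits are checked as soon as a connection is established. In the ceph messenger protocol, each side sends the other the set of features it supports, and each side compares what it receives against the required feature bits in its local Policy. For example:
In ceph's simple messenger, the side that initiates the connect sends its supported features: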

    while (1) {
      delete authorizer;
      authorizer = msgr->get_authorizer(peer_type, false);
      bufferlist authorizer_reply;

      ceph_msg_connect connect;
      connect.features = policy.features_supported;

Once the reply arrives, it verifies the features the peer supports:

    if (reply.tag == CEPH_MSGR_TAG_READY ||
        reply.tag == CEPH_MSGR_TAG_SEQ) {
      uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
      if (feat_missing) {
        ldout(msgr->cct,1) << "missing required features " << std::hex
                           << feat_missing << std::dec << dendl;
        goto fail_locked;
      }

The features in the reply are compared with the locally required features, and the connection fails if any required bit is missing.
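The check above boils down to a single mask operation. The following is a minimal sketch of that idea, not Ceph's actual code: the Policy struct is a trimmed-down stand-in, the FEATURE_* bit values are made up for illustration, and missing_features and handshake_ok are hypothetical helper names.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical feature bits for illustration only; the real values are
// defined by Ceph and are not reproduced here.
constexpr uint64_t FEATURE_UID          = 1ULL << 0;
constexpr uint64_t FEATURE_SINGLE_PAXOS = 1ULL << 1;
constexpr uint64_t FEATURE_NEWER        = 1ULL << 2;

// Trimmed-down stand-in for the two compatibility fields of a Policy.
struct Policy {
  uint64_t features_supported;  // what this endpoint can do
  uint64_t features_required;   // what every peer must be able to do
};

// Bits that we require but the peer did not advertise; 0 means compatible.
uint64_t missing_features(const Policy& local, uint64_t peer_features) {
  return local.features_required & ~peer_features;
}

// The connection may proceed only if no required bit is missing.
bool handshake_ok(const Policy& local, uint64_t peer_features) {
  return missing_features(local, peer_features) == 0;
}
```

With a policy that supports all three bits but requires only FEATURE_UID and FEATURE_SINGLE_PAXOS, a peer advertising just FEATURE_UID is rejected, and missing_features reports FEATURE_SINGLE_PAXOS as the reason. A peer that lacks FEATURE_NEWER is still accepted, because that bit is supported but not required.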
ceph uses a data structure called CompatSet to represent a set of features:

    struct CompatSet {

      struct Feature {
        uint64_t id;
        string name;

        Feature(uint64_t _id, const char *_name) : id(_id), name(_name) {}
        Feature(uint64_t _id, const string& _name) : id(_id), name(_name) {}
      };

      struct FeatureSet {
        uint64_t mask;
        map <uint64_t,string> names;
      };

      FeatureSet compat;
      FeatureSet ro_compat;
      FeatureSet incompat;
    };

Each bit in mask represents one feature. Compatibility testing mainly decides whether data can be read and whether it can be written. Readability is tested by the readable member function:
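
    bool readable(CompatSet const& other) const {
      return !((other.incompat.mask ^ incompat.mask) & other.incompat.mask);
    }

In other words, if my incompat set does not contain every bit of the other side's incompat set, I cannot read the other side's data.
Writability is tested by the writeable member function: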
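
    bool writeable(CompatSet const& other) const {
      return readable(other) &&
        !((other.ro_compat.mask ^ ro_compat.mask) & other.ro_compat.mask);
    }

In other words, on top of being readable, my ro_compat set must contain every bit of the other side's ro_compat set before I am allowed to write the data.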
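The expression (other ^ mine) & other keeps exactly the bits that are set in other but not in mine, so it is equivalent to other & ~mine. The sketch below illustrates the two tests on a simplified stand-in that holds only the three masks. MiniCompatSet and unknown_bits are hypothetical names, and the bit values in the usage note are arbitrary.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for CompatSet: masks only, no feature names.
struct MiniCompatSet {
  uint64_t compat = 0;
  uint64_t ro_compat = 0;
  uint64_t incompat = 0;

  // Bits set in `theirs` that are absent from `mine`.
  // (theirs ^ mine) & theirs is the same value as theirs & ~mine.
  static uint64_t unknown_bits(uint64_t mine, uint64_t theirs) {
    return (theirs ^ mine) & theirs;
  }

  // Readable if every incompat bit of `other` is known to us.
  bool readable(const MiniCompatSet& other) const {
    return unknown_bits(incompat, other.incompat) == 0;
  }

  // Writeable if readable and every ro_compat bit of `other` is known to us.
  bool writeable(const MiniCompatSet& other) const {
    return readable(other) &&
           unknown_bits(ro_compat, other.ro_compat) == 0;
  }
};
```

Suppose the running code has incompat = 0x3 and ro_compat = 0x1. Data stamped with incompat = 0x1 is both readable and writeable. Data stamped with incompat = 0x1 and ro_compat = 0x2 is readable but not writeable. Data stamped with incompat = 0x4 is not even readable. The compat mask never blocks anything, which is why neither test looks at it.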
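get_supported_features() returns every feature that the current Monitor code supports, and read_features_off_disk() reads back the data that write_features() stored. writeable() is then used to test whether the current code is able to write the data on the local file system.
read_features_off_disk() reads the data produced by write_features():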

    void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
    {
      bufferlist featuresbl;
      store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
      if (featuresbl.length() == 0) {
        generic_dout(0) << "WARNING: mon fs missing feature list.\n"
                << "Assuming it is old-style and introducing one." << dendl;
        //we only want the baseline ~v.18 features assumed to be on disk.
        //If new features are introduced this code needs to disappear or
        //be made smarter.
        *features = get_legacy_features();

        bufferlist bl;
        features->encode(bl);
        MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
        t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
        store->apply_transaction(t);
      } else {
        bufferlist::iterator it = featuresbl.begin();
        features->decode(it);
      }
    }

There is one special case. If the data was produced by an old ceph monitor, no features were ever written to the local file system, so read_features_off_disk() calls get_legacy_features() to obtain the feature set of the old ceph monitor and stores it. That function is a simple construction:

    CompatSet Monitor::get_legacy_features()
    {
      CompatSet::FeatureSet ceph_mon_feature_compat;
      CompatSet::FeatureSet ceph_mon_feature_ro_compat;
      CompatSet::FeatureSet ceph_mon_feature_incompat;
      ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
      return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
                       ceph_mon_feature_incompat);
    }

Checking and setting features while the Monitor runs

Once ceph_mon.cc decides to run the Monitor, it first calls the member function preinit(). One of the things preinit() does is call read_features(), which loads the features recorded in the local store into the member variable features:

    void Monitor::read_features()
    {
      read_features_off_disk(store, &features);
      dout(10) << "features " << features << dendl;

      apply_compatset_features_to_quorum_requirements();
      dout(10) << "required_features " << required_features << dendl;
    }

It also takes the feature bits stored in the local data and requires every member of the monitor paxos quorum to have the corresponding feature bits:

    void Monitor::apply_compatset_features_to_quorum_requirements()
    {
      required_features = 0;
      if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
        required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
      }
      if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
        required_features |= CEPH_FEATURE_OSDMAP_ENC;
      }
      if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
        required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
      }
      dout(10) << __func__ << " required_features " << required_features << dendl;
    }

Setting required_features prevents incompatible Monitors from forming a paxos quorum together. Several places use required_features to cut off communication with incompatible monitors:
When a probe message arrives and the peer turns out not to provide the required feature bits, communication is cut off:

    void Monitor::handle_probe_probe(MMonProbe *m)
    {
      MMonProbe *r;

      dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
               << " features " << m->get_connection()->get_features() << dendl;
      uint64_t missing = required_features & ~m->get_connection()->get_features();
      if (missing) {
        dout(1) << " peer " << m->get_source_addr() << " missing features "
                << missing << dendl;
        if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
          MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
                                       name, has_ever_joined);
          m->required_features = required_features;
          m->get_connection()->send_message(r);
        }
        goto out;
      }

Note that, as quoted, the code assigns required_features to the incoming message m rather than to the reply r, so the reply that is sent does not carry the value. This looks like a slip in the code at this version.

When a peer asks for a sync cookie for data replication and turns out not to provide the required feature bits, communication is cut off:

    void Monitor::handle_sync_get_cookie(MMonSync *m)
    {
      if (is_synchronizing()) {
        _sync_reply_no_cookie(m);
        return;
      }

      assert(g_conf->mon_sync_provider_kill_at != 1);

      // make sure they can understand us.
      if ((required_features ^ m->get_connection()->get_features()) &
          required_features) {                       // <== the feature check
        dout(5) << " ignoring peer mon." << m->get_source().num()
                << " has features " << std::hex
                << m->get_connection()->get_features()
                << " but we require " << required_features << std::dec << dendl;
        return;
      }

Feature-set compatibility when the paxos quorum forms

An elector that receives a request to vote checks whether the feature bits are compatible. It compares the current Monitor's compatibility requirements with the feature set the peer supports and decides whether to continue:

    void Elector::handle_propose(MMonElection *m)
    {
      ...
      uint64_t required_features = mon->get_required_features();
      dout(10) << __func__ << " required features " << required_features
               << ", peer features " << m->get_connection()->get_features()
               << dendl;
      if ((required_features ^ m->get_connection()->get_features()) &
          required_features) {
        dout(5) << " ignoring propose from mon" << from
                << " without required features" << dendl;
        nak_old_peer(m);
        return;

An elector that receives an election ack performs the same check, comparing the current Monitor's compatibility requirements with the feature set the peer supports to decide whether to continue:

    void Elector::handle_ack(MMonElection *m)
    {
      ...
      uint64_t required_features = mon->get_required_features();
      if ((required_features ^ m->get_connection()->get_features()) &
          required_features) {
        dout(5) << " ignoring ack from mon" << from
                << " without required features" << dendl;
        m->put();
        return;
      }

A Monitor that proposes an election collects, during the election, the features each monitor advertised on its connection and keeps a record of them. After winning, it computes the feature set that all of these Monitors support in common:

    void Elector::victory()
    {
      leader_acked = -1;
      electing_me = false;

      uint64_t features = CEPH_FEATURES_ALL;
      set<int> quorum;
      for (map<int, uint64_t>::iterator p = acked_me.begin();
           p != acked_me.end();
           ++p) {
        quorum.insert(p->first);
        features &= p->second;
      }
      ...
      mon->win_election(epoch, quorum, features, cmds, cmdsize, &copy_classic_mons);

The resulting features variable holds the set of features common to these monitors, and it is passed to the Monitor class to be recorded.
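The loop in victory() is a plain bitwise intersection: start from a mask with every bit set and AND in each acknowledging monitor's features. Here is a small standalone sketch of the same computation. QuorumResult and intersect_features are hypothetical names, and all_features stands in for CEPH_FEATURES_ALL.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>

// Result of an election as seen by the winner (hypothetical helper type).
struct QuorumResult {
  std::set<int> quorum;  // ranks of the monitors that acked
  uint64_t features;     // features every quorum member supports
};

// acked_me maps a monitor's rank to the features advertised on its
// connection. all_features is the starting mask with every known bit set.
QuorumResult intersect_features(const std::map<int, uint64_t>& acked_me,
                                uint64_t all_features) {
  QuorumResult r{{}, all_features};
  for (const auto& p : acked_me) {
    r.quorum.insert(p.first);
    r.features &= p.second;  // drop any bit this member lacks
  }
  return r;
}
```

For three monitors advertising 0x7, 0x3 and 0xB, the common feature set is 0x3. A single old monitor in the quorum is enough to clear a bit for everyone, which is why new on-disk features are only enabled once the whole quorum supports them.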
Monitor::win_election stores features in quorum_features and then calls finish_election, which in turn calls apply_quorum_to_compatset_features(). That function saves the feature set common to the monitors in the paxos quorum to the local store, so that read_features() can read it back the next time ceph mon starts:

    void Monitor::apply_quorum_to_compatset_features()
    {
      CompatSet new_features(features);
      if (quorum_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
        new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
      }
      if (quorum_features & CEPH_FEATURE_OSDMAP_ENC) {
        new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
      }
      if (quorum_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
        new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
      }
      if (new_features.compare(features) != 0) {
        CompatSet diff = features.unsupported(new_features);
        dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
        features = new_features;

        MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
        write_features(t);              // persist the new feature set
        store->apply_transaction(t);

        apply_compatset_features_to_quorum_requirements();
      }
    }

When it initializes communication, the ceph monitor declares only a minimal set of required features. This avoids relying too heavily on the Messenger's features_required. Compatibility is instead checked dynamically after the connection has been established.
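Taken together, apply_quorum_to_compatset_features() and apply_compatset_features_to_quorum_requirements() form a one-way ratchet: a feature the whole quorum supports is recorded on disk as an incompat bit, and every recorded incompat bit becomes a requirement for future peers. The sketch below models that round trip with plain masks. The WIRE_* and DISK_* bit values, the kFeatureMap table and both function names are assumptions made for illustration, not Ceph's definitions.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical bit values; the wire and on-disk numbering are independent.
constexpr uint64_t WIRE_ERASURE_CODES = 1ULL << 10;
constexpr uint64_t WIRE_OSDMAP_ENC    = 1ULL << 11;
constexpr uint64_t DISK_ERASURE_CODES = 1ULL << 4;
constexpr uint64_t DISK_OSDMAP_ENC    = 1ULL << 5;

// Pairs of (wire feature bit, on-disk incompat bit).
const std::vector<std::pair<uint64_t, uint64_t>> kFeatureMap = {
  {WIRE_ERASURE_CODES, DISK_ERASURE_CODES},
  {WIRE_OSDMAP_ENC,    DISK_OSDMAP_ENC},
};

// After an election: add an incompat bit for each feature the quorum shares.
// Existing bits are never cleared.
uint64_t quorum_to_incompat(uint64_t incompat, uint64_t quorum_features) {
  for (const auto& m : kFeatureMap) {
    if (quorum_features & m.first) incompat |= m.second;
  }
  return incompat;
}

// At startup or after a change: derive the wire bits peers must advertise.
uint64_t incompat_to_required(uint64_t incompat) {
  uint64_t required = 0;
  for (const auto& m : kFeatureMap) {
    if (incompat & m.second) required |= m.first;
  }
  return required;
}
```

Starting from an empty incompat mask, a quorum that shares WIRE_ERASURE_CODES records DISK_ERASURE_CODES, and from then on WIRE_ERASURE_CODES is required of every peer. A later quorum that shares only WIRE_OSDMAP_ENC adds the second bit without removing the first, so the requirements can only grow.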