本节书摘来自华章出版社《Ceph源码分析》一书中的第2章,第2.3节线程池,作者常涛,更多章节内容可以访问云栖社区“华章计算机”公众号查看
2.3 线程池线程池(ThreadPool)在分布式存储系统的实现中是必不可少的,在Ceph的代码中广泛用到。Ceph中线程池的实现也比较复杂,结构如下:
class ThreadPool : public md_config_obs_t { CephContext *cct; string name; //线程池的名字 string lockname; //锁的名字 Mutex _lock; //线程互斥的锁,也是工作队列访问互斥的锁 Cond _cond; //锁对应的条件变量 bool _stop; //线程池是否停止的标志 int _pause; //暂时中止线程池的标志 int _draining; Cond _wait_cond; int ioprio_class, ioprio_priority; vector<WorkQueue_*> work_queues; //工作队列 int last_work_queue; //最后访问的工作队列 set<WorkThread*> _threads; //线程池中的工作线程 list<WorkThread*> _old_threads; //等待进joined操作的线程 int processing; }类ThreadPool里包函一些比较重要的数据成员:工作线程集合_threads。等待Join操作的旧线程集合_old_threads。工作队列集合,保存所有要处理的任务。一般情况下,一个工作队列对应一个类型的处理任务,一个线程池对应一个工作队列,专门用于处理该类型的任务。如果是后台任务,又不紧急,就可以将多个工作队列放置到一个线程池里,该线程池可以处理不同类型的任务。线程池的实现主要包括:线程池的启动过程,线程池对应的工作队列的管理,线程池对应的执行函数如何执行任务。下面分别介绍这些实现,然后介绍一些Ceph线程池实现的超时检查功能,最后介绍ShardedThreadpool的实现原理。
2.3.1 线程池的启动函数ThreadPool::start() 用来启动线程池,其在加锁的情况下,调用函数start_threads,该函数检查当前线程数,如果小于配置的线程池,就创建新的工作线程。
2.3.2 工作队列工作队列(WorkQueue)定义了线程池要处理的任务。任务类型在模板参数中指定。在构造函数里,就把自己加入到线程池的工作队列集合中:
template<class T> class WorkQueue : public WorkQueue_ { ThreadPool *pool; WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_ (n, ti, sti), pool(p) { pool->add_work_queue(this); } …… }WorkQueue实现了一部分功能:进队列和出队列,以及加锁,并用通过条件变量通知相应的处理线程:
bool queue(T *item) { pool->_lock.Lock(); bool r = _enqueue(item); pool->_cond.SignalOne(); pool->_lock.Unlock(); return r; } void dequeue(T *item) { pool->_lock.Lock(); _dequeue(item); pool->_lock.Unlock(); } void clear() { pool->_lock.Lock(); _clear(); pool->_lock.Unlock(); }还有一部分功能,需要使用者自己定义。需要自己定义实现保存任务的容器,添加和删除的方法,以及如何处理任务的方法:
virtual bool _enqueue(T *) = 0; //从提交的任务中去除一个项 virtual void _dequeue(T *) = 0; //去除一个项并返回原始指针 virtual T *_dequeue() = 0; virtual void _process(T *t) { assert(0); } virtual void _process(T *t, TPHandle &) { _process(t);2.3.3 线程池的执行函数函数worker为线程池的执行函数:void ThreadPool::worker(WorkThread *wt)其处理过程如下:1)首先检查_stop标志,确保线程池没有关闭。2)调用函数join_old_threads把旧的工作线程释放掉。检查如果线程数量大于配置的数量_num_threads,就把当前线程从线程集合中删除,并加入_old_threads队列中,并退出循环。3)如果线程池没有暂时中止,并且work_queues不为空,就从last_work_queue开始,遍历每一个工作队列,如果工作队列不为空,就取出一个item,调用工作队列的处理函数做处理。
2.3.4 超时检查TPHandle是一个有意思的事情。每次线程函数执行时,都会设置一个grace超时时间,当线程执行超过该时间,就认为是unhealthy的状态。当执行时间超过suicide_grace时,OSD就会产生断言而导致自杀,代码如下:
struct heartbeat_handle_d { const std::string name; atomic_t timeout, suicide_timeout; time_t grace, suicide_grace; std::list<heartbeat_handle_d*>::iterator list_item; } class TPHandle { friend class ThreadPool; CephContext *cct; heartbeat_handle_d *hb; //心跳 time_t grace; //超时 time_t suicide_grace; //自杀的超时时间 }结构heartbeat_handle_d记录了相关信息,并把该结构添加到HeartbeatMap的系统链表中保存。OSD会有一个定时器,定时检查是否超时。
2.3.5 ShardedThreadPool这里简单介绍一个ShardedThreadPool。在之前的介绍中,ThreadPool实现的线程池,其每个线程都有机会处理工作队列的任意一个任务。这就会导致一个问题,如果任务之间有互斥性,那么正在处理该任务的两个线程有一个必须等待另一个处理完成后才能处理,从而导致线程的阻塞,性能下降。例2-1 如表2-1所示,线程Thread1和Thread2分别正在处理Job1和Job2。由于Job1和Job2的关联性,二者不能并发执行,只能顺序执行,二者之间用一个互斥锁来控制。如果Thread1先获得互斥锁就先执行,Thread2必须等待,直到Thread1执行完Job1后释放了该互斥锁,Thread2获得该互斥锁后才能执行Job2。显然,这种任务的调度方式应对这种不能完全并行的任务是有缺陷的。实际上Thread2可以去执行其他任务,比如Job5。Job1和Job2既然是顺序的,就都可以交给Thread1执行。表2-1 ThreadPool的处理模型示列
因此,引入了Sharded ThreadPool进行管理。ShardedThreadPool对上述的任务调度方式做了改进,其在线程的执行函数里,添加了表示线程的thread_index:void shardedthreadpool_worker(uint32_t thread_index);具体如何实现Shard方式,还需要使用者自己去实现。其基本的思想就是:每个线程对应一个任务队列,所有需要顺序执行的任务都放在同一个线程的任务队列里,全部由该线程执行。
相关资源:敏捷开发V1.0.pptx