Linux之生产者与消费者模型

    xiaoxiao2022-07-04  125

    今天讲下Linux里一个很重要的概念 “生产者与消费者模型” :

    1.生产者与消费者模型的概念

    生产者于消费者模型其实很常用,因为在实际开发中经常会遇到这种情况就是,一个模块负责生产 数据,而另一个模块负责处理数据,在这里就可以认为这个生产数据的模块就充当了生产者的角色 而这个处理数据的模块就充当了消费者的角色,而生产者生产的数据需要一个地方存储,就可以 是一个仓库,这个仓库是练习生产者与消费者的中介。

    关系图如下: 过程就是生产者将生产的数据放到仓库中,消费者要的时候就从里面取,仓库就可以先理解是一个缓冲区,不是真正意义上的仓库!

    2.了解生产者与消费者模型一定要记住下面三点

    1)一个场所

    一个场所指的就是这个仓库,仓库可以用来存数据也可以取数据

    2)两个角色

    两个角色就指的是生产者与消费者 生产者生产数据放到仓库中 消费者从仓库中取数据

    3)三种关系

    生产者与生产者的互斥关系 解释:两个生产者不能同时生产一个数据 消费者与消费者的互斥关系 解释:两个消费者不能同时消费一个数据 生产者与消费者的同步与互斥关系 解释:生产者生产的时候消费者不能消费(互斥) 生产者生产数据后消费者消费(同步)

    同步互斥概念请看我的另一篇博文: https://blog.csdn.net/Python_programer/article/details/90248090

    3.生产者与消费者模型的功能

    1)解耦和

    如果没有生产者与消费者模型的话,生产者就只能靠调用某个函数和消费者练习,那么生产者 与消费者之间就会产生依赖(耦合),但是如果消费者的代码改变后,那么就会影响到生产者 的耦合性(依赖),所以使用生产者与消费者模型后,二者之间就没有那种直接的依赖关系, 而是靠一个缓冲区(仓库)联系起来,相对变得独立(解耦)。

    2)支持忙闲不均

    如果是生产者调用消费者的某个方法,那么很可能出现方法还没调用完毕,生产者就只能等待 这样效率大大降低,但是使用生产者与消费者模型后就不会出现这种情况,消费者并不关心消 费者的情况,而是关注缓冲区是否有数据,生产者只需要生产数据放到缓冲区内,不需要关系消费者什么时候取走。

    3)支持并发

    当生产者生产数据快的话,他不需要担心消费者什么时候取走,只需要放到缓冲区内,只关注缓冲区数据满了没有,而消费者如果生产的慢的话,不要担心,消费者还可以从缓冲区内取已经生产好的数据,多个生产者,消费者可以同时进行操作。

    队列实现生产者与消费者模型:

    实现代码

    1 #include<iostream> 2 #include<queue> 3 #include<pthread.h> 4 5 //手撕生产者与消费者模型 6 //一个场景,三种关系,两个角色 7 //场景:线程安全队列 8 //生产者向队列里放数据,消费者从里面取数据 9 10 class BlockQueue 11 { 12 public: 13 BlockQueue(int cap = 10):capacity(cap){ 14 pthread_cond_init(&productor,NULL); 15 pthread_cond_init(&customor,NULL); 16 pthread_mutex_init(&mutex,NULL); 17 } 18 ~BlockQueue(){ 19 pthread_cond_destroy(&productor); 20 pthread_cond_destroy(&customor); 21 pthread_mutex_destroy(&mutex); 22 } 23 bool QueuePush(int data){ //生产者向队列里放数据,设计临界资源访问首先得加锁 24 QueueLock(); 25 while(QueueIsFull()){ //如果满了的话,生产者等待,这里必须用while,如果用if的话,他会等待,但是等待后依旧会往下走所以是不正确的 26 ProductorWait(); 27 } 28 queue.push(data); //否则放入数据 29 CustomorWakeUp(); //唤醒消费者 30 QueueUnLock(); //临界资源访问完后解锁 31 return true; 32 } 33 bool QueuePop(int* data){ //消费者从队列里取数据 34 QueueLock(); 35 while(QueueIsEmpty()){ 36 CustomorWait(); 37 } 38 *data = queue.front(); 39 queue.pop(); 40 ProductorWakeUp(); 41 QueueUnLock(); 42 return true; 43 } 44 private: 45 void QueueLock(){ 46 pthread_mutex_lock(&mutex); 47 } 48 void QueueUnLock(){ 49 pthread_mutex_unlock(&mutex); 50 } 51 void CustomorWait(){ 52 pthread_cond_wait(&customor,&mutex); 53 } 54 void CustomorWakeUp(){ 55 pthread_cond_signal(&customor); 56 } 57 void ProductorWait(){ 58 pthread_cond_wait(&productor,&mutex); 59 } 60 void ProductorWakeUp(){ 61 pthread_cond_signal(&productor); 62 } 63 bool QueueIsFull(){ 64 return (queue.size()==capacity); 65 } 66 bool QueueIsEmpty(){ 67 return queue.empty(); 68 } 69 private: 70 std::queue<int> queue; 71 int capacity; 72 pthread_mutex_t mutex; 73 pthread_cond_t productor; 74 pthread_cond_t customor; 75 }; 76 77 void* pth_customor(void* arg) //消费者线程创建入口函数 78 { 79 BlockQueue *q = (BlockQueue*) arg; 80 while(1) 81 { 82 int data; 83 q->QueuePop(&data); 84 std::cout<<"customor get data"<<data<<std::endl; 85 } 86 return NULL; 87 } 88 89 void* pth_productor(void* arg) //生产者线程创建入口函数 90 { 91 BlockQueue *q = (BlockQueue*) arg; 92 int i=0; 93 while(1) 94 { 95 std::cout<<"productor put data"<<i<<std::endl; 96 q->QueuePush(i++); 97 } 98 return NULL; 99 } 100 101 int main() 102 { 103 pthread_t ctid[3],ptid[3]; 104 BlockQueue q; 105 int ret; 106 for(int i = 0;i<3;i++) 107 { 108 ret = pthread_create(&ctid[i],NULL,pth_customor,(void*)&q); 109 if(ret!=0){ 110 std::cout<< "customor not exist!\n" <<std::endl; 111 } 112 } 113 for(int i = 0;i<3;i++) 114 { 115 ret = pthread_create(&ptid[i],NULL,pth_productor,(void*)&q); 116 if(ret!=0){ 117 std::cout<< "productor not exist!\n" <<std::endl; 118 } 119 } 120 for(int i = 0;i<3;i++){ //线程等待 121 pthread_join(ctid[i],NULL); 122 } 123 for(int i = 0;i<3;i++){ 124 pthread_join(ptid[i],NULL); 125 } 126 return 0; 127 }
    最新回复(0)