一、生产者-消费者问题(非涉及同步),主要介绍线程函数的作用;
全局变量定义
int nitems; /* read-only by producer and consumer */ struct { pthread_mutex_t mutex; int buff[MAXNITEMS]; int nput; int nval; }shared = {PTHREAD_MUTEX_INITIALIZER}; void *produce(void *); void *consume(void *);main函数如下
int main(int argc, char **argv){ /* 最多可容纳MAXNTHREADS个线程(实际没用到100个),count[i]数组用来存放第i个线程所执行的次数 */ int i, nthreads, count[MAXNTHREADS]; /* tid_produce[i]用来存放第i个线程的的标识 */ pthread_t tid_produce[MAXNTHREADS], tid_consume; /* items表示生产的总条目 threads表示将有多少个线程来生产 */ if (argc != 3) err_quit("usage: prodcons2 <#items> <#threads>"); nitems = min(atoi(argv[1]), MAXNITEMS);/* 命令行参数1(生产的总条目)与10000作比较,取最小值 */ nthreads = min(atoi(argv[2]), MAXNTHREADS);/* 命令行参数2(线程数)与100作比较,取最小值 */ /* Linux标准调用pthread_setconcurrency()函数 */ Set_concurrency(nthreads); /* ------------------------------------------------开始生产------------------------------------------------------------------ */ /* 创建生产线程,&tid_produce[i]为线程标识地址,tid_produce[i]为线程标识的值,&count[i]为传入地址,count[i] 统计第i个线程所执行的次数(值) */ for (i = 0; i < nthreads; i++) { count[i] = 0; Pthread_create(&tid_produce[i], NULL, produce, &count[i]); } /*pthread_join()作用是等tid_produce[i]的线程执行完,即阻塞运行,如果线程不执行完,我就不继续往下执行*/ for (i = 0; i < nthreads; i++) { Pthread_join(tid_produce[i], NULL); printf("count[%d] = %d\n", i, count[i]); } /* ------------------------------------------------生产完成-------------------------------------------------------------------- */ /* 创建一个消费线程*/ Pthread_create(&tid_consume, NULL, consume, NULL); Pthread_join(tid_consume, NULL); exit(0); }其中Set_concurrency()的函数,调用的函数原型是pthread_setconcurrency(),其作用是让系统知道并发运行的线程数量,这样每个生产的线程都会执行到;资料介绍如果在某些系统(Slaris)下省略该调用,会导致只有一个生产线程 执行;
生产者函数如下
static int num = 1; /*只打印一次,查看shared.nput与shared.nval的初值,通过程序执行为0 void *produce(void *arg){ for ( ; ; ) { /* 加锁 */ Pthread_mutex_lock(&shared.mutex); /* nitems 为要生产的总条目,为命令行参数1,演示输入的1000000*/ if (shared.nput >= nitems) { /* 解锁 */ Pthread_mutex_unlock(&shared.mutex); return(NULL); } if(num > 0){ printf("shared.nput: %d--,shared.nval: %d--\n",shared.nput,shared.nval); num--; } shared.buff[shared.nput] = shared.nval;//对应buff[i]存放的值为i shared.nput++; shared.nval++; /* 解锁 */ Pthread_mutex_unlock(&shared.mutex); *((int *) arg) += 1; 传入参数count[i]的值累加,i表示第i个线程,分配的空间在main函数中 } }消费者函数如下:
void *consume(void *arg){ int i; /* 对生产线程所生产的 nitems 条目进行检查*/ for (i = 0; i < nitems; i++) { if (shared.buff[i] != i) printf("buff[%d] = %d\n", i, shared.buff[i]); } return(NULL); }程序演示结果
以上为先用多个线程生产(多个线程访问共享数据时加锁),后用一个线程进行消费,没有涉及到同步问题,后续的同步问题都时在该程序基础上进行,所以有必要了解基本的源代码;
二、与一相比,变化的有两处,main函数和消费者函数
源码分析,与上面的程序相比,函数pthread_setconcurrency()调用的参数增加了1个,即并行执行的线程为生产和消费线程总数,消费线程执行的顺序改变了,基本上与生产线程一并启动;
int main(int argc, char **argv){ int i, nthreads, count[MAXNTHREADS]; pthread_t tid_produce[MAXNTHREADS], tid_consume; if (argc != 3) err_quit("usage: prodcons3 <#items> <#threads>"); nitems = min(atoi(argv[1]), MAXNITEMS); nthreads = min(atoi(argv[2]), MAXNTHREADS); Set_concurrency(nthreads + 1); for (i = 0; i < nthreads; i++) { count[i] = 0; Pthread_create(&tid_produce[i], NULL, produce, &count[i]); } /* 与上面的相比,消费线程启动提前(基本与生产线程一起执行)*/ Pthread_create(&tid_consume, NULL, consume, NULL); for (i = 0; i < nthreads; i++) { Pthread_join(tid_produce[i], NULL); printf("count[%d] = %d\n", i, count[i]); } Pthread_join(tid_consume, NULL); exit(0); }生产者的线程执行函数保持不变,改变的是消费者线程;
消费者线程执行如下
void consume_wait(int i){ /*等待第i个条目是否生产完成,如果生产完成了就return;如果生产没完成,就while(1)执行[加锁、解锁]动作 (等待生产完成),该方法俗称轮转(spinning)或者轮询(polling)*/ for ( ; ; ) { Pthread_mutex_lock(&shared.mutex); if (i < shared.nput) { Pthread_mutex_unlock(&shared.mutex); return; } Pthread_mutex_unlock(&shared.mutex); } } void * consume(void *arg){ int i; for (i = 0; i < nitems; i++) { consume_wait(i);/*等待第i个条目是否生产完成*/ if (shared.buff[i] != i) printf("buff[%d] = %d\n", i, shared.buff[i]); } return(NULL); }资料介绍这种轮询消费的方式对cpu是一种浪费;
三、与二相比,变化的有三处,第一是全局变量的定义;第二是生产者的函数;第三是消费者函数;另外main函数与二完全相同,相同的地方就不介绍了;
全局变量的定义
int nitems; int buff[MAXNITEMS]; struct { pthread_mutex_t mutex; int nput; /* next index to store */ int nval; /* next value to store */ } put = { PTHREAD_MUTEX_INITIALIZER };/* 同一、二一样 */ struct { pthread_mutex_t mutex; pthread_cond_t cond; int nready; /* number ready for consumer */ } nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }; /* 初始化锁mutex为PTHREAD_MUTEX_INITIALIZER,条件变量cond为PTHREAD_COND_INITIALIZER */生产者函数
void* produce(void *arg) { for ( ; ; ) { Pthread_mutex_lock(&put.mutex); if (put.nput >= nitems) { Pthread_mutex_unlock(&put.mutex); return(NULL); /* array is full, we're done */ } buff[put.nput] = put.nval; put.nput++; put.nval++; Pthread_mutex_unlock(&put.mutex); Pthread_mutex_lock(&nready.mutex); if (nready.nready == 0) Pthread_cond_signal(&nready.cond); nready.nready++; Pthread_mutex_unlock(&nready.mutex); *((int *) arg) += 1; } }生产者多了个pthread_cond_signal()函数,作用是唤醒等待正在等待nready.nready 其值变为非零的线程(消费者),需要与关联的互斥锁(nready.mutex)与条件变量(nready.cond)相互协作;
消费者函数
void * consume(void *arg) { int i; for (i = 0; i < nitems; i++) { Pthread_mutex_lock(&nready.mutex); while (nready.nready == 0){ Pthread_cond_wait(&nready.cond, &nready.mutex); } nready.nready--; Pthread_mutex_unlock(&nready.mutex); if (buff[i] != i) printf("buff[%d] = %d\n", i, buff[i]); } return(NULL); }消费者多了个pthread_cond_wait()函数,当nready.nready 为零,资料介绍其作用①给nready.mutex解锁;②把消费者消线程投入睡眠,直到另外线程就条件变量nready.cond调用的pthread_cond_signal()函数来唤醒;
以下为演示结果,为了更加直观,我在程序添加了打印信息:
static int num = 50;生产者线程增加三处打印
在消费者增加四处打印信息
测试结果如下
前面的图是5个线程生产100个条目,后面图是是5个线程生产100w条目(注意关闭消费线程的第一条打印),发现根本没有执行thread_cond_wait()函数;如何让thread_cond_wait()执行呢?
更改主函数中线程启动的顺序,如下
如果只更改顺序还不行(尝试过),最好来点睡眠,调用thread_cond_wait()先让消费者线程睡眠。
测试结果如下:
打印-----4--时,消费者线程已经进入睡眠,可以看到两次结果还不一样,我们只要看前面一张,后面一张不用管(可能跟操作系统执行的算法有关。那么执行顺序到底是怎样呢?
重新修改下打印信息的顺序
void produce(void *arg) { for ( ; ; ) { Pthread_mutex_lock(&put.mutex); if (put.nput >= nitems) { Pthread_mutex_unlock(&put.mutex); return(NULL); /* array is full, we're done */ } buff[put.nput] = put.nval; put.nput++; put.nval++; Pthread_mutex_unlock(&put.mutex); Pthread_mutex_lock(&nready.mutex); if (nready.nready == 0){ if(num > 0){ printf("-----1--\n"); num--; } Pthread_cond_signal(&nready.cond); if(num > 0){ printf("-----2--\n"); num--; } } nready.nready++; if(num > 0){ printf("-----3--\n"); num--; } Pthread_mutex_unlock(&nready.mutex); if(num > 0){ printf("-----4--\n"); num--; } *((int *) arg) += 1; } } void *consume(void *arg) { int i; for (i = 0; i < nitems; i++) { //printf("buff[%d] = %d\n", i, buff[i]); Pthread_mutex_lock(&nready.mutex); while (nready.nready == 0){ if(num > 0){ printf("-----5--\n"); num--; } Pthread_cond_wait(&nready.cond, &nready.mutex); if(num > 0){ printf("-----6--\n"); num--; } } nready.nready--; if(num > 0){ printf("-----7--\n"); num--; } Pthread_mutex_unlock(&nready.mutex); if(num > 0){ printf("-----8--\n"); num--; } if (buff[i] != i) printf("buff[%d] = %d\n", i, buff[i]); } return(NULL); }运行结果如下:
可以看到thread_cond_wait()先释放消费者线程的锁并让该线程睡眠,在thread_cond_wait()返回前,重新上锁,此时while(nready.nready == 0)为假,终止循环;
